Skip to content

Commit

Permalink
feat: add WINDDATABASE daily quote
Browse files Browse the repository at this point in the history
  • Loading branch information
yuangn committed Nov 14, 2024
1 parent 8d29185 commit 9964203
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 16 deletions.
55 changes: 44 additions & 11 deletions backtest/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from xloil.pandas import PDFrame

import sqlite3
SQLITE_FILE_PAHT = r'D:\onedrive\文档\etc\ifind.db'
SQLITE_FILE_PATH = r'D:\onedrive\文档\etc\ifind.db'


def print_status(*args):
with xlo.StatusBar(2000) as status:
Expand All @@ -27,7 +28,7 @@ def MsgBox(content: str = "", title: str = "知心提示") -> int:


def execute_sql(sql_stat):
conn = sqlite3.connect(SQLITE_FILE_PAHT)
conn = sqlite3.connect(SQLITE_FILE_PATH)
cursor = conn.cursor()
cursor.execute(sql_stat)
conn.commit()
Expand Down Expand Up @@ -86,6 +87,29 @@ def create_table():
FOREIGN KEY (desc_id) REFERENCES edb_desc (id)
);
''')
execute_sql('''
CREATE TABLE IF NOT EXISTS wind_desc (
id INTEGER PRIMARY KEY AUTOINCREMENT,
indicator_name TEXT NOT NULL,
explanation TEXT,
script_type TEXT,
sql_stat TEXT,
remark TEXT,
last_updated_date DATE,
update_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
''')
execute_sql('''
CREATE TABLE IF NOT EXISTS wind_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
indicator_id INTEGER NOT NULL,
indicator_date DATE,
value REAL NOT NULL,
disclosure_date DATE,
update_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (indicator_id) REFERENCES wind_desc (id)
);
''')


@xlo.func(command=True)
Expand All @@ -108,7 +132,7 @@ def fetch_one():
if indicator_row == -1:
MsgBox('I列没有加入指标,请加指标')
return
conn = sqlite3.connect(SQLITE_FILE_PAHT)
conn = sqlite3.connect(SQLITE_FILE_PATH)
# 1 取得所有指标及其公式
indicator_df = pd.read_sql(f'''
select a.id, b.date, a.name, a.formula from indicator_description a
Expand Down Expand Up @@ -160,7 +184,7 @@ def save_one():
data_df = pd.DataFrame(data, columns=['indicator_date','value', 'indicator_id'])
data_df = data_df[data_df['value']!=0] # TODO 把值为0的去掉,或许这个对某些指标来说不严谨。
data_df['indicator_date'] = data_df['indicator_date'].apply(lambda x: xlo.from_excel_date(x).strftime('%Y%m%d'))
conn = sqlite3.connect(SQLITE_FILE_PAHT)
conn = sqlite3.connect(SQLITE_FILE_PATH)
# 删DB数据
execute_sql(f'delete from indicator_data where indicator_id={data_df["indicator_id"].iloc[0]}')
# 插入数据
Expand All @@ -180,7 +204,7 @@ def fetch_increment():
'''
ws = xlo.active_worksheet()
ws.range(0, 0, 5000, 4).clear()
conn = sqlite3.connect(SQLITE_FILE_PAHT)
conn = sqlite3.connect(SQLITE_FILE_PATH)

# 1 取得所有指标及其公式
indicator_df = pd.read_sql(f'''
Expand Down Expand Up @@ -231,7 +255,7 @@ def save_increment():
data_df = pd.DataFrame(data, columns=['indicator_date','value', 'indicator_id', 'pre_value','name'])
data_df = data_df[(data_df['value']!=data_df['pre_value']) & (data_df['value']!=0.0)].reset_index()
data_df['indicator_date'] = data_df['indicator_date'].apply(lambda x: xlo.from_excel_date(x).strftime('%Y%m%d'))
conn = sqlite3.connect(SQLITE_FILE_PAHT)
conn = sqlite3.connect(SQLITE_FILE_PATH)
# 插入数据
data_df[['indicator_date','value','indicator_id']].to_sql('indicator_data', conn, if_exists='append', index=False)
# 更新数据最新日期
Expand Down Expand Up @@ -262,7 +286,7 @@ def fetch_one_edb():
if indicator_row == -1:
MsgBox('I列没有加入指标,请加指标')
return
conn = sqlite3.connect(SQLITE_FILE_PAHT)
conn = sqlite3.connect(SQLITE_FILE_PATH)
# 1 取得所有指标及其公式
indicator_df = pd.read_sql(f'''
select id, indicator_name, indicator_id, last_updated_date from edb_desc
Expand Down Expand Up @@ -299,7 +323,7 @@ def save_one_edb():
if indicator_row == -1:
print_status('I列没有加入指标,请加指标')
return
conn = sqlite3.connect(SQLITE_FILE_PAHT)
conn = sqlite3.connect(SQLITE_FILE_PATH)
indicator_df = pd.read_sql(f'''
select id, indicator_name, indicator_id, last_updated_date from edb_desc
where indicator_name in ( '{indicator_name}' )
Expand Down Expand Up @@ -341,7 +365,7 @@ def fetch_daily_edb():
'''
ws = xlo.active_worksheet()
ws.range(0, 0, 5000, 500).clear()
conn = sqlite3.connect(SQLITE_FILE_PAHT)
conn = sqlite3.connect(SQLITE_FILE_PATH)

# 1 取得日频数据中最小日期的下一天
min_date = pd.read_sql('''
Expand Down Expand Up @@ -384,7 +408,7 @@ def save_daily_edb():
melted_df = data_df.melt(id_vars='indicator_date', var_name='indicator_id', value_name='indicator_value')

# 2 得到具体id
conn = sqlite3.connect(SQLITE_FILE_PAHT)
conn = sqlite3.connect(SQLITE_FILE_PATH)
desc_df = pd.read_sql('select id, indicator_id from edb_desc', conn)
melted_df = melted_df.merge(desc_df, on='indicator_id', how='left')
# 3 删除原有的
Expand All @@ -405,7 +429,7 @@ def save_daily_edb():


def get_data(indicators: List, start_date: str):
conn = sqlite3.connect(SQLITE_FILE_PAHT)
conn = sqlite3.connect(SQLITE_FILE_PATH)
indicators_str = ",".join([f"'{i}'" for i in indicators if i])
sql_stat = f'''
select b.name, a.value, a.indicator_date "日期"
Expand All @@ -421,6 +445,15 @@ def get_data(indicators: List, start_date: str):
join edb_desc b on a.desc_id =b.id
and a.indicator_date >'{start_date}'
and b.indicator_name in ( {indicators_str} )
union all
select b.indicator_name, a.value, a.indicator_date
from wind_data a
join wind_desc b on a.indicator_id=b.id
and a.indicator_date >'{start_date}'
and b.indicator_name in ( {indicators_str} )
'''
df = pd.read_sql(sql_stat, conn)
pivot_df = df.pivot(index='日期', columns='name', values='value')
Expand Down
4 changes: 2 additions & 2 deletions backtest/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# pip install -i https://mirrors.aliyun.com/pypi/simple/ -r requirements.txt
sqlalchemy
pandas
numpy
matplotlib
Expand All @@ -8,6 +7,7 @@ xloil
pywin32==306
plotly
scipy
sqlalchemy
bottleneck
SQLAlchemy==2.0.20
cx_Oracle==8.3.0

116 changes: 113 additions & 3 deletions backtest/settings.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,42 @@
import platform, os
from typing import List
import sqlalchemy as sa
import pandas as pd
SQLITE_FILE_PAHT = r'D:\onedrive\文档\etc\ifind.db'
import datetime as dt
import numpy as np
import warnings
warnings.filterwarnings("ignore")
import sqlalchemy as sa
import sqlite3


# 本地文件数据库配置
SQLITE_FILE_PATH = r'D:\onedrive\文档\etc\ifind.db'
ENGINE = sa.create_engine(
f'sqlite:///{SQLITE_FILE_PAHT}', echo=False)
f'sqlite:///{SQLITE_FILE_PATH}', echo=False)

# Oracle配置
if platform.system() == 'Linux':
os.environ['NLS_LANG'] = 'AMERICAN_AMERICA.ZHS16GBK'
os.environ['ORACLE_HOME']='/opt/oracle/instantclient'
os.environ['TNS_ADMIN'] = '$ORACLE_HOME/network/admin'
os.environ['LD_LIBRARY_PATH'] = '/opt/oracle/instantclient:$ORACLE_HOME'
else:
# 记得设置PATH环境变量,增加对TNS_ADMIN的引用
# 记得安装oracle instant client,以及vc++ 2010 redistributable
os.environ['NLS_LANG'] = 'AMERICAN_AMERICA.ZHS16GBK'
os.environ['ORACLE_HOME'] = r'D:\oracle\instantclient_21_10'
os.environ['TNS_ADMIN'] = r'D:\oracle\instantclient_21_10\network\admin'
import cx_Oracle
cx_Oracle.init_oracle_client(lib_dir=r"D:\oracle\instantclient_21_10")
ENGINE_WIND = sa.create_engine('oracle+cx_oracle://u:p@(DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = 10.18.8.59)(PORT = 1521)) (CONNECT_DATA = (SERVER = DEDICATED)(SERVICE_NAME = jrgc)))')

##################################################

def execute_sql(sql_stat):
    """Run a single SQL statement against the local SQLite file and commit.

    Args:
        sql_stat: a complete SQL statement (DDL or DML), executed as-is.
            Callers interpolate values themselves, so this must only ever be
            fed trusted, locally-built SQL.
    """
    conn = sqlite3.connect(SQLITE_FILE_PATH)
    try:
        # Connection.execute() is the shortcut for cursor() + execute().
        conn.execute(sql_stat)
        conn.commit()
    finally:
        # BUG FIX: the original never closed the connection, leaking one
        # file handle per call (load_wind calls this in a loop).
        conn.close()

def get_wind_data(indicator_list:List):
indicator_str = ",".join([f"'{x}'" for x in indicator_list])
Expand All @@ -18,3 +50,81 @@ def get_wind_data(indicator_list:List):
df = pd.pivot_table(df, index='v_date', columns='indicatorname', values='n_value')
df.index = pd.to_datetime(df.index)
return df

def _rsi(close_prices):
    """Compute the RSI (Relative Strength Index) for one window of closes.

    Args:
        close_prices: pd.Series of closing prices inside a single window.

    Returns:
        pd.Series with the single key ``'rsi'`` holding a float in [0, 100],
        or None when fewer than two prices are supplied (no diff possible).
    """
    if len(close_prices) < 2:
        return None  # a single observation cannot be differenced
    delta = close_prices.diff()   # day-over-day price change
    up = delta.clip(lower=0)      # gains only (losses zeroed)
    down = -delta.clip(upper=0)   # losses as positive magnitudes
    sum_up = up.sum()
    sum_down = down.sum()
    if sum_down == 0:
        # No losses in the window: RSI saturates at 100 (or 0 if flat).
        # BUG FIX: the original returned bare scalars (100.0 / int 0) here
        # but a pd.Series below, handing callers of groupby/rolling .apply
        # inconsistent result types; every computed path now returns the
        # same pd.Series shape with float values.
        return pd.Series({'rsi': 100.0 if sum_up > 0 else 0.0})
    rs = sum_up / sum_down        # relative strength: avg gain / avg loss
    return pd.Series({'rsi': float(100 - (100 / (1 + rs)))})


def load_wind():
    """Refresh the derived Wind market-breadth indicators in local SQLite.

    Pulls daily A-share adjusted closes from the Wind Oracle database
    (ENGINE_WIND), computes the share of stocks trading above their
    20/60/100/240-day moving averages, deletes the overlapping date range
    from ``wind_data``, appends the fresh rows, and bumps
    ``last_updated_date`` in ``wind_desc``.
    """
    # 0 Earliest locally-stored date; go back 400 calendar days for safety
    #   so the longest moving average (240 rows) has enough history.
    min_date = pd.read_sql(' select min(last_updated_date) from wind_desc ', ENGINE).iloc[0, 0]
    # BUG FIX: the original tested np.isnan(min_date) before the None check,
    # but np.isnan raises TypeError on None (and on the string dates this
    # column holds), so the fallback never fired. pd.isna handles None, NaN
    # and NaT uniformly.
    min_date = '20050101' if pd.isna(min_date) else min_date
    min_date = (dt.datetime.strptime(f"{min_date}", '%Y%m%d') - dt.timedelta(days=400)).strftime('%Y%m%d')

    # 1 Base daily quotes from the Wind database.
    sql_stat = f'''
    SELECT s_info_windcode, trade_dt, s_dq_adjclose, s_dq_amount
    FROM WINDNEW.AShareEODPrices
    where 1=1
    and trade_dt > '{min_date}'
    order by s_info_windcode, trade_dt
    '''
    df = pd.read_sql(sql_stat, con=ENGINE_WIND)
    # Per-stock trailing moving averages of the adjusted close.
    for window in (20, 60, 100, 240):
        df[f'ma{window}'] = (
            df.groupby('s_info_windcode')['s_dq_adjclose']
            .rolling(window=window)
            .mean()
            .reset_index(level=0, drop=True)
        )
    df = df.dropna()  # drop warm-up rows where the longest MA is undefined

    def _calculate(group):
        # For one trading day: fraction of stocks whose close sits above
        # each moving average ("strong stock" breadth ratios).
        total_count = len(group)

        def _ratio(ma_col):
            return (group['s_dq_adjclose'] > group[ma_col]).sum() / total_count if total_count > 0 else 0

        return pd.Series({
            '月强势股占比': _ratio('ma20'),
            '季强势股占比': _ratio('ma60'),
            '半年强势股占比': _ratio('ma100'),
            '年强势股占比': _ratio('ma240'),
        })

    # 2 Derived indicators, reshaped long and joined to their wind_desc ids.
    result_df = df.groupby('trade_dt').apply(_calculate).reset_index()
    melted_df = result_df.melt(id_vars='trade_dt', var_name='indicator_name', value_name='value')
    desc_df = pd.read_sql('select id indicator_id, indicator_name from wind_desc', con=ENGINE)
    data_df = melted_df.merge(desc_df, on='indicator_name', how='left')
    data_df['indicator_date'] = melted_df['trade_dt']

    # 3 Remove the overlapping date range before re-inserting.
    execute_sql(f''' delete from wind_data where indicator_date>='{np.min(data_df["indicator_date"])}' ''')

    # 4 Append the fresh rows.
    data_df[['indicator_id', 'indicator_date', 'value']].to_sql('wind_data', ENGINE, if_exists='append', index=False)

    # 5 Record each indicator's newest stored date.
    for indicator_id in list(data_df['indicator_id'].unique()):
        sub_df = data_df[data_df['indicator_id'] == indicator_id]
        execute_sql(f'''update wind_desc set last_updated_date='{np.max(sub_df["indicator_date"])}' where id={indicator_id}''')

# load_wind()

0 comments on commit 9964203

Please sign in to comment.