Skip to content

Commit

Permalink
feat: add edb
Browse files Browse the repository at this point in the history
  • Loading branch information
yuangn committed Nov 7, 2024
1 parent 8bd06be commit cd6c3ee
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 7 deletions.
136 changes: 129 additions & 7 deletions backtest/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,36 @@ def create_table():
FOREIGN KEY (indicator_id) REFERENCES indicator_description (id)
);
''')
execute_sql('''
CREATE TABLE IF NOT EXISTS edb_desc (
id INTEGER PRIMARY KEY AUTOINCREMENT,
indicator_name TEXT NOT NULL,
indicator_id INTEGER NOT NULL,
frequency TEXT,
unit TEXT,
source TEXT,
remark TEXT,
last_updated_date DATE,
update_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
''')
execute_sql('''
CREATE TABLE IF NOT EXISTS edb_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
desc_id INTEGER NOT NULL,
indicator_date DATE,
value REAL NOT NULL,
disclosure_date DATE,
update_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (desc_id) REFERENCES edb_desc (id)
);
''')


@xlo.func(command=True)
def fetch_one():
'''
全量抓取:也就是根据指标一个个从头开始抓
从第I列中的指标名字,全量抓取该指标历史所有数据
'''
ws = xlo.active_worksheet()
ws.range(0, 0, 5000, 2).clear()
Expand Down Expand Up @@ -105,7 +130,7 @@ def fetch_one():
@xlo.func(command=True)
def save_one():
'''
全量抓取:也就是根据指标一个个从头开始抓
配合fetch_one()使用,抓到后,全量更新至数据库
'''
ws = xlo.active_worksheet()
names = ws.range(0, 8, 50, 8).value
Expand All @@ -131,6 +156,7 @@ def save_one():
return
# 组织数据并插入DB
data_df = pd.DataFrame(data, columns=['indicator_date','value', 'indicator_id'])
data_df = data_df[data_df['value']!=0] # TODO 把值为0的去掉,或许这个对某些指标来说不严谨。
data_df['indicator_date'] = data_df['indicator_date'].apply(lambda x: xlo.from_excel_date(x).strftime('%Y%m%d'))
conn = sqlite3.connect(SQLITE_FILE_PAHT)
# 删DB数据
Expand All @@ -148,15 +174,12 @@ def save_one():
@xlo.func(command=True)
def fetch_increment():
'''
全量抓取:也就是根据指标一个个从头开始抓
增加抓取所有指标
'''
ws = xlo.active_worksheet()
ws.range(0, 0, 5000, 4).clear()
conn = sqlite3.connect(SQLITE_FILE_PAHT)
# 取得上一交易日
date_df = pd.read_sql('''select date from tdate where is_trade_date=1 and date>'20241028' ''', conn)
date_df['prev_date'] = date_df['date'].shift(1)
date_df.dropna(inplace=True)

# 1 取得所有指标及其公式
indicator_df = pd.read_sql(f'''
select a.id, b.date, a.name, a.formula from indicator_description a
Expand All @@ -168,6 +191,11 @@ def fetch_increment():
if len(indicator_df)==0:
print_status(f'没有待处理任务')
return

# 取得上一交易日
date_df = pd.read_sql('''select date from tdate where is_trade_date=1 and date>'20241020' ''', conn)
date_df['prev_date'] = date_df['date'].shift(1)
date_df.dropna(inplace=True)
indicator_df = indicator_df.merge(date_df, on='date', how='inner')
indicator_df['tdate'] = indicator_df['date'].apply(lambda x: f"{(str(x))[0:4]}/{(str(x))[4:6]}/{(str(x))[6:8]}")
indicator_df['tformula'] = indicator_df.apply(lambda row: row['formula'].replace('date()', row['tdate']), axis=1)
Expand Down Expand Up @@ -212,4 +240,98 @@ def save_increment():
ws.range(0, 0, 5000, 4).clear()



@xlo.func(command=True)
def fetch_one_edb():
    '''
    Full fetch of one EDB indicator's entire history.

    Reads the first non-empty indicator name from column I, looks it up in
    ``edb_desc``, then writes a ``thsMEDB`` array formula into column B so
    Excel pulls every data point from the last updated date onward.
    Companion command: save_one_edb() persists the fetched values.
    '''
    ws = xlo.active_worksheet()
    ws.range(0, 0, 5000, 2).clear()  # clear columns A:B before filling formulas
    names = ws.range(0, 8, 50, 8).value
    indicator_name = ''  # indicator name found in column I
    indicator_row = -1   # row index of that name in column I
    for i, name in enumerate(names):
        name = name[0]
        if name is None:
            continue
        indicator_name = name
        indicator_row = i
        break
    if indicator_row == -1:
        MsgBox('I列没有加入指标,请加指标')
        return
    conn = sqlite3.connect(SQLITE_FILE_PAHT)
    try:
        # Parameterized query: the name comes from a worksheet cell, so never
        # interpolate it into the SQL text (original was injectable).
        indicator_df = pd.read_sql(
            'select id, indicator_name, indicator_id, last_updated_date '
            'from edb_desc where indicator_name = ?',
            conn, params=(indicator_name,))
    finally:
        conn.close()  # original leaked this connection
    if len(indicator_df) == 0:
        print_status(f'指标{indicator_name}没有待处理任务')
        return
    # Start date: last_updated_date reformatted YYYYMMDD -> YYYY/MM/DD,
    # falling back to a fixed epoch when the indicator was never fetched.
    indicator_df['tdate'] = indicator_df['last_updated_date'].apply(
        lambda x: f"{(str(x))[0:4]}/{(str(x))[4:6]}/{(str(x))[6:8]}" if x else '2005/01/31')
    indicator_df['tformula'] = indicator_df.apply(
        lambda row: f'=thsMEDB("{row["indicator_id"]}","{row["tdate"]}","","Format(isAsc=N,Display=R,FillBlank=B,DecimalPoint=2,LineBlank=N)")',
        axis=1)
    print_status(f'正在处理{indicator_df["indicator_name"].iloc[0]}')
    # Write the formulas into column B and force a full recalculation so the
    # add-in starts pulling data.
    ws.range(0, 1, len(indicator_df) - 1, 1).Formula = np.array(indicator_df['tformula']).reshape(-1, 1)
    xlo.app().calculate(full=True, rebuild=True)


@xlo.func(command=True)
def save_one_edb():
    '''
    Companion to fetch_one_edb(): persist the fetched values.

    Reads date/value pairs from columns A:B, replaces the indicator's rows in
    ``edb_data`` wholesale, records the newest date in ``edb_desc``, then
    clears the worksheet range and the indicator cell in column I.
    '''
    ws = xlo.active_worksheet()
    names = ws.range(0, 8, 50, 8).value
    indicator_name = ''  # indicator name found in column I
    indicator_row = -1   # row index of that name in column I
    for i, name in enumerate(names):
        name = name[0]
        if name is None:
            continue
        indicator_name = name
        indicator_row = i
        break
    if indicator_row == -1:
        print_status('I列没有加入指标,请加指标')
        return
    conn = sqlite3.connect(SQLITE_FILE_PAHT)
    try:
        # Parameterized lookup — the name comes from a worksheet cell.
        indicator_df = pd.read_sql(
            'select id, indicator_name, indicator_id, last_updated_date '
            'from edb_desc where indicator_name = ?',
            conn, params=(indicator_name,))
        if len(indicator_df) == 0:
            print_status(f'指标{indicator_name}不存在,请检查数据库')
            return
        indicator_foreign_id = indicator_df['id'].iloc[0]  # FK value for edb_data.desc_id

        data = ws.range(0, 0, 5000, 1).value
        # Scan down to the first blank row; bound check prevents IndexError
        # when all 5000 rows are filled (original could run off the end).
        i = 0
        while i < len(data) and data[i][0] is not None:
            i += 1
        data = data[:i]
        if _is_fetching(data.flatten().tolist()):
            MsgBox(f'指标{indicator_name}还未完全计算完毕!')
            return
        # Build the rows to insert.
        data_df = pd.DataFrame(data, columns=['indicator_date', 'value'])
        data_df['desc_id'] = indicator_foreign_id
        data_df = data_df[data_df['value'] != 0]  # TODO 把值为0的去掉,或许这个对某些指标来说不严谨。
        if len(data_df) == 0:
            # Nothing to save — avoid .iloc[-1] crashing on an empty frame.
            print_status(f'指标{indicator_name}没有可保存的数据')
            return
        data_df['indicator_date'] = data_df['indicator_date'].apply(
            lambda x: xlo.from_excel_date(x).strftime('%Y%m%d'))
        # Full replace: delete this indicator's existing rows, then append.
        execute_sql(f'delete from edb_data where desc_id={indicator_foreign_id}')
        data_df.to_sql('edb_data', conn, if_exists='append', index=False)
        # Bug fix: the original updated indicator_description, but desc_id is
        # an edb_desc key — the bookkeeping belongs in edb_desc.
        execute_sql(f'''update edb_desc set last_updated_date='{data_df["indicator_date"].iloc[-1]}' where id={indicator_foreign_id}''')
    finally:
        conn.close()  # original leaked this connection
    # Clean up the worksheet so the next indicator can be processed.
    ws.range(0, 0, 5000, 1).clear()
    ws.cell(indicator_row, 8).value = ''

# Module import side effect: make sure all tables exist before any command runs.
create_table()
Binary file modified backtest/load.xlsx
Binary file not shown.

0 comments on commit cd6c3ee

Please sign in to comment.