# training_predictions.py
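"""Train per-symbol CatBoost close-price models and roll out 30-day predictions.

Reads OHLCV rows from the `finance_data` SQLite table, engineers indicator
features, saves one model per symbol, and writes per-symbol and aggregate
prediction CSVs as well as an `aggregate_predictions` database table.
"""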
import os
import pandas as pd
import numpy as np
import pandas_ta as ta
from sqlalchemy import create_engine, text, Column, Integer, Float, String, DateTime, MetaData, Table, update
from sqlalchemy.orm import sessionmaker
from catboost import CatBoostRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from tqdm import tqdm
from datetime import datetime, timezone
from dotenv import load_dotenv
load_dotenv()
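# Example .env (illustrative values; only these keys are read below):
#   SQLALCHEMY_DATABASE_URI=sqlite:///finance.db
#   TRAINING_FOLDER=training
#   PREDICTIONS_FOLDER=predictions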
# Get the directory of the current script
current_directory = os.path.dirname(os.path.abspath(__file__))
# Set the path to the instance folder
instance_folder = os.path.join(current_directory, 'instance')
# Construct the absolute path to the database file within the instance folder
db_file_path = os.path.join(instance_folder, os.getenv('SQLALCHEMY_DATABASE_URI').replace('sqlite:///', ''))
# Ensure the training folder exists
training_folder = os.getenv('TRAINING_FOLDER', 'training')
os.makedirs(training_folder, exist_ok=True)
# Ensure the predictions folder exists
predictions_folder = os.getenv('PREDICTIONS_FOLDER', 'predictions')
os.makedirs(predictions_folder, exist_ok=True)
# SQLAlchemy setup
engine = create_engine(f'sqlite:///{db_file_path}', echo=False)
Session = sessionmaker(bind=engine)
session = Session()
metadata = MetaData()
# Define the timestamps table
timestamps_table = Table('timestamps', metadata,
Column('id', Integer, primary_key=True, autoincrement=True),
Column('symbol', String, nullable=False),
Column('timestamp', DateTime, nullable=False),
Column('operation', String, nullable=False) # download, training, or prediction
)
# Define the LTP table
ltp_table = Table('ltp', metadata,
Column('id', Integer, primary_key=True, autoincrement=True),
Column('symbol', String, nullable=False, unique=True),
Column('ltp', Float, nullable=False)
)
# Create the tables if they don't exist
metadata.create_all(engine)
# Function to create features using pandas_ta
def create_features(df):
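    """Add return, EMA, average-price, RSI, and ATR feature columns to an OHLCV frame."""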
df['returns'] = df['close'].pct_change()
df['prev_day_returns'] = df['returns'].shift(1)
df['ema5'] = ta.ema(df['close'], length=5)
df['ema10'] = ta.ema(df['close'], length=10)
df['hl2'] = (df['high'] + df['low']) / 2
df['hlc3'] = (df['high'] + df['low'] + df['close']) / 3
df['rsi'] = ta.rsi(df['close'], length=14)
df['atr'] = ta.atr(df['high'], df['low'], df['close'], length=14)
df.dropna(inplace=True)
return df
# Function to calculate RSI
def calculate_rsi(prices, period=14):
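    """Compute Wilder's RSI over `prices` using smoothed up/down averages."""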
deltas = np.diff(prices)
    seed = deltas[:period]  # average the first `period` deltas to seed Wilder's smoothing
up = seed[seed >= 0].sum()/period
down = -seed[seed < 0].sum()/period
rs = up/down
rsi = np.zeros_like(prices)
rsi[:period] = 100. - 100./(1. + rs)
for i in range(period, len(prices)):
delta = deltas[i - 1]
if delta > 0:
upval = delta
downval = 0.
else:
upval = 0.
downval = -delta
up = (up * (period - 1) + upval) / period
down = (down * (period - 1) + downval) / period
rs = up/down
rsi[i] = 100. - 100./(1. + rs)
return rsi
# Function to calculate ATR
def calculate_atr(high, low, close, period=14):
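    """Compute the Average True Range as an EWM (alpha=1/period, Wilder smoothing) of the true range."""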
tr1 = pd.DataFrame(high - low)
tr2 = pd.DataFrame(abs(high - close.shift(1)))
tr3 = pd.DataFrame(abs(low - close.shift(1)))
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
atr = tr.ewm(alpha=1/period, adjust=False).mean()
return atr
def upsert_timestamp(symbol, operation):
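    """Record the current UTC timestamp for a (symbol, operation) pair, updating the existing row if one exists."""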
timestamp = datetime.now(timezone.utc)
stmt = (
update(timestamps_table)
.where(timestamps_table.c.symbol == symbol, timestamps_table.c.operation == operation)
.values(timestamp=timestamp)
)
result = session.execute(stmt)
if result.rowcount == 0: # If no row was updated, insert a new one
session.execute(
timestamps_table.insert().values(symbol=symbol, timestamp=timestamp, operation=operation)
)
session.commit()
def train_model(symbol):
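    """Train a CatBoostRegressor to predict the close for `symbol` and save it as `<symbol>.cbm` in the training folder."""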
# Load data from SQLite
query = f"SELECT * FROM finance_data WHERE symbol = '{symbol}' ORDER BY date"
df = pd.read_sql(query, engine)
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)
# Create features
df = create_features(df)
# Prepare features and target
features = ['open', 'high', 'low', 'volume', 'returns', 'prev_day_returns', 'ema5', 'ema10', 'hl2', 'hlc3', 'rsi', 'atr']
X = df[features]
y = df['close']
# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Train CatBoost model
model = CatBoostRegressor(iterations=1000, learning_rate=0.05, depth=6, random_state=42)
model.fit(X_train, y_train, eval_set=(X_test, y_test), early_stopping_rounds=50, verbose=0)
# Save the model
model_path = os.path.join(training_folder, f'{symbol}.cbm')
model.save_model(model_path)
# Store the training timestamp
upsert_timestamp(symbol, 'training')
return model, X_test, y_test
def make_predictions(symbol, model, days=30):
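    """Roll the model forward `days` steps, rebuilding indicator features from
    each predicted close; returns the predictions plus in-sample MSE and R2."""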
# Load data from SQLite
query = f"SELECT * FROM finance_data WHERE symbol = '{symbol}' ORDER BY date"
df = pd.read_sql(query, engine)
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)
# Create features
df = create_features(df)
# Prepare features
features = ['open', 'high', 'low', 'volume', 'returns', 'prev_day_returns', 'ema5', 'ema10', 'hl2', 'hlc3', 'rsi', 'atr']
X = df[features]
    # Iteratively predict one day at a time for the next `days` days (30 by default)
last_known_date = df.index[-1]
predictions = []
last_data = df.iloc[-1].copy()
prediction_data = df.iloc[-30:].copy() # Get last 30 days for rolling calculations
for i in range(days):
next_day_prediction = model.predict(last_data[features].values.reshape(1, -1))[0]
predictions.append(next_day_prediction)
# Create a new row for the predicted day
new_row = pd.DataFrame([{
'open': last_data['close'], # Assume next day's open is the same as previous close
'high': next_day_prediction * 1.01, # Assume 1% higher than predicted close
'low': next_day_prediction * 0.99, # Assume 1% lower than predicted close
'close': next_day_prediction,
'volume': last_data['volume'] # Keep the same volume (you might want to adjust this)
}])
# Add the new row to prediction_data
prediction_data = pd.concat([prediction_data, new_row], ignore_index=True)
# Recalculate features
prediction_data['returns'] = prediction_data['close'].pct_change()
prediction_data['prev_day_returns'] = prediction_data['returns'].shift(1)
prediction_data['ema5'] = ta.ema(prediction_data['close'], length=5)
prediction_data['ema10'] = ta.ema(prediction_data['close'], length=10)
prediction_data['hl2'] = (prediction_data['high'] + prediction_data['low']) / 2
prediction_data['hlc3'] = (prediction_data['high'] + prediction_data['low'] + prediction_data['close']) / 3
        # Recompute RSI/ATR with the local helpers; values may differ slightly from the pandas_ta versions used in create_features
        prediction_data['rsi'] = calculate_rsi(prediction_data['close'])
        prediction_data['atr'] = calculate_atr(prediction_data['high'], prediction_data['low'], prediction_data['close'])
# Update last_data with the latest row
last_data = prediction_data.iloc[-1]
    # Calculate in-sample fit metrics over the full history (X is not a held-out test set)
    y_pred = model.predict(X)
mse = mean_squared_error(df['close'], y_pred)
r2 = r2_score(df['close'], y_pred)
    # Create a DataFrame of prediction dates (consecutive calendar days, weekends included) and predicted closes
prediction_dates = pd.date_range(start=last_known_date + pd.Timedelta(days=1), periods=days)
predictions_df = pd.DataFrame({'date': prediction_dates, 'predicted_close': predictions})
predictions_df.set_index('date', inplace=True)
# Store the prediction timestamp
upsert_timestamp(symbol, 'prediction')
return predictions_df, mse, r2
def train_all_models():
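    """Train and save one model per distinct symbol in finance_data."""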
# Get all unique symbols from the database
query = "SELECT DISTINCT symbol FROM finance_data"
symbols = pd.read_sql(query, engine)['symbol'].tolist()
# Train models for all symbols
for symbol in tqdm(symbols, desc="Training models"):
try:
train_model(symbol)
except Exception as e:
print(f"Error training model for {symbol}: {str(e)}")
def make_all_predictions():
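    """Retrain each symbol's model, roll 30-day predictions, and write per-symbol and aggregate outputs (CSV and database)."""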
# Get all unique symbols from the database
query = "SELECT DISTINCT symbol FROM finance_data"
symbols = pd.read_sql(query, engine)['symbol'].tolist()
# Make predictions for all symbols
results = {}
for symbol in tqdm(symbols, desc="Making predictions"):
try:
            model, _, _ = train_model(symbol)  # retrains (and re-saves) the model for this symbol
predictions_df, mse, r2 = make_predictions(symbol, model)
results[symbol] = {
'predictions': predictions_df,
'mse': mse,
'r2': r2
}
except Exception as e:
print(f"Error making predictions for {symbol}: {str(e)}")
# Print results
for symbol, data in results.items():
print(f"\nSymbol: {symbol}")
print(f"Mean Squared Error: {data['mse']:.4f}")
print(f"R-squared Score: {data['r2']:.4f}")
print("Predictions for the next 30 days:")
print(data['predictions'])
# Save results to a CSV file
all_predictions = pd.DataFrame()
for symbol, data in results.items():
symbol_predictions = data['predictions'].copy()
symbol_predictions['symbol'] = symbol
all_predictions = pd.concat([all_predictions, symbol_predictions])
portfolio_predictions_path = os.path.join(predictions_folder, 'portfolio_30day_predictions.csv')
all_predictions.to_csv(portfolio_predictions_path)
print(f"Predictions saved to '{portfolio_predictions_path}'")
# Load the predictions
predictions_df = pd.read_csv(portfolio_predictions_path)
# Function to get the last known closing price for a symbol
def get_last_close(symbol):
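        """Return the most recent close for `symbol` from finance_data, or None if no rows exist."""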
query = text(f"SELECT close FROM finance_data WHERE symbol = :symbol ORDER BY date DESC LIMIT 1")
with engine.connect() as connection:
result = connection.execute(query, {"symbol": symbol}).fetchone()
return result[0] if result else None
# Create a new DataFrame to store the aggregate results
aggregate_results = []
# Process each unique symbol
for symbol in predictions_df['symbol'].unique():
symbol_predictions = predictions_df[predictions_df['symbol'] == symbol]
# Get the last predicted closing price (30th day)
closing_price_30day = symbol_predictions['predicted_close'].iloc[-1]
# Get the last known closing price
last_known_close = get_last_close(symbol)
if last_known_close is not None:
# Calculate 30-day returns
returns_30day = (closing_price_30day - last_known_close) / last_known_close * 100
else:
returns_30day = None
        # Use the last known close as the LTP (last traded price); fall back to 0 if unavailable
ltp = last_known_close if last_known_close is not None else 0
# Append to results
aggregate_results.append({
'Stock': symbol,
'30 day Closing': round(closing_price_30day, 2),
'30 day returns': round(returns_30day, 2) if returns_30day is not None else None,
'LTP': round(ltp, 2)
})
# Insert or update LTP table
stmt = (
update(ltp_table)
.where(ltp_table.c.symbol == symbol)
.values(ltp=ltp)
)
result = session.execute(stmt)
if result.rowcount == 0: # If no row was updated, insert a new one
session.execute(
ltp_table.insert().values(symbol=symbol, ltp=ltp)
)
session.commit()
# Create DataFrame from aggregate results
aggregate_df = pd.DataFrame(aggregate_results)
# Sort by 30-day returns in descending order
aggregate_df = aggregate_df.sort_values('30 day returns', ascending=False)
# Add ranking column
aggregate_df['Rank'] = range(1, len(aggregate_df) + 1)
aggregate_predictions_path = os.path.join(predictions_folder, 'aggregate_30day_predictions.csv')
aggregate_df.to_csv(aggregate_predictions_path, index=False)
print(f"Aggregate results saved to '{aggregate_predictions_path}'")
# Save to SQLite database
aggregate_df.to_sql('aggregate_predictions', engine, if_exists='replace', index=False)
print("Aggregate results saved to 'aggregate_predictions' table in the database")
# Optional: Query and display the table from the database to confirm
print("\nConfirming data in the database:")
query_result = pd.read_sql_query("SELECT * FROM aggregate_predictions LIMIT 10", engine)
print(query_result)
if __name__ == '__main__':
train_all_models()
make_all_predictions()
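# Typical run (assumes the finance_data table was already populated by a
# separate download step): python training_predictions.py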