update validator to check file size before saving to disc. Ensure tha… #60

Merged · 2 commits · Dec 20, 2024
29 changes: 27 additions & 2 deletions snp_oracle/predictionnet/utils/dataset_manager.py
@@ -69,6 +69,23 @@ def _get_local_path(self, hotkey: str) -> Path:
path.mkdir(parents=True, exist_ok=True)
return path

def _check_data_size(self, df: pd.DataFrame) -> Tuple[bool, float]:
"""
Check if DataFrame size is within allowed limits.

Args:
df (pd.DataFrame): DataFrame to check

Returns:
Tuple[bool, float]: (is_within_limit, size_in_mb)
"""
# Calculate size in memory
size_bytes = df.memory_usage(deep=True).sum()
size_mb = size_bytes / (1024 * 1024) # Convert to MB
max_size_mb = float(os.getenv("MAX_FILE_SIZE_MB", "10"))

return size_mb <= max_size_mb, size_mb

def store_local_data(
self,
timestamp: str,
@@ -78,7 +95,7 @@ def store_local_data(
metadata: Optional[Dict] = None,
) -> Tuple[bool, Dict]:
"""
- Store data locally in Parquet format.
+ Store data locally in Parquet format with size validation.

Args:
timestamp (str): Current timestamp
@@ -94,6 +111,12 @@
if not isinstance(miner_data, pd.DataFrame) or miner_data.empty:
raise ValueError("miner_data must be a non-empty pandas DataFrame")

# Check data size before proceeding
is_size_ok, size_mb = self._check_data_size(miner_data)
if not is_size_ok:
max_size_mb = float(os.getenv("MAX_FILE_SIZE_MB", "10"))
return False, {"error": f"Data size ({size_mb:.2f}MB) exceeds maximum allowed size ({max_size_mb}MB)"}

# Get local storage path for this hotkey
local_path = self._get_local_path(hotkey)

@@ -109,6 +132,7 @@
"shape": f"{miner_data.shape[0]},{miner_data.shape[1]}",
"hotkey": hotkey,
"predictions": json.dumps(predictions) if predictions else "",
"size_mb": f"{size_mb:.2f}", # Add size information to metadata
}
if metadata:
full_metadata.update(metadata)
@@ -121,6 +145,7 @@
"local_path": str(file_path),
"rows": miner_data.shape[0],
"columns": miner_data.shape[1],
"size_mb": round(size_mb, 2),
}

except Exception as e:
@@ -283,7 +308,7 @@ async def store_data_async(
self.executor, lambda: self.store_local_data(timestamp, miner_data, predictions, hotkey, metadata)
)

- def cleanup_local_storage(self, days_to_keep: int = 7):
+ def cleanup_local_storage(self, days_to_keep: int = 2):
"""Clean up old local storage directories"""
try:
dirs = sorted([d for d in self.local_storage.iterdir() if d.is_dir()])
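For context, a minimal standalone sketch of the size check introduced above, assuming only pandas and the MAX_FILE_SIZE_MB environment variable (default 10 MB) shown in the diff; the sample DataFrame and helper name are made up for illustration:

import os
import pandas as pd

def check_data_size(df: pd.DataFrame, default_limit_mb: str = "10") -> tuple[bool, float]:
    # Measure the in-memory footprint of the frame, including object columns.
    size_mb = df.memory_usage(deep=True).sum() / (1024 * 1024)
    max_size_mb = float(os.getenv("MAX_FILE_SIZE_MB", default_limit_mb))
    return size_mb <= max_size_mb, size_mb

df = pd.DataFrame({"close": [512.3, 514.1, 513.8], "ticker": ["SPY"] * 3})
ok, size_mb = check_data_size(df)
print(f"within limit: {ok}, size: {size_mb:.4f} MB")  # small frames pass easily

Validating the in-memory size before writing means an oversized miner payload is rejected with an error dict instead of ever reaching the local Parquet store.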
2 changes: 1 addition & 1 deletion snp_oracle/predictionnet/utils/miner_hf.py
@@ -131,7 +131,7 @@ def upload_data(self, repo_id, data: pd.DataFrame, hotkey=None, encryption_key=None

# Create unique filename using timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- data_filename = f"data_{timestamp}.parquet.enc"
+ data_filename = "data.parquet.enc"
hotkey_path = f"{hotkey}/data"
data_full_path = f"{hotkey_path}/{data_filename}"

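The filename change above means repeated miner uploads now target one fixed object per hotkey instead of accumulating timestamped files. A rough sketch of the resulting behaviour, assuming the upload goes through huggingface_hub's HfApi.upload_file; the repo, hotkey, and local file names are placeholders, not the repository's exact call:

from huggingface_hub import HfApi

api = HfApi()
hotkey = "5ExampleHotkey"                           # placeholder hotkey
data_full_path = f"{hotkey}/data/data.parquet.enc"  # static path from this PR

# Re-uploading to the same path_in_repo replaces the file in the repo,
# so each hotkey keeps a single encrypted parquet rather than one per timestamp.
api.upload_file(
    path_or_fileobj="data.parquet.enc",             # locally encrypted parquet (placeholder)
    path_in_repo=data_full_path,
    repo_id="example-org/example-dataset",          # placeholder repo
    repo_type="dataset",
)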
2 changes: 1 addition & 1 deletion snp_oracle/predictionnet/validator/forward.py
@@ -76,7 +76,7 @@ async def handle_market_close(self, dataset_manager: DatasetManager) -> None:
"""Handle data management operations when market is closed."""
try:
# Clean up old data
- dataset_manager.cleanup_local_storage(days_to_keep=7)
+ dataset_manager.cleanup_local_storage(days_to_keep=2)

# Upload today's data
success, result = dataset_manager.batch_upload_daily_data()
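For reference, a minimal sketch of what the tighter retention window implies when the market-close handler runs cleanup; the directory layout, helper, and storage root below are assumptions for illustration, not the repository's exact implementation:

import shutil
from pathlib import Path

def cleanup_local_storage(local_storage: Path, days_to_keep: int = 2) -> None:
    # Keep only the newest `days_to_keep` day-directories and delete the rest,
    # matching the shorter retention window the validator now passes in.
    dirs = sorted(d for d in local_storage.iterdir() if d.is_dir())
    for old_dir in dirs[:-days_to_keep]:
        shutil.rmtree(old_dir, ignore_errors=True)

cleanup_local_storage(Path("local_data"), days_to_keep=2)  # hypothetical storage root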