Merge pull request #97 from initze/08_multisource_run
08 multisource run
initze authored Apr 15, 2024
2 parents a486f5f + 0227ffd commit 3bc16be
Showing 3 changed files with 228 additions and 0 deletions.
2 changes: 2 additions & 0 deletions lib/postprocessing.py
@@ -65,6 +65,7 @@ def get_date_from_PSfilename(name):
    return date


# TODO: create empty dataframe if no files found
def get_datasets(path, depth=0, preprocessed=False):
    dirs = listdirs2(path, depth=depth)
    df = pd.DataFrame(data=dirs, columns=['path'])
@@ -141,6 +142,7 @@ def get_processing_status(raw_data_dir, processing_dir, inference_dir, model):
    except:
        df_raw = get_datasets(raw_data_dir, depth=0)
    # get processed
    # TODO: check here for both options - make 2 runs
    df_processed = get_datasets(processing_dir / 'tiles', depth=0, preprocessed=True)
    # calculate properties
    diff = df_raw[~df_raw['name'].isin(df_processed['name'])]
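One way to handle the first TODO above is an early return of an empty frame, so that downstream .isin() checks and joins see the expected columns instead of raising. A minimal sketch, not part of this PR; the column set beyond 'path' and 'name' is an assumption, and listdirs2 is the existing helper from lib/postprocessing.py:

from pathlib import Path
import pandas as pd

def get_datasets(path, depth=0, preprocessed=False):
    dirs = listdirs2(path, depth=depth)  # existing helper from lib.postprocessing
    if len(dirs) == 0:
        # no datasets found: return an empty frame with the columns callers rely on
        return pd.DataFrame(columns=['path', 'name'])
    df = pd.DataFrame(data=dirs, columns=['path'])
    df['name'] = df['path'].apply(lambda p: Path(p).name)  # assumed naming rule
    # ... rest of the existing implementation unchanged ...
    return df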
126 changes: 126 additions & 0 deletions process_02_inference.py
@@ -0,0 +1,126 @@

from pathlib import Path
import os
import shutil

import torch
import pandas as pd
import numpy as np
import geopandas as gpd
from joblib import delayed, Parallel
from tqdm import tqdm
import swifter  # registers the .swifter accessor used for the parallel copy below

from lib.postprocessing import *

# ### Settings

# Local code dir
CODE_DIR = Path('/isipd/projects/p_aicore_pf/initze/code/aicore_inference')
# Location of raw data
RAW_DATA_DIR = Path('/isipd/projects/p_aicore_pf/initze/data/planet/planet_data_inference_grid/tiles')
# Location of data processing
PROCESSING_DIR = Path('/isipd/projects/p_aicore_pf/initze/processing')
# Target directory for inference results
INFERENCE_DIR = Path('/isipd/projects/p_aicore_pf/initze/processed/inference')

# Path to models - RTS
MODEL_DIR = Path('/isipd/projects/p_aicore_pf/initze/models/thaw_slumps')

MODEL='RTS_v6_notcvis'
models = ['RTS_v6_notcvis', 'RTS_v6_tcvis']

#USE_GPU = [1,2,3,4]
USE_GPU = [1,2]
RUNS_PER_GPU = 5
MAX_IMAGES = None

# ### List all files with properties
df_processing_status = get_processing_status(RAW_DATA_DIR, PROCESSING_DIR, INFERENCE_DIR, MODEL)

df_final = df_processing_status

total_images = len(df_final)
preprocessed_images = df_final.preprocessed.sum()
finished_images = df_final.inference_finished.sum()
print(f'Number of images: {total_images}')
print(f'Number of preprocessed images: {preprocessed_images}')
print(f'Number of finished images: {finished_images}')
print(f'Number of images to process: {preprocessed_images - finished_images}')

# ## Preprocessing

# #### Update Arctic DEM data
print('Updating Elevation VRTs!')
dem_data_dir = Path('/isipd/projects/p_aicore_pf/initze/data/ArcticDEM')
vrt_target_dir = Path('/isipd/projects/p_aicore_pf/initze/processing/auxiliary/ArcticDEM')
#update_DEM(vrt_target_dir)
update_DEM2(dem_data_dir=dem_data_dir, vrt_target_dir=vrt_target_dir)

# #### Copy data for Preprocessing
# make better documentation

df_preprocess = df_final[~df_final.preprocessed]
print(f'Number of images to preprocess: {len(df_preprocess)}')

# Cleanup processing directories to avoid incomplete processing
input_dir_dslist = list((PROCESSING_DIR / 'input').glob('*'))
if len(input_dir_dslist) > 0:
    print(input_dir_dslist)
    for d in input_dir_dslist:
        print('Delete', d)
        shutil.rmtree(d)
else:
    print('Processing directory is ready, nothing to do!')

# Copy Data
_ = df_preprocess.swifter.apply(lambda x: copy_unprocessed_files(x, PROCESSING_DIR), axis=1)

# #### Run Preprocessing
import warnings
warnings.filterwarnings('ignore')

N_JOBS=40
print(f'Preprocessing {len(df_preprocess)} images') #fix this
if len(df_preprocess) > 0:
    pp_string = f'python setup_raw_data.py --data_dir {PROCESSING_DIR} --n_jobs {N_JOBS} --nolabel'
    os.system(pp_string)
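The shell call above could also be expressed with subprocess and an argument list, which avoids interpolating paths into a shell string and surfaces failures. A sketch, assuming setup_raw_data.py accepts exactly the flags shown above:

import subprocess

if len(df_preprocess) > 0:
    subprocess.run(
        ['python', 'setup_raw_data.py',
         '--data_dir', str(PROCESSING_DIR),
         '--n_jobs', str(N_JOBS),
         '--nolabel'],
        check=True,  # raise CalledProcessError instead of silently continuing
    )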

# ## Processing/Inference
# rerun processing status
df_processing_status2 = get_processing_status(RAW_DATA_DIR, PROCESSING_DIR, INFERENCE_DIR, MODEL)

# Filter to images whose inference has not finished yet
df_process = df_final[~df_final.inference_finished]
# update overview and filter accordingly - really necessary?
df_process_final = df_process.set_index('name').join(df_processing_status2[df_processing_status2['preprocessed']][['name']].set_index('name'), how='inner').reset_index(drop=False).iloc[:MAX_IMAGES]
# validate if images are correctly preprocessed
df_process_final['preprocessing_valid'] = (df_process_final.apply(lambda x: len(list(x['path'].glob('*'))), axis=1) >= 5)
# final filtering process to remove incorrectly preprocessed data
df_process_final = df_process_final[df_process_final['preprocessing_valid']]

print('Number of images to run inference on:', len(df_process_final))

# #### Parallel runs
# Make splits to distribute the processing
n_splits = len(USE_GPU) * RUNS_PER_GPU
df_split = np.array_split(df_process_final, n_splits)
gpu_split = USE_GPU * RUNS_PER_GPU

#for split in df_split:
# print(f'Number of images: {len(split)}')

print('Run inference!')
# ### Parallel Inference execution
_ = Parallel(n_jobs=n_splits)(delayed(run_inference)(df_split[split], model=MODEL, processing_dir=PROCESSING_DIR, inference_dir=INFERENCE_DIR, model_dir=MODEL_DIR, gpu=gpu_split[split], run=True) for split in range(n_splits))
# #### Merge output files

# read all binarized prediction vectors for the selected model
flist = list((INFERENCE_DIR / MODEL).glob('*/*pred_binarized.shp'))
print(f'Number of prediction files: {len(flist)}')
if len(df_process_final) > 0:
    # load them in parallel
    out = Parallel(n_jobs=6)(delayed(load_and_parse_vector)(f) for f in tqdm(flist))
    # merge them and save to geopackage file
    merged_gdf = gpd.pd.concat(out)
    print(INFERENCE_DIR / MODEL / f'{MODEL}_merged.gpkg')
    merged_gdf.to_file(INFERENCE_DIR / MODEL / f'{MODEL}_merged.gpkg')
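For reference, the split-to-GPU assignment in this script works by list repetition: np.array_split cuts the image table into len(USE_GPU) * RUNS_PER_GPU roughly equal chunks, and USE_GPU * RUNS_PER_GPU repeats the GPU ids so that chunk i always runs on gpu_split[i]. A standalone sketch with dummy data, no inference involved:

import numpy as np
import pandas as pd

USE_GPU = [1, 2]
RUNS_PER_GPU = 5
n_splits = len(USE_GPU) * RUNS_PER_GPU      # 10 parallel workers in total

# dummy image table standing in for df_process_final
df = pd.DataFrame({'name': [f'img_{i:02d}' for i in range(23)]})

df_split = np.array_split(df, n_splits)     # ~equal chunks, here sizes of 3 or 2
gpu_split = USE_GPU * RUNS_PER_GPU          # [1, 2, 1, 2, ...]; worker i uses gpu_split[i]

for i in range(n_splits):
    print(f'worker {i}: {len(df_split[i])} images on GPU {gpu_split[i]}')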
100 changes: 100 additions & 0 deletions process_03_ensemble.py
@@ -0,0 +1,100 @@
# # Create ensemble results from several model outputs
# ### Imports

from pathlib import Path
import pandas as pd
from joblib import delayed, Parallel
#from tqdm.notebook import tqdm
from tqdm import tqdm
from lib.postprocessing import *
import geopandas as gpd

# ### Settings
# Local code dir
CODE_DIR = Path('.')
BASE_DIR = Path('../..')
# Location of raw data
# TODO: make support for multiple sources
RAW_DATA_DIR = BASE_DIR / Path('data/planet/planet_data_inference_grid/tiles')
# Location of data processing
PROCESSING_DIR = BASE_DIR / 'processing'
# Target directory for inference results
INFERENCE_DIR = BASE_DIR / Path('processed/inference')
# Path to models - RTS
MODEL_DIR = BASE_DIR / Path('models/thaw_slumps')
# Ensemble Target
ENSEMBLE_NAME = 'RTS_v6_ensemble_v2'
MODEL_NAMES = ['RTS_v6_notcvis', 'RTS_v6_tcvis']
N_IMAGES = None # automatically run full set
N_JOBS = 15 # number of cpu jobs for ensembling
N_VECTOR_LOADERS = 6 # number of parallel vector loaders for final merge

# ### Select Data
# * create list of available files
# * filter what's available

# check if cucim is available
try:
    import cucim
    try_gpu = True
    print('Running ensembling with GPU!')
except ImportError:
    try_gpu = False
    print('Cucim import failed')

# setup all params
kwargs_ensemble = {
    'ensemblename': ENSEMBLE_NAME,
    'inference_dir': INFERENCE_DIR,
    'modelnames': MODEL_NAMES,
    'binary_threshold': [0.4, 0.45, 0.5],
    'border_size': 10,
    'minimum_mapping_unit': 32,
    'delete_binary': True,
    'try_gpu': False,  # currently default to CPU only
    'gpu': 0,
}


# Check for finalized products
df_processing_status = get_processing_status(RAW_DATA_DIR, PROCESSING_DIR, INFERENCE_DIR, model=kwargs_ensemble['ensemblename'])
df_ensemble_status = get_processing_status_ensemble(INFERENCE_DIR, model_input_names=kwargs_ensemble['modelnames'], model_ensemble_name=kwargs_ensemble['ensemblename'])
# Check which need to be processed - check for already processed and invalid files
process = df_ensemble_status[df_ensemble_status['process']]

# #### Filter by tile_ids

#process = process[process.apply(lambda x: x['name'].split('_')[1].startswith('42'), axis=1)]
print(df_processing_status.groupby('inference_finished').count())

# #### Documentation

print('Number of files to process:', process.groupby('process').count().iloc[0, 0])

# #### Run Ensemble Merging

print(f'Start running ensemble with {N_JOBS} jobs!')
print('Target ensemble name:', kwargs_ensemble['ensemblename'])
print('Source model outputs:', kwargs_ensemble['modelnames'])
_ = Parallel(n_jobs=N_JOBS)(delayed(create_ensemble_v2)(image_id=process.iloc[row]['name'], **kwargs_ensemble) for row in tqdm(range(len(process.iloc[:N_IMAGES]))))

# #### run parallelized batch

# ### Merge vectors to complete dataset


ensemblename = ENSEMBLE_NAME
# set probability levels: 'class_05' means 50%, 'class_045' means 45%. These suffixes are globbed for in the vector file names
proba_strings = ['class_05', 'class_045','class_04']

for proba_string in proba_strings:
    # read all files which follow the threshold naming defined above
    flist = list((INFERENCE_DIR / ensemblename).glob(f'*/*_{proba_string}.gpkg'))
    print(f'Found {len(flist)} files for {proba_string}')
    # load them in parallel
    out = Parallel(n_jobs=N_VECTOR_LOADERS)(delayed(load_and_parse_vector)(f) for f in tqdm(flist[:N_IMAGES]))
    # merge them and save to geopackage file
    print('Merging results')
    merged_gdf = gpd.pd.concat(out)
    merged_gdf.to_file(INFERENCE_DIR / ensemblename / f'merged_{proba_string}.gpkg')
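The class suffixes follow the convention documented in the comment above (0.5 -> 'class_05', 0.45 -> 'class_045'). A small hypothetical helper that derives them from the binary_threshold values in kwargs_ensemble, so the two settings cannot drift apart; the actual ensemble code may build the names differently:

def threshold_to_class_string(threshold: float) -> str:
    # 0.5 -> 'class_05', 0.45 -> 'class_045', 0.4 -> 'class_04'
    return 'class_' + f'{threshold:g}'.replace('.', '')

proba_strings = [threshold_to_class_string(t) for t in sorted(kwargs_ensemble['binary_threshold'], reverse=True)]
assert proba_strings == ['class_05', 'class_045', 'class_04']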
