diff --git a/CHANGELOG.md b/CHANGELOG.md index 34a6392..986ed24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ - Now, the `array` class reports voxel size as a `float[3]` and `voxel_units` as a string (e.g. "nanometers") to match the input arguments to the `array` constructor (#83) - **Parallelism** - Fixes parallelism defaulting to n=1 (#70) + - This release adds support for parallel data uploads through `BossRemote#create_cutout`. - **CloudVolume** - Removes cloudvolume core dependency, and makes it an optional extra-install (#68) - **Fixes and Improvements** diff --git a/intern/remote/cv/remote.py b/intern/remote/cv/remote.py index 3c5471e..d509670 100644 --- a/intern/remote/cv/remote.py +++ b/intern/remote/cv/remote.py @@ -124,7 +124,7 @@ def create_cutout(self, resource, res, x_range, y_range, z_range, data): vol (CloudVolume) : Existing cloudvolume instance x_range (list) : x range within the 3D space y_range (list) : y range within the 3D space - z_range (list) : z range witinn the 3D space + z_range (list) : z range within the 3D space Retruns: message (str) : Uploading Data... message """ diff --git a/intern/remote/remote.py b/intern/remote/remote.py index fea125f..ced5e2d 100644 --- a/intern/remote/remote.py +++ b/intern/remote/remote.py @@ -175,10 +175,10 @@ def get_cutout(self, resource, resolution, x_range, y_range, z_range, time_range return self._volume.get_cutout( resource, resolution, x_range, y_range, z_range, time_range, - id_list, parallel = parallel, **kwargs + id_list, parallel=parallel, **kwargs ) - def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, time_range=None): + def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, time_range=None, parallel=True): """Upload a cutout to the volume service. Args: @@ -189,6 +189,7 @@ def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, t z_range (list[int]): z range such as [10, 20] which means z>=10 and z<20. data (object): Type depends on implementation. time_range (optional [list[int]]): time range such as [30, 40] which means t>=30 and t<40. + parallel (Union[bool, int]: True): Parallel upload count, or True/False to use/avoid parallelism. Returns: (): Return type depends on volume service's implementation. @@ -200,7 +201,7 @@ def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, t if not resource.valid_volume(): raise RuntimeError('Resource incompatible with the volume service.') return self._volume.create_cutout( - resource, resolution, x_range, y_range, z_range, data, time_range) + resource, resolution, x_range, y_range, z_range, data, time_range, parallel=parallel) def reserve_ids(self, resource, num_ids): """Reserve a block of unique, sequential ids for annotations. diff --git a/intern/service/boss/v1/volume.py b/intern/service/boss/v1/volume.py index 2dcb81d..f3560c6 100644 --- a/intern/service/boss/v1/volume.py +++ b/intern/service/boss/v1/volume.py @@ -49,7 +49,7 @@ def get_bit_width(self, resource): def create_cutout( self, resource, resolution, x_range, y_range, z_range, time_range, numpyVolume, - url_prefix, auth, session, send_opts): + url_prefix, auth, session, send_opts, parallel=True): """Upload a cutout to the Boss data store. Args: @@ -64,6 +64,7 @@ def create_cutout( auth (string): Token to send in the request header. session (requests.Session): HTTP session to use for request. send_opts (dictionary): Additional arguments to pass to session.send(). + parallel (Union[bool, int]: True): Parallel upload count, or True/False to use/avoid parallelism. """ if np.sum(numpyVolume) == 0: return @@ -98,8 +99,10 @@ def create_cutout( block_size=(1024, 1024, 32) ) - for b in blocks: - _data = np.ascontiguousarray( + def _create_new_contiguous_array(b, z_range, y_range, x_range): + # Create a new C ordered numpy array. Used below to construct + # each block. + return np.ascontiguousarray( numpyVolume[ b[2][0] - z_range[0]: b[2][1] - z_range[0], b[1][0] - y_range[0]: b[1][1] - y_range[0], @@ -107,11 +110,25 @@ def create_cutout( ], dtype=numpyVolume.dtype ) - self.create_cutout( - resource, resolution, b[0], b[1], b[2], - time_range, _data, url_prefix, auth, session, send_opts - ) - return + + if parallel: + pool = multiprocessing.Pool(processes=parallel if isinstance(parallel, int) and parallel > 0 else multiprocessing.cpu_count()) + pool.starmap(self.create_cutout, [ + ( + resource, resolution, b[0], b[1], b[2], + time_range, + _create_new_contiguous_array(b, z_range, y_range, x_range), + url_prefix, auth, session, send_opts, + ) + for b in blocks]) + else: + for b in blocks: + _data = _create_new_contiguous_array(b, z_range, y_range, x_range) + self.create_cutout( + resource, resolution, b[0], b[1], b[2], + time_range, _data, url_prefix, auth, session, send_opts + ) + return compressed = blosc.compress( numpyVolume, typesize=self.get_bit_width(resource) diff --git a/intern/service/boss/volume.py b/intern/service/boss/volume.py index 2e4af3c..63431af 100644 --- a/intern/service/boss/volume.py +++ b/intern/service/boss/volume.py @@ -62,7 +62,7 @@ def __init__(self, base_url, version): @check_channel def create_cutout( - self, resource, resolution, x_range, y_range, z_range, numpyVolume, time_range=None): + self, resource, resolution, x_range, y_range, z_range, numpyVolume, time_range=None, **kwargs): """Upload a cutout to the volume service. Args: @@ -78,7 +78,7 @@ def create_cutout( return self.service.create_cutout( resource, resolution, x_range, y_range, z_range, time_range, numpyVolume, - self.url_prefix, self.auth, self.session, self.session_send_opts) + self.url_prefix, self.auth, self.session, self.session_send_opts, **kwargs) @check_channel def create_cutout_to_black(