Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parallelism to BossRemote#create_cutout #64

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
- Now, the `array` class reports voxel size as a `float[3]` and `voxel_units` as a string (e.g. "nanometers") to match the input arguments to the `array` constructor (#83)
- **Parallelism**
- Fixes parallelism defaulting to n=1 (#70)
- This release adds support for parallel data uploads through `BossRemote#create_cutout`.
- **CloudVolume**
- Removes cloudvolume core dependency, and makes it an optional extra-install (#68)
- **Fixes and Improvements**
Expand Down
2 changes: 1 addition & 1 deletion intern/remote/cv/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def create_cutout(self, resource, res, x_range, y_range, z_range, data):
vol (CloudVolume) : Existing cloudvolume instance
x_range (list) : x range within the 3D space
y_range (list) : y range within the 3D space
z_range (list) : z range witinn the 3D space
z_range (list) : z range within the 3D space
Returns:
message (str) : Uploading Data... message
"""
Expand Down
7 changes: 4 additions & 3 deletions intern/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,10 @@ def get_cutout(self, resource, resolution, x_range, y_range, z_range, time_range
return self._volume.get_cutout(
resource, resolution,
x_range, y_range, z_range, time_range,
id_list, parallel = parallel, **kwargs
id_list, parallel=parallel, **kwargs
)

def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, time_range=None):
def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, time_range=None, parallel=True):
"""Upload a cutout to the volume service.

Args:
Expand All @@ -189,6 +189,7 @@ def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, t
z_range (list[int]): z range such as [10, 20] which means z>=10 and z<20.
data (object): Type depends on implementation.
time_range (optional [list[int]]): time range such as [30, 40] which means t>=30 and t<40.
parallel (Union[bool, int]): Number of parallel uploads to run, or True/False to enable/disable parallelism. Defaults to True.

Returns:
(): Return type depends on volume service's implementation.
Expand All @@ -200,7 +201,7 @@ def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, t
if not resource.valid_volume():
raise RuntimeError('Resource incompatible with the volume service.')
return self._volume.create_cutout(
resource, resolution, x_range, y_range, z_range, data, time_range)
resource, resolution, x_range, y_range, z_range, data, time_range, parallel=parallel)

def reserve_ids(self, resource, num_ids):
"""Reserve a block of unique, sequential ids for annotations.
Expand Down
33 changes: 25 additions & 8 deletions intern/service/boss/v1/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def get_bit_width(self, resource):

def create_cutout(
self, resource, resolution, x_range, y_range, z_range, time_range, numpyVolume,
url_prefix, auth, session, send_opts):
url_prefix, auth, session, send_opts, parallel=True):
"""Upload a cutout to the Boss data store.

Args:
Expand All @@ -64,6 +64,7 @@ def create_cutout(
auth (string): Token to send in the request header.
session (requests.Session): HTTP session to use for request.
send_opts (dictionary): Additional arguments to pass to session.send().
parallel (Union[bool, int]): Number of parallel uploads to run, or True/False to enable/disable parallelism. Defaults to True.
"""
if np.sum(numpyVolume) == 0:
return
Expand Down Expand Up @@ -98,20 +99,36 @@ def create_cutout(
block_size=(1024, 1024, 32)
)

for b in blocks:
_data = np.ascontiguousarray(
def _create_new_contiguous_array(b, z_range, y_range, x_range):
# Create a new C ordered numpy array. Used below to construct
# each block.
return np.ascontiguousarray(
numpyVolume[
b[2][0] - z_range[0]: b[2][1] - z_range[0],
b[1][0] - y_range[0]: b[1][1] - y_range[0],
b[0][0] - x_range[0]: b[0][1] - x_range[0]
],
dtype=numpyVolume.dtype
)
self.create_cutout(
resource, resolution, b[0], b[1], b[2],
time_range, _data, url_prefix, auth, session, send_opts
)
return

if parallel:
pool = multiprocessing.Pool(processes=parallel if isinstance(parallel, int) and parallel > 0 else multiprocessing.cpu_count())
pool.starmap(self.create_cutout, [
(
resource, resolution, b[0], b[1], b[2],
time_range,
_create_new_contiguous_array(b, z_range, y_range, x_range),
url_prefix, auth, session, send_opts,
)
for b in blocks])
else:
for b in blocks:
_data = _create_new_contiguous_array(b, z_range, y_range, x_range)
self.create_cutout(
resource, resolution, b[0], b[1], b[2],
time_range, _data, url_prefix, auth, session, send_opts
)
return

compressed = blosc.compress(
numpyVolume, typesize=self.get_bit_width(resource)
Expand Down
4 changes: 2 additions & 2 deletions intern/service/boss/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self, base_url, version):

@check_channel
def create_cutout(
self, resource, resolution, x_range, y_range, z_range, numpyVolume, time_range=None):
self, resource, resolution, x_range, y_range, z_range, numpyVolume, time_range=None, **kwargs):
"""Upload a cutout to the volume service.

Args:
Expand All @@ -78,7 +78,7 @@ def create_cutout(

return self.service.create_cutout(
resource, resolution, x_range, y_range, z_range, time_range, numpyVolume,
self.url_prefix, self.auth, self.session, self.session_send_opts)
self.url_prefix, self.auth, self.session, self.session_send_opts, **kwargs)

@check_channel
def create_cutout_to_black(
Expand Down