From 5d55e712054e8bf5b0f8d0e1f3b12872a0c06edb Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Thu, 7 May 2020 15:58:19 +0200 Subject: [PATCH 01/26] Depend on donfig --- africanus/install/requirements.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/africanus/install/requirements.py b/africanus/install/requirements.py index eb8f77fc0..fedbe1ab5 100644 --- a/africanus/install/requirements.py +++ b/africanus/install/requirements.py @@ -13,7 +13,8 @@ # Basic requirements containing no C extensions. # This is necessary for building on RTD requirements = ['appdirs >= 1.4.3', - 'decorator'] + 'decorator', + 'donfig'] if not on_rtd: requirements += [ From 721add90457368549633a3bc98933617767a0b62 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Thu, 7 May 2020 17:06:30 +0200 Subject: [PATCH 02/26] Add config object --- africanus/config.py | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 africanus/config.py diff --git a/africanus/config.py b/africanus/config.py new file mode 100644 index 000000000..c011c3afd --- /dev/null +++ b/africanus/config.py @@ -0,0 +1,3 @@ +from donfig import Config + +config = Config("codex-africanus") \ No newline at end of file From 3fb8fbc2c200ba8ddd0ae0e233fc87dc12f2e6ce Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Thu, 7 May 2020 17:06:47 +0200 Subject: [PATCH 03/26] Simplify numba feed rotation --- africanus/rime/feeds.py | 101 ++++++++++++++++------------------------ 1 file changed, 41 insertions(+), 60 deletions(-) diff --git a/africanus/rime/feeds.py b/africanus/rime/feeds.py index 8002071f5..88b02a5e0 100644 --- a/africanus/rime/feeds.py +++ b/africanus/rime/feeds.py @@ -6,70 +6,51 @@ import numpy as np +from africanus.config import config from africanus.util.docs import DocstringTemplate -from africanus.util.numba import jit - - -@jit(nopython=True, nogil=True, cache=True) -def _nb_feed_rotation(parallactic_angles, feed_type, feed_rotation): - shape = parallactic_angles.shape - parangles = parallactic_angles.flat - - # Linear feeds - if feed_type == 0: - for i, pa in enumerate(parangles): - pa_cos = np.cos(pa) - pa_sin = np.sin(pa) - - feed_rotation.real[i, 0, 0] = pa_cos - feed_rotation.imag[i, 0, 0] = 0.0 - feed_rotation.real[i, 0, 1] = pa_sin - feed_rotation.imag[i, 0, 1] = 0.0 - feed_rotation.real[i, 1, 0] = -pa_sin - feed_rotation.imag[i, 1, 0] = 0.0 - feed_rotation.real[i, 1, 1] = pa_cos - feed_rotation.imag[i, 1, 1] = 0.0 - - # Circular feeds - elif feed_type == 1: - for i, pa in enumerate(parangles): - pa_cos = np.cos(pa) - pa_sin = np.sin(pa) - - feed_rotation.real[i, 0, 0] = pa_cos - feed_rotation.imag[i, 0, 0] = -pa_sin - feed_rotation[i, 0, 1] = 0.0 + 0.0*1j - feed_rotation[i, 1, 0] = 0.0 + 0.0*1j - feed_rotation.real[i, 1, 1] = pa_cos - feed_rotation.imag[i, 1, 1] = pa_sin - else: - raise ValueError("Invalid feed_type") - - return feed_rotation.reshape(shape + (2, 2)) +from africanus.util.numba import generated_jit +@generated_jit(nopython=True, nogil=True, cache=True) def feed_rotation(parallactic_angles, feed_type='linear'): - if feed_type == 'linear': - poltype = 0 - elif feed_type == 'circular': - poltype = 1 - else: - raise ValueError("Invalid feed_type '%s'" % feed_type) - - if parallactic_angles.dtype == np.float32: - dtype = np.complex64 - elif parallactic_angles.dtype == np.float64: - dtype = np.complex128 - else: - raise ValueError("parallactic_angles has " - "none-floating point type %s" - % parallactic_angles.dtype) - - # Create result array with flattened parangles - shape = (reduce(mul, parallactic_angles.shape),) + (2, 2) - result = np.empty(shape, dtype=dtype) - - return _nb_feed_rotation(parallactic_angles, poltype, result) + pa_np_dtype = np.dtype(parallactic_angles.dtype.name) + dtype = np.result_type(pa_np_dtype, np.complex64) + + def impl(parallactic_angles, feed_type='linear'): + parangles = parallactic_angles.flat + + shape = (reduce(lambda x, y: x*y, parallactic_angles.shape, 1),) + result = np.zeros(shape + (2, 2), dtype=dtype) + + # Linear feeds + if feed_type == 'linear': + for i in range(shape[0]): + pa = parangles[i] + pa_cos = np.cos(pa) + pa_sin = np.sin(pa) + + result[i, 0, 0] = pa_cos + 0j + result[i, 0, 1] = pa_sin + 0j + result[i, 1, 0] = -pa_sin + 0j + result[i, 1, 1] = pa_cos + 0j + + # Circular feeds + elif feed_type == 'circular': + for i in range(shape[0]): + pa = parangles[i] + pa_cos = np.cos(pa) + pa_sin = np.sin(pa) + + result[i, 0, 0] = pa_cos - pa_sin*1j + result[i, 0, 1] = 0.0 + result[i, 1, 0] = 0.0 + result[i, 1, 1] = pa_cos + pa_sin*1j + else: + raise ValueError("feed_type not in ('linear', 'circular')") + + return result.reshape(parallactic_angles.shape + (2, 2)) + + return impl FEED_ROTATION_DOCS = DocstringTemplate(r""" From 240b447f00c9f26619c7ea43384ad166bae26ac9 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Thu, 7 May 2020 17:45:32 +0200 Subject: [PATCH 04/26] parallel numba feed rotation setup --- africanus/config.py | 2 +- africanus/rime/feeds.py | 18 +++++++++++++++--- africanus/util/numba.py | 3 ++- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/africanus/config.py b/africanus/config.py index c011c3afd..46aa8e79b 100644 --- a/africanus/config.py +++ b/africanus/config.py @@ -1,3 +1,3 @@ from donfig import Config -config = Config("codex-africanus") \ No newline at end of file +config = Config("africanus") \ No newline at end of file diff --git a/africanus/rime/feeds.py b/africanus/rime/feeds.py index 88b02a5e0..cd4163d27 100644 --- a/africanus/rime/feeds.py +++ b/africanus/rime/feeds.py @@ -10,13 +10,22 @@ from africanus.util.docs import DocstringTemplate from africanus.util.numba import generated_jit +cfg = config.get("rime.feed_rotation.parallel", False) +parallel = cfg is not False +cfg = {} if cfg is True else cfg -@generated_jit(nopython=True, nogil=True, cache=True) +@generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) def feed_rotation(parallactic_angles, feed_type='linear'): pa_np_dtype = np.dtype(parallactic_angles.dtype.name) dtype = np.result_type(pa_np_dtype, np.complex64) + from numba import config as nbcfg, prange, set_num_threads + nthreads = cfg.get("threads", nbcfg.NUMBA_NUM_THREADS) if parallel else 1 + def impl(parallactic_angles, feed_type='linear'): + if parallel: + set_num_threads(nthreads) + parangles = parallactic_angles.flat shape = (reduce(lambda x, y: x*y, parallactic_angles.shape, 1),) @@ -24,7 +33,7 @@ def impl(parallactic_angles, feed_type='linear'): # Linear feeds if feed_type == 'linear': - for i in range(shape[0]): + for i in prange(shape[0]): pa = parangles[i] pa_cos = np.cos(pa) pa_sin = np.sin(pa) @@ -36,7 +45,7 @@ def impl(parallactic_angles, feed_type='linear'): # Circular feeds elif feed_type == 'circular': - for i in range(shape[0]): + for i in prange(shape[0]): pa = parangles[i] pa_cos = np.cos(pa) pa_sin = np.sin(pa) @@ -48,6 +57,9 @@ def impl(parallactic_angles, feed_type='linear'): else: raise ValueError("feed_type not in ('linear', 'circular')") + if parallel: + set_num_threads(max_threads) + return result.reshape(parallactic_angles.shape + (2, 2)) return impl diff --git a/africanus/util/numba.py b/africanus/util/numba.py index 02e1a27f5..88e2efd9d 100644 --- a/africanus/util/numba.py +++ b/africanus/util/numba.py @@ -22,9 +22,10 @@ def wrapper(*args, **kwargs): jit = _fake_decorator njit = _fake_decorator stencil = _fake_decorator + prange = _fake_decorator else: - from numba import cfunc, jit, njit, generated_jit, stencil # noqa + from numba import cfunc, jit, njit, generated_jit, stencil, prange # noqa def is_numba_type_none(arg): From 4116056eb857c4dc20769302cebde8349009c92e Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Thu, 7 May 2020 18:21:13 +0200 Subject: [PATCH 05/26] Rejigger the parallel feed rotation --- africanus/rime/feeds.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/africanus/rime/feeds.py b/africanus/rime/feeds.py index cd4163d27..bbc03e6aa 100644 --- a/africanus/rime/feeds.py +++ b/africanus/rime/feeds.py @@ -14,26 +14,31 @@ parallel = cfg is not False cfg = {} if cfg is True else cfg + @generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) def feed_rotation(parallactic_angles, feed_type='linear'): pa_np_dtype = np.dtype(parallactic_angles.dtype.name) dtype = np.result_type(pa_np_dtype, np.complex64) - from numba import config as nbcfg, prange, set_num_threads - nthreads = cfg.get("threads", nbcfg.NUMBA_NUM_THREADS) if parallel else 1 + import numba + nthreads = (cfg.get("threads", numba.config.NUMBA_NUM_THREADS) + if parallel else 1) def impl(parallactic_angles, feed_type='linear'): if parallel: - set_num_threads(nthreads) + numba.set_num_threads(nthreads) + + elements = numba.int64(1) - parangles = parallactic_angles.flat + for d in parallactic_angles.shape: + elements *= d - shape = (reduce(lambda x, y: x*y, parallactic_angles.shape, 1),) - result = np.zeros(shape + (2, 2), dtype=dtype) + parangles = parallactic_angles.ravel() + result = np.zeros((elements, 2, 2), dtype=dtype) # Linear feeds if feed_type == 'linear': - for i in prange(shape[0]): + for i in numba.prange(elements): pa = parangles[i] pa_cos = np.cos(pa) pa_sin = np.sin(pa) @@ -45,7 +50,7 @@ def impl(parallactic_angles, feed_type='linear'): # Circular feeds elif feed_type == 'circular': - for i in prange(shape[0]): + for i in numba.prange(elements): pa = parangles[i] pa_cos = np.cos(pa) pa_sin = np.sin(pa) @@ -58,7 +63,7 @@ def impl(parallactic_angles, feed_type='linear'): raise ValueError("feed_type not in ('linear', 'circular')") if parallel: - set_num_threads(max_threads) + numba.set_num_threads(numba.config.NUMBA_NUM_THREADS) return result.reshape(parallactic_angles.shape + (2, 2)) From 8651e89a3970fe890730e1996712836116e1f35a Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Thu, 7 May 2020 18:22:03 +0200 Subject: [PATCH 06/26] flake8 --- africanus/config.py | 2 +- africanus/rime/feeds.py | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/africanus/config.py b/africanus/config.py index 46aa8e79b..a72963822 100644 --- a/africanus/config.py +++ b/africanus/config.py @@ -1,3 +1,3 @@ from donfig import Config -config = Config("africanus") \ No newline at end of file +config = Config("africanus") diff --git a/africanus/rime/feeds.py b/africanus/rime/feeds.py index bbc03e6aa..edd349cec 100644 --- a/africanus/rime/feeds.py +++ b/africanus/rime/feeds.py @@ -1,9 +1,5 @@ # -*- coding: utf-8 -*- - -from functools import reduce -from operator import mul - import numpy as np from africanus.config import config From f00e53d1208fb3ae9435d994f9038c8d388f9c88 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Thu, 7 May 2020 18:24:08 +0200 Subject: [PATCH 07/26] Simplify --- africanus/rime/feeds.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/africanus/rime/feeds.py b/africanus/rime/feeds.py index edd349cec..a89ff292c 100644 --- a/africanus/rime/feeds.py +++ b/africanus/rime/feeds.py @@ -24,17 +24,12 @@ def impl(parallactic_angles, feed_type='linear'): if parallel: numba.set_num_threads(nthreads) - elements = numba.int64(1) - - for d in parallactic_angles.shape: - elements *= d - parangles = parallactic_angles.ravel() - result = np.zeros((elements, 2, 2), dtype=dtype) + result = np.zeros(parangles.shape + (2, 2), dtype=dtype) # Linear feeds if feed_type == 'linear': - for i in numba.prange(elements): + for i in numba.prange(parangles.shape[0]): pa = parangles[i] pa_cos = np.cos(pa) pa_sin = np.sin(pa) @@ -46,7 +41,7 @@ def impl(parallactic_angles, feed_type='linear'): # Circular feeds elif feed_type == 'circular': - for i in numba.prange(elements): + for i in numba.prange(parangles.shape[0]): pa = parangles[i] pa_cos = np.cos(pa) pa_sin = np.sin(pa) From 027cd679b5a80431bb27e02d177ceb9fefc3206b Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 8 May 2020 11:05:22 +0200 Subject: [PATCH 08/26] Inherit from Config to implement custom getter --- africanus/config.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/africanus/config.py b/africanus/config.py index a72963822..b28ef7010 100644 --- a/africanus/config.py +++ b/africanus/config.py @@ -1,3 +1,23 @@ from donfig import Config -config = Config("africanus") + +class AfricanusConfig(Config): + def numba_parallel(self, key): + value = self.get(key, False) + + if value is False: + return {'parallel': False} + elif value is True: + return {'parallel': True} + elif isinstance(value, dict): + value['parallel'] = True + return value + else: + raise TypeError("key %s (%s) is not a bool or a dict", + key, value) + + + + + +config = AfricanusConfig("africanus") From 60bda8997577b843c5f72268ecc7e54dd8efeaba Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 8 May 2020 11:17:14 +0200 Subject: [PATCH 09/26] updates and tests --- africanus/rime/feeds.py | 27 ++++++++++++------- africanus/rime/tests/test_rime.py | 44 ++++++++++++++++++++----------- 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/africanus/rime/feeds.py b/africanus/rime/feeds.py index a89ff292c..66ad3b141 100644 --- a/africanus/rime/feeds.py +++ b/africanus/rime/feeds.py @@ -6,10 +6,8 @@ from africanus.util.docs import DocstringTemplate from africanus.util.numba import generated_jit -cfg = config.get("rime.feed_rotation.parallel", False) -parallel = cfg is not False -cfg = {} if cfg is True else cfg - +cfg = config.numba_parallel("rime.feed_rotation.parallel") +parallel = cfg.get('parallel', False) @generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) def feed_rotation(parallactic_angles, feed_type='linear'): @@ -17,19 +15,28 @@ def feed_rotation(parallactic_angles, feed_type='linear'): dtype = np.result_type(pa_np_dtype, np.complex64) import numba - nthreads = (cfg.get("threads", numba.config.NUMBA_NUM_THREADS) - if parallel else 1) + nthreads = (1 if not parallel else + cfg.get("threads", numba.config.NUMBA_NUM_THREADS)) def impl(parallactic_angles, feed_type='linear'): if parallel: + prev_nthreads = numba.get_num_threads() numba.set_num_threads(nthreads) + # Can't use parangles.shape lower down + # until this is resolved + # https://github.com/numba/numba/issues/5439 + elements = 1 + + for d in parallactic_angles.shape: + elements *= d + parangles = parallactic_angles.ravel() - result = np.zeros(parangles.shape + (2, 2), dtype=dtype) + result = np.zeros((elements, 2, 2), dtype=dtype) # Linear feeds if feed_type == 'linear': - for i in numba.prange(parangles.shape[0]): + for i in numba.prange(elements): pa = parangles[i] pa_cos = np.cos(pa) pa_sin = np.sin(pa) @@ -41,7 +48,7 @@ def impl(parallactic_angles, feed_type='linear'): # Circular feeds elif feed_type == 'circular': - for i in numba.prange(parangles.shape[0]): + for i in numba.prange(elements): pa = parangles[i] pa_cos = np.cos(pa) pa_sin = np.sin(pa) @@ -54,7 +61,7 @@ def impl(parallactic_angles, feed_type='linear'): raise ValueError("feed_type not in ('linear', 'circular')") if parallel: - numba.set_num_threads(numba.config.NUMBA_NUM_THREADS) + numba.set_num_threads(prev_nthreads) return result.reshape(parallactic_angles.shape + (2, 2)) diff --git a/africanus/rime/tests/test_rime.py b/africanus/rime/tests/test_rime.py index eac29d4f8..099036181 100644 --- a/africanus/rime/tests/test_rime.py +++ b/africanus/rime/tests/test_rime.py @@ -3,6 +3,7 @@ """Tests for `codex-africanus` package.""" +import importlib import numpy as np import pytest @@ -50,23 +51,34 @@ def test_phase_delay(convention, sign): assert np.all(np.exp(1j*phase) == complex_phase[lm_i, uvw_i, freq_i]) -def test_feed_rotation(): - import numpy as np - from africanus.rime import feed_rotation +@pytest.mark.parametrize("parallel", [True, False]) +def test_feed_rotation(parallel): + import africanus + from africanus.config import config - parangles = np.random.random((10, 5)) - pa_sin = np.sin(parangles) - pa_cos = np.cos(parangles) - - fr = feed_rotation(parangles, feed_type='linear') - np_expr = np.stack([pa_cos, pa_sin, -pa_sin, pa_cos], axis=2) - assert np.allclose(fr, np_expr.reshape(10, 5, 2, 2)) - - fr = feed_rotation(parangles, feed_type='circular') - zeros = np.zeros_like(pa_sin) - np_expr = np.stack([pa_cos - 1j*pa_sin, zeros, - zeros, pa_cos + 1j*pa_sin], axis=2) - assert np.allclose(fr, np_expr.reshape(10, 5, 2, 2)) + with config.set({"rime.feed_rotation.parallel": parallel}): + importlib.reload(africanus.rime.feeds) + from africanus.rime.feeds import feed_rotation + + if parallel: + assert 'parallel' in feed_rotation.targetoptions + assert feed_rotation.targetoptions['parallel'] is True + else: + assert parallel not in feed_rotation.targetoptions + + parangles = np.random.random((10, 5)) + pa_sin = np.sin(parangles) + pa_cos = np.cos(parangles) + + fr = feed_rotation(parangles, feed_type='linear') + np_expr = np.stack([pa_cos, pa_sin, -pa_sin, pa_cos], axis=2) + assert np.allclose(fr, np_expr.reshape(10, 5, 2, 2)) + + fr = feed_rotation(parangles, feed_type='circular') + zeros = np.zeros_like(pa_sin) + np_expr = np.stack([pa_cos - 1j*pa_sin, zeros, + zeros, pa_cos + 1j*pa_sin], axis=2) + assert np.allclose(fr, np_expr.reshape(10, 5, 2, 2)) @pytest.mark.parametrize("convention, sign", [ From 9e9c323efbda4fff414199e6fd2c9920805dcd25 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 8 May 2020 11:17:55 +0200 Subject: [PATCH 10/26] flake8 --- africanus/config.py | 3 --- africanus/rime/feeds.py | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/africanus/config.py b/africanus/config.py index b28ef7010..729b20275 100644 --- a/africanus/config.py +++ b/africanus/config.py @@ -17,7 +17,4 @@ def numba_parallel(self, key): key, value) - - - config = AfricanusConfig("africanus") diff --git a/africanus/rime/feeds.py b/africanus/rime/feeds.py index 66ad3b141..8a949c011 100644 --- a/africanus/rime/feeds.py +++ b/africanus/rime/feeds.py @@ -9,6 +9,7 @@ cfg = config.numba_parallel("rime.feed_rotation.parallel") parallel = cfg.get('parallel', False) + @generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) def feed_rotation(parallactic_angles, feed_type='linear'): pa_np_dtype = np.dtype(parallactic_angles.dtype.name) From 2020e898a8907f7f9b4eeb7c4a4df768208fe20b Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 12 May 2020 17:36:42 +0200 Subject: [PATCH 11/26] WIP --- africanus/rime/feeds.py | 16 ++++++---------- africanus/rime/predict.py | 16 +++++++++++++++- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/africanus/rime/feeds.py b/africanus/rime/feeds.py index 8a949c011..d0082a79e 100644 --- a/africanus/rime/feeds.py +++ b/africanus/rime/feeds.py @@ -24,20 +24,16 @@ def impl(parallactic_angles, feed_type='linear'): prev_nthreads = numba.get_num_threads() numba.set_num_threads(nthreads) - # Can't use parangles.shape lower down - # until this is resolved + parangles = parallactic_angles.ravel() + # Can't prepend shape tuple till the following is fixed # https://github.com/numba/numba/issues/5439 - elements = 1 - - for d in parallactic_angles.shape: - elements *= d + # We know parangles.ndim == 1 though + result = np.zeros((parangles.shape[0], 2, 2), dtype=dtype) - parangles = parallactic_angles.ravel() - result = np.zeros((elements, 2, 2), dtype=dtype) # Linear feeds if feed_type == 'linear': - for i in numba.prange(elements): + for i in numba.prange(parangles.shape[0]): pa = parangles[i] pa_cos = np.cos(pa) pa_sin = np.sin(pa) @@ -49,7 +45,7 @@ def impl(parallactic_angles, feed_type='linear'): # Circular feeds elif feed_type == 'circular': - for i in numba.prange(elements): + for i in numba.prange(parangles.shape[0]): pa = parangles[i] pa_cos = np.cos(pa) pa_sin = np.sin(pa) diff --git a/africanus/rime/predict.py b/africanus/rime/predict.py index c0f4fc2ec..929b3a555 100644 --- a/africanus/rime/predict.py +++ b/africanus/rime/predict.py @@ -3,9 +3,14 @@ import numpy as np +from africanus.config import config from africanus.util.docs import DocstringTemplate from africanus.util.numba import is_numba_type_none, generated_jit, njit +cfg = config.get("rime.predict_vis.parallel", False) +parallel = cfg is not False +cfg = {} if cfg is True else cfg + JONES_NOT_PRESENT = 0 JONES_1_OR_2 = 1 @@ -426,7 +431,7 @@ def predict_checks(time_index, antenna1, antenna2, have_dies1, have_bvis, have_dies2) -@generated_jit(nopython=True, nogil=True, cache=True) +@generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) def predict_vis(time_index, antenna1, antenna2, dde1_jones=None, source_coh=None, dde2_jones=None, die1_jones=None, base_vis=None, die2_jones=None): @@ -474,10 +479,16 @@ def predict_vis(time_index, antenna1, antenna2, apply_dies_fn = apply_dies_factory(have_dies, have_bvis, jones_type) add_coh_fn = add_coh_factory(have_bvis) + from numba import config as nbcfg, prange, set_num_threads + nthreads = cfg.get("threads", nbcfg.NUMBA_NUM_THREADS) if parallel else 1 + def _predict_vis_fn(time_index, antenna1, antenna2, dde1_jones=None, source_coh=None, dde2_jones=None, die1_jones=None, base_vis=None, die2_jones=None): + if parallel: + set_num_threads(nthreads) + # Get the output shape out = out_fn(time_index, dde1_jones, source_coh, dde2_jones, die1_jones, base_vis, die2_jones) @@ -498,6 +509,9 @@ def _predict_vis_fn(time_index, antenna1, antenna2, die1_jones, die2_jones, tmin, out) + if parallel: + set_num_threads(nbcfg.NUMBA_NUM_THREADS) + return out return _predict_vis_fn From 83c9c8548ea3885c2c5506c9d995e870ec5f403c Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 12 May 2020 17:59:06 +0200 Subject: [PATCH 12/26] Fix feed rotation test case --- africanus/rime/tests/test_rime.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/africanus/rime/tests/test_rime.py b/africanus/rime/tests/test_rime.py index 099036181..59961b7c4 100644 --- a/africanus/rime/tests/test_rime.py +++ b/africanus/rime/tests/test_rime.py @@ -60,11 +60,7 @@ def test_feed_rotation(parallel): importlib.reload(africanus.rime.feeds) from africanus.rime.feeds import feed_rotation - if parallel: - assert 'parallel' in feed_rotation.targetoptions - assert feed_rotation.targetoptions['parallel'] is True - else: - assert parallel not in feed_rotation.targetoptions + assert feed_rotation.targetoptions['parallel'] == parallel parangles = np.random.random((10, 5)) pa_sin = np.sin(parangles) From 74873a71299e7169e6bb81296a57c06f64afe7b7 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 12 May 2020 18:01:07 +0200 Subject: [PATCH 13/26] Parallel predict --- africanus/rime/predict.py | 36 ++++++++++------ africanus/rime/tests/test_predict.py | 64 ++++++++++++++++------------ 2 files changed, 59 insertions(+), 41 deletions(-) diff --git a/africanus/rime/predict.py b/africanus/rime/predict.py index 929b3a555..5f0b3c7f9 100644 --- a/africanus/rime/predict.py +++ b/africanus/rime/predict.py @@ -7,10 +7,9 @@ from africanus.util.docs import DocstringTemplate from africanus.util.numba import is_numba_type_none, generated_jit, njit -cfg = config.get("rime.predict_vis.parallel", False) -parallel = cfg is not False -cfg = {} if cfg is True else cfg - +cfg = config.numba_parallel("rime.predict_vis.parallel") +parallel = cfg.get('parallel', False) +axes = cfg.get("axes", set(('source','row')) if parallel else set()) JONES_NOT_PRESENT = 0 JONES_1_OR_2 = 1 @@ -194,10 +193,15 @@ def sum_coherencies_factory(have_ddes, have_coh, jones_type): """ Factory function generating a function that sums coherencies """ jones_mul = jones_mul_factory(have_ddes, have_coh, jones_type, True) + from numba import prange + + srange = prange if 'source' in axes else range + rrange = prange if 'row' in axes else range + if have_ddes and have_coh: def sum_coh_fn(time, ant1, ant2, a1j, blj, a2j, tmin, out): - for s in range(a1j.shape[0]): - for r in range(time.shape[0]): + for s in srange(a1j.shape[0]): + for r in rrange(time.shape[0]): ti = time[r] - tmin a1 = ant1[r] a2 = ant2[r] @@ -210,8 +214,8 @@ def sum_coh_fn(time, ant1, ant2, a1j, blj, a2j, tmin, out): elif have_ddes and not have_coh: def sum_coh_fn(time, ant1, ant2, a1j, blj, a2j, tmin, out): - for s in range(a1j.shape[0]): - for r in range(time.shape[0]): + for s in srange(a1j.shape[0]): + for r in rrange(time.shape[0]): ti = time[r] - tmin a1 = ant1[r] a2 = ant2[r] @@ -224,16 +228,16 @@ def sum_coh_fn(time, ant1, ant2, a1j, blj, a2j, tmin, out): elif not have_ddes and have_coh: if jones_type == JONES_2X2: def sum_coh_fn(time, ant1, ant2, a1j, blj, a2j, tmin, out): - for s in range(blj.shape[0]): - for r in range(blj.shape[1]): + for s in srange(blj.shape[0]): + for r in rrange(blj.shape[1]): for f in range(blj.shape[2]): for c1 in range(blj.shape[3]): for c2 in range(blj.shape[4]): out[r, f, c1, c2] += blj[s, r, f, c1, c2] else: def sum_coh_fn(time, ant1, ant2, a1j, blj, a2j, tmin, out): - for s in range(blj.shape[0]): - for r in range(blj.shape[1]): + for s in srange(blj.shape[0]): + for r in rrange(blj.shape[1]): for f in range(blj.shape[2]): for c in range(blj.shape[3]): out[r, f, c] += blj[s, r, f, c] @@ -303,16 +307,20 @@ def apply_dies_factory(have_dies, have_bvis, jones_type): Factory function returning a function that applies Direction Independent Effects """ + from numba import prange # We always "have visibilities", (the output array) jones_mul = jones_mul_factory(have_dies, True, jones_type, False) + # time is rowlike + trange = prange if 'row' in axes else range + if have_dies and have_bvis: def apply_dies(time, ant1, ant2, die1_jones, die2_jones, tmin, out): # Iterate over rows - for r in range(time.shape[0]): + for r in trange(time.shape[0]): ti = time[r] - tmin a1 = ant1[r] a2 = ant2[r] @@ -327,7 +335,7 @@ def apply_dies(time, ant1, ant2, die1_jones, die2_jones, tmin, out): # Iterate over rows - for r in range(time.shape[0]): + for r in trange(time.shape[0]): ti = time[r] - tmin a1 = ant1[r] a2 = ant2[r] diff --git a/africanus/rime/tests/test_predict.py b/africanus/rime/tests/test_predict.py index 041715162..55a9aded0 100644 --- a/africanus/rime/tests/test_predict.py +++ b/africanus/rime/tests/test_predict.py @@ -3,6 +3,8 @@ """Tests for `codex-africanus` package.""" +import importlib + import numpy as np from numpy.testing import assert_array_almost_equal @@ -52,11 +54,10 @@ def rc(*a, **kw): @dde_presence_parametrization @die_presence_parametrization @chunk_parametrization +@pytest.mark.parametrize("parallel", [True, False]) def test_predict_vis(corr_shape, idm, einsum_sig1, einsum_sig2, a1j, blj, a2j, g1j, bvis, g2j, - chunks): - from africanus.rime.predict import predict_vis - + chunks, parallel): s = sum(chunks['source']) t = sum(chunks['time']) a = sum(chunks['antenna']) @@ -77,38 +78,47 @@ def test_predict_vis(corr_shape, idm, einsum_sig1, einsum_sig2, assert ant1.size == r - model_vis = predict_vis(time_idx, ant1, ant2, - a1_jones if a1j else None, - bl_jones if blj else None, - a2_jones if a2j else None, - g1_jones if g1j else None, - base_vis if bvis else None, - g2_jones if g2j else None) + from africanus.config import config + + with config.set({"rime.predict_vis.parallel": parallel}): + import africanus + importlib.reload(africanus.rime.predict) + from africanus.rime.predict import predict_vis + + assert predict_vis.targetoptions['parallel'] == parallel + + model_vis = predict_vis(time_idx, ant1, ant2, + a1_jones if a1j else None, + bl_jones if blj else None, + a2_jones if a2j else None, + g1_jones if g1j else None, + base_vis if bvis else None, + g2_jones if g2j else None) - assert model_vis.shape == (r, c) + corr_shape + assert model_vis.shape == (r, c) + corr_shape - def _id(array): - return np.broadcast_to(idm, array.shape) + def _id(array): + return np.broadcast_to(idm, array.shape) - # For einsum, convert (time, ant) dimensions to row - # or ID matrices if the input is present - a1_jones = a1_jones[:, time_idx, ant1] if a1j else _id(bl_jones) - bl_jones = bl_jones if blj else _id(bl_jones) - a2_jones = a2_jones[:, time_idx, ant2].conj() if a2j else _id(bl_jones) + # For einsum, convert (time, ant) dimensions to row + # or ID matrices if the input is present + a1_jones = a1_jones[:, time_idx, ant1] if a1j else _id(bl_jones) + bl_jones = bl_jones if blj else _id(bl_jones) + a2_jones = a2_jones[:, time_idx, ant2].conj() if a2j else _id(bl_jones) - v = np.einsum(einsum_sig1, a1_jones, bl_jones, a2_jones) + v = np.einsum(einsum_sig1, a1_jones, bl_jones, a2_jones) - if bvis: - v += base_vis + if bvis: + v += base_vis - # Convert (time, ant) dimensions to row or - # or ID matrices if input is not present - g1_jones = g1_jones[time_idx, ant1] if g1j else _id(v) - g2_jones = g2_jones[time_idx, ant2].conj() if g2j else _id(v) + # Convert (time, ant) dimensions to row or + # or ID matrices if input is not present + g1_jones = g1_jones[time_idx, ant1] if g1j else _id(v) + g2_jones = g2_jones[time_idx, ant2].conj() if g2j else _id(v) - v = np.einsum(einsum_sig2, g1_jones, v, g2_jones) + v = np.einsum(einsum_sig2, g1_jones, v, g2_jones) - assert_array_almost_equal(v, model_vis) + assert_array_almost_equal(v, model_vis) @corr_shape_parametrization From 02ac415e8630de4ec848a127108f70d93b988e53 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 12 May 2020 22:45:54 +0200 Subject: [PATCH 14/26] Improvements --- africanus/rime/phase.py | 18 +++++--- africanus/rime/tests/conftest.py | 21 +++++++++ africanus/rime/tests/test_predict.py | 66 ++++++++++++++-------------- africanus/rime/tests/test_rime.py | 55 +++++++++++------------ 4 files changed, 94 insertions(+), 66 deletions(-) diff --git a/africanus/rime/phase.py b/africanus/rime/phase.py index 48e9060ba..d68268aba 100644 --- a/africanus/rime/phase.py +++ b/africanus/rime/phase.py @@ -2,19 +2,28 @@ import numpy as np +from africanus.config import config from africanus.constants import minus_two_pi_over_c from africanus.util.docs import DocstringTemplate from africanus.util.numba import generated_jit from africanus.util.type_inference import infer_complex_dtype +cfg = config.numba_parallel("rime.phase_delay.parallel") +parallel = cfg.get('parallel', False) +axes = cfg.get('axes', set(('source', 'row')) if parallel else ()) -@generated_jit(nopython=True, nogil=True, cache=True) +@generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) def phase_delay(lm, uvw, frequency, convention='fourier'): # Bake constants in with the correct type one = lm.dtype(1.0) neg_two_pi_over_c = lm.dtype(minus_two_pi_over_c) out_dtype = infer_complex_dtype(lm, uvw, frequency) + from numba import prange + + srange = prange if 'source' in axes else range + rrange = prange if 'row' in axes else range + def _phase_delay_impl(lm, uvw, frequency, convention='fourier'): if convention == 'fourier': constant = neg_two_pi_over_c @@ -27,12 +36,12 @@ def _phase_delay_impl(lm, uvw, frequency, convention='fourier'): complex_phase = np.zeros(shape, dtype=out_dtype) # For each source - for source in range(lm.shape[0]): + for source in srange(lm.shape[0]): l, m = lm[source] n = np.sqrt(one - l**2 - m**2) - one # For each uvw coordinate - for row in range(uvw.shape[0]): + for row in rrange(uvw.shape[0]): u, v, w = uvw[row] # e^(-2*pi*(l*u + m*v + n*w)/c) real_phase = constant * (l * u + m * v + n * w) @@ -44,8 +53,7 @@ def _phase_delay_impl(lm, uvw, frequency, convention='fourier'): # Our phase input is purely imaginary # so we can can elide a call to exp # and just compute the cos and sin - complex_phase.real[source, row, chan] = np.cos(p) - complex_phase.imag[source, row, chan] = np.sin(p) + complex_phase[source, row, chan] = np.cos(p) + np.sin(p)*1j return complex_phase diff --git a/africanus/rime/tests/conftest.py b/africanus/rime/tests/conftest.py index 576635e7d..41d5225a9 100644 --- a/africanus/rime/tests/conftest.py +++ b/africanus/rime/tests/conftest.py @@ -3,11 +3,32 @@ """Tests for `codex-africanus` package.""" +import importlib import numpy as np import pytest +@pytest.fixture +def cfg_rime_parallel(request): + """ Performs parallel configuration setting and module reloading """ + from africanus.config import config + + module, cfg = request.param + + assert isinstance(cfg, dict) and len(cfg) == 1 + + # Get module object, because importlib.reload doesn't take strings + mod = importlib.import_module(module) + + with config.set(cfg): + importlib.reload(mod) + + yield cfg.copy().popitem()[1] + + importlib.reload(mod) + + @pytest.fixture def wsrt_ants(): """ Westerbork antenna positions """ diff --git a/africanus/rime/tests/test_predict.py b/africanus/rime/tests/test_predict.py index 55a9aded0..549912e82 100644 --- a/africanus/rime/tests/test_predict.py +++ b/africanus/rime/tests/test_predict.py @@ -54,10 +54,15 @@ def rc(*a, **kw): @dde_presence_parametrization @die_presence_parametrization @chunk_parametrization -@pytest.mark.parametrize("parallel", [True, False]) +@pytest.mark.parametrize("cfg_rime_parallel", [ + ("africanus.rime.predict", {"rime.predict_vis.parallel": True}), + ("africanus.rime.predict", {"rime.predict_vis.parallel": False}), + ], ids=["parallel", "serial"], indirect=True) def test_predict_vis(corr_shape, idm, einsum_sig1, einsum_sig2, a1j, blj, a2j, g1j, bvis, g2j, - chunks, parallel): + chunks, cfg_rime_parallel): + from africanus.rime.predict import predict_vis + s = sum(chunks['source']) t = sum(chunks['time']) a = sum(chunks['antenna']) @@ -78,47 +83,40 @@ def test_predict_vis(corr_shape, idm, einsum_sig1, einsum_sig2, assert ant1.size == r - from africanus.config import config - - with config.set({"rime.predict_vis.parallel": parallel}): - import africanus - importlib.reload(africanus.rime.predict) - from africanus.rime.predict import predict_vis - - assert predict_vis.targetoptions['parallel'] == parallel + assert predict_vis.targetoptions['parallel'] == cfg_rime_parallel - model_vis = predict_vis(time_idx, ant1, ant2, - a1_jones if a1j else None, - bl_jones if blj else None, - a2_jones if a2j else None, - g1_jones if g1j else None, - base_vis if bvis else None, - g2_jones if g2j else None) + model_vis = predict_vis(time_idx, ant1, ant2, + a1_jones if a1j else None, + bl_jones if blj else None, + a2_jones if a2j else None, + g1_jones if g1j else None, + base_vis if bvis else None, + g2_jones if g2j else None) - assert model_vis.shape == (r, c) + corr_shape + assert model_vis.shape == (r, c) + corr_shape - def _id(array): - return np.broadcast_to(idm, array.shape) + def _id(array): + return np.broadcast_to(idm, array.shape) - # For einsum, convert (time, ant) dimensions to row - # or ID matrices if the input is present - a1_jones = a1_jones[:, time_idx, ant1] if a1j else _id(bl_jones) - bl_jones = bl_jones if blj else _id(bl_jones) - a2_jones = a2_jones[:, time_idx, ant2].conj() if a2j else _id(bl_jones) + # For einsum, convert (time, ant) dimensions to row + # or ID matrices if the input is present + a1_jones = a1_jones[:, time_idx, ant1] if a1j else _id(bl_jones) + bl_jones = bl_jones if blj else _id(bl_jones) + a2_jones = a2_jones[:, time_idx, ant2].conj() if a2j else _id(bl_jones) - v = np.einsum(einsum_sig1, a1_jones, bl_jones, a2_jones) + v = np.einsum(einsum_sig1, a1_jones, bl_jones, a2_jones) - if bvis: - v += base_vis + if bvis: + v += base_vis - # Convert (time, ant) dimensions to row or - # or ID matrices if input is not present - g1_jones = g1_jones[time_idx, ant1] if g1j else _id(v) - g2_jones = g2_jones[time_idx, ant2].conj() if g2j else _id(v) + # Convert (time, ant) dimensions to row or + # or ID matrices if input is not present + g1_jones = g1_jones[time_idx, ant1] if g1j else _id(v) + g2_jones = g2_jones[time_idx, ant2].conj() if g2j else _id(v) - v = np.einsum(einsum_sig2, g1_jones, v, g2_jones) + v = np.einsum(einsum_sig2, g1_jones, v, g2_jones) - assert_array_almost_equal(v, model_vis) + assert_array_almost_equal(v, model_vis) @corr_shape_parametrization diff --git a/africanus/rime/tests/test_rime.py b/africanus/rime/tests/test_rime.py index 59961b7c4..659dbbe3c 100644 --- a/africanus/rime/tests/test_rime.py +++ b/africanus/rime/tests/test_rime.py @@ -3,12 +3,9 @@ """Tests for `codex-africanus` package.""" -import importlib import numpy as np - import pytest - def rf(*a, **kw): return np.random.random(*a, **kw) @@ -21,8 +18,14 @@ def rc(*a, **kw): ('fourier', 1), ('casa', -1) ]) -def test_phase_delay(convention, sign): - from africanus.rime import phase_delay +@pytest.mark.parametrize("cfg_rime_parallel", [ + ("africanus.rime.phase", {"rime.phase_delay.parallel": True}), + ("africanus.rime.phase", {"rime.phase_delay.parallel": False}), + ], ids=["parallel", "serial"], indirect=True) +def test_phase_delay(convention, sign, cfg_rime_parallel): + from africanus.rime.phase import phase_delay + + assert phase_delay.targetoptions['parallel'] == cfg_rime_parallel uvw = np.random.random(size=(100, 3)) lm = np.random.random(size=(10, 2)) @@ -51,30 +54,28 @@ def test_phase_delay(convention, sign): assert np.all(np.exp(1j*phase) == complex_phase[lm_i, uvw_i, freq_i]) -@pytest.mark.parametrize("parallel", [True, False]) -def test_feed_rotation(parallel): - import africanus - from africanus.config import config - - with config.set({"rime.feed_rotation.parallel": parallel}): - importlib.reload(africanus.rime.feeds) - from africanus.rime.feeds import feed_rotation +@pytest.mark.parametrize("cfg_rime_parallel", [ + ("africanus.rime.feeds", {"rime.feed_rotation.parallel": True}), + ("africanus.rime.feeds", {"rime.feed_rotation.parallel": False}), + ], ids=["parallel", "serial"], indirect=True) +def test_feed_rotation(cfg_rime_parallel): + from africanus.rime.feeds import feed_rotation - assert feed_rotation.targetoptions['parallel'] == parallel + assert feed_rotation.targetoptions['parallel'] == cfg_rime_parallel - parangles = np.random.random((10, 5)) - pa_sin = np.sin(parangles) - pa_cos = np.cos(parangles) - - fr = feed_rotation(parangles, feed_type='linear') - np_expr = np.stack([pa_cos, pa_sin, -pa_sin, pa_cos], axis=2) - assert np.allclose(fr, np_expr.reshape(10, 5, 2, 2)) - - fr = feed_rotation(parangles, feed_type='circular') - zeros = np.zeros_like(pa_sin) - np_expr = np.stack([pa_cos - 1j*pa_sin, zeros, - zeros, pa_cos + 1j*pa_sin], axis=2) - assert np.allclose(fr, np_expr.reshape(10, 5, 2, 2)) + parangles = np.random.random((10, 5)) + pa_sin = np.sin(parangles) + pa_cos = np.cos(parangles) + + fr = feed_rotation(parangles, feed_type='linear') + np_expr = np.stack([pa_cos, pa_sin, -pa_sin, pa_cos], axis=2) + assert np.allclose(fr, np_expr.reshape(10, 5, 2, 2)) + + fr = feed_rotation(parangles, feed_type='circular') + zeros = np.zeros_like(pa_sin) + np_expr = np.stack([pa_cos - 1j*pa_sin, zeros, + zeros, pa_cos + 1j*pa_sin], axis=2) + assert np.allclose(fr, np_expr.reshape(10, 5, 2, 2)) @pytest.mark.parametrize("convention, sign", [ From 497668a38ce4ed10748fef4d5f3072005d84e89b Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 12 May 2020 22:58:29 +0200 Subject: [PATCH 15/26] Only configure threads if requested --- africanus/rime/feeds.py | 13 ++++++------- africanus/rime/phase.py | 10 +++++++++- africanus/rime/predict.py | 13 +++++++------ africanus/rime/tests/conftest.py | 9 ++++++++- africanus/rime/tests/test_predict.py | 3 ++- africanus/rime/tests/test_rime.py | 3 ++- 6 files changed, 34 insertions(+), 17 deletions(-) diff --git a/africanus/rime/feeds.py b/africanus/rime/feeds.py index d0082a79e..4ef1d17f0 100644 --- a/africanus/rime/feeds.py +++ b/africanus/rime/feeds.py @@ -16,13 +16,12 @@ def feed_rotation(parallactic_angles, feed_type='linear'): dtype = np.result_type(pa_np_dtype, np.complex64) import numba - nthreads = (1 if not parallel else - cfg.get("threads", numba.config.NUMBA_NUM_THREADS)) + threads = cfg.get("threads", None) if parallel else None def impl(parallactic_angles, feed_type='linear'): - if parallel: - prev_nthreads = numba.get_num_threads() - numba.set_num_threads(nthreads) + if parallel and threads is not None: + prev_threads = numba.get_num_threads() + numba.set_num_threads(threads) parangles = parallactic_angles.ravel() # Can't prepend shape tuple till the following is fixed @@ -57,8 +56,8 @@ def impl(parallactic_angles, feed_type='linear'): else: raise ValueError("feed_type not in ('linear', 'circular')") - if parallel: - numba.set_num_threads(prev_nthreads) + if parallel and threads is not None: + numba.set_num_threads(prev_threads) return result.reshape(parallactic_angles.shape + (2, 2)) diff --git a/africanus/rime/phase.py b/africanus/rime/phase.py index d68268aba..44d25edcf 100644 --- a/africanus/rime/phase.py +++ b/africanus/rime/phase.py @@ -19,12 +19,17 @@ def phase_delay(lm, uvw, frequency, convention='fourier'): neg_two_pi_over_c = lm.dtype(minus_two_pi_over_c) out_dtype = infer_complex_dtype(lm, uvw, frequency) - from numba import prange + from numba import prange, set_num_threads, get_num_threads srange = prange if 'source' in axes else range rrange = prange if 'row' in axes else range + threads = cfg.get('threads', None) if parallel else None def _phase_delay_impl(lm, uvw, frequency, convention='fourier'): + if parallel and threads is not None: + prev_threads = get_num_threads() + set_num_threads(threads) + if convention == 'fourier': constant = neg_two_pi_over_c elif convention == 'casa': @@ -55,6 +60,9 @@ def _phase_delay_impl(lm, uvw, frequency, convention='fourier'): # and just compute the cos and sin complex_phase[source, row, chan] = np.cos(p) + np.sin(p)*1j + if parallel and threads is not None: + set_num_threads(prev_threads) + return complex_phase return _phase_delay_impl diff --git a/africanus/rime/predict.py b/africanus/rime/predict.py index 5f0b3c7f9..eafc03856 100644 --- a/africanus/rime/predict.py +++ b/africanus/rime/predict.py @@ -487,15 +487,16 @@ def predict_vis(time_index, antenna1, antenna2, apply_dies_fn = apply_dies_factory(have_dies, have_bvis, jones_type) add_coh_fn = add_coh_factory(have_bvis) - from numba import config as nbcfg, prange, set_num_threads - nthreads = cfg.get("threads", nbcfg.NUMBA_NUM_THREADS) if parallel else 1 + from numba import prange, set_num_threads, get_num_threads + threads = cfg.get("threads", None) if parallel else None def _predict_vis_fn(time_index, antenna1, antenna2, dde1_jones=None, source_coh=None, dde2_jones=None, die1_jones=None, base_vis=None, die2_jones=None): - if parallel: - set_num_threads(nthreads) + if parallel and threads is not None: + prev_threads = get_num_threads() + set_num_threads(threads) # Get the output shape out = out_fn(time_index, dde1_jones, source_coh, dde2_jones, @@ -517,8 +518,8 @@ def _predict_vis_fn(time_index, antenna1, antenna2, die1_jones, die2_jones, tmin, out) - if parallel: - set_num_threads(nbcfg.NUMBA_NUM_THREADS) + if parallel and threads is not None: + set_num_threads(prev_threads) return out diff --git a/africanus/rime/tests/conftest.py b/africanus/rime/tests/conftest.py index 41d5225a9..f44647d3a 100644 --- a/africanus/rime/tests/conftest.py +++ b/africanus/rime/tests/conftest.py @@ -24,7 +24,14 @@ def cfg_rime_parallel(request): with config.set(cfg): importlib.reload(mod) - yield cfg.copy().popitem()[1] + cfg = cfg.copy().popitem()[1] + + if isinstance(cfg, dict): + yield cfg['parallel'] + elif isinstance(cfg, bool): + yield cfg + else: + raise TypeError("Unhandled cfg type %s" % type(cfg)) importlib.reload(mod) diff --git a/africanus/rime/tests/test_predict.py b/africanus/rime/tests/test_predict.py index 549912e82..a2d795557 100644 --- a/africanus/rime/tests/test_predict.py +++ b/africanus/rime/tests/test_predict.py @@ -56,8 +56,9 @@ def rc(*a, **kw): @chunk_parametrization @pytest.mark.parametrize("cfg_rime_parallel", [ ("africanus.rime.predict", {"rime.predict_vis.parallel": True}), + ("africanus.rime.predict", {"rime.predict_vis.parallel": {'threads': 2}}), ("africanus.rime.predict", {"rime.predict_vis.parallel": False}), - ], ids=["parallel", "serial"], indirect=True) + ], ids=["parallel", "parallel-2", "serial"], indirect=True) def test_predict_vis(corr_shape, idm, einsum_sig1, einsum_sig2, a1j, blj, a2j, g1j, bvis, g2j, chunks, cfg_rime_parallel): diff --git a/africanus/rime/tests/test_rime.py b/africanus/rime/tests/test_rime.py index 659dbbe3c..56eed9c72 100644 --- a/africanus/rime/tests/test_rime.py +++ b/africanus/rime/tests/test_rime.py @@ -20,8 +20,9 @@ def rc(*a, **kw): ]) @pytest.mark.parametrize("cfg_rime_parallel", [ ("africanus.rime.phase", {"rime.phase_delay.parallel": True}), + ("africanus.rime.phase", {"rime.phase_delay.parallel": {"threads": 2}}), ("africanus.rime.phase", {"rime.phase_delay.parallel": False}), - ], ids=["parallel", "serial"], indirect=True) + ], ids=["parallel", "parallel-2", "serial"], indirect=True) def test_phase_delay(convention, sign, cfg_rime_parallel): from africanus.rime.phase import phase_delay From 2b275e03e20bc784c75e5a3f3693543dd1cae9a3 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 12 May 2020 23:11:58 +0200 Subject: [PATCH 16/26] Beams --- africanus/rime/fast_beam_cubes.py | 287 +++++++++++++----------- africanus/rime/tests/test_fast_beams.py | 18 +- 2 files changed, 169 insertions(+), 136 deletions(-) diff --git a/africanus/rime/fast_beam_cubes.py b/africanus/rime/fast_beam_cubes.py index e3ceb6b3b..4fa59a051 100644 --- a/africanus/rime/fast_beam_cubes.py +++ b/africanus/rime/fast_beam_cubes.py @@ -3,8 +3,14 @@ from functools import reduce import numpy as np + +from africanus.config import config from africanus.util.docs import DocstringTemplate -from africanus.util.numba import njit +from africanus.util.numba import njit, generated_jit + +cfg = config.numba_parallel("rime.beam_cube_dde.parallel") +parallel = cfg.get('parallel', False) +axes = cfg.get("axes", set(('source','row')) if parallel else set()) @njit(nogil=True, cache=True) @@ -55,185 +61,204 @@ def freq_grid_interp(frequency, beam_freq_map): return freq_data -@njit(nogil=True, cache=True) +@generated_jit(nogil=True, cache=True, parallel=parallel) def beam_cube_dde(beam, beam_lm_extents, beam_freq_map, lm, parallactic_angles, point_errors, antenna_scaling, frequency): - nsrc = lm.shape[0] - ntime, nants = parallactic_angles.shape - nchan = frequency.shape[0] - beam_lw, beam_mh, beam_nud = beam.shape[:3] - corrs = beam.shape[3:] + from numba import prange, set_num_threads, get_num_threads + + srange = prange if 'source' in axes else range + rrange = prange if 'row' in axes else range + threads = cfg.get('threads', None) if parallel else None + + def impl(beam, beam_lm_extents, beam_freq_map, + lm, parallactic_angles, point_errors, antenna_scaling, + frequency): + + if parallel and threads is not None: + prev_threads = get_num_threads() + set_num_threads(threads) + + nsrc = lm.shape[0] + ntime, nants = parallactic_angles.shape + nchan = frequency.shape[0] + beam_lw, beam_mh, beam_nud = beam.shape[:3] + corrs = beam.shape[3:] + + if beam_lw < 2 or beam_mh < 2 or beam_nud < 2: + raise ValueError("beam_lw, beam_mh and beam_nud " + "must be >= 2") - if beam_lw < 2 or beam_mh < 2 or beam_nud < 2: - raise ValueError("beam_lw, beam_mh and beam_nud must be >= 2") + # Flatten correlations + ncorrs = reduce(lambda x, y: x*y, corrs, 1) - # Flatten correlations - ncorrs = reduce(lambda x, y: x*y, corrs, 1) + lower_l, upper_l = beam_lm_extents[0] + lower_m, upper_m = beam_lm_extents[1] - lower_l, upper_l = beam_lm_extents[0] - lower_m, upper_m = beam_lm_extents[1] + ex_dtype = beam_lm_extents.dtype - ex_dtype = beam_lm_extents.dtype + # Maximum l and m indices in float and int + lmaxf = ex_dtype.type(beam_lw - 1) + mmaxf = ex_dtype.type(beam_mh - 1) + lmaxi = beam_lw - 1 + mmaxi = beam_mh - 1 - # Maximum l and m indices in float and int - lmaxf = ex_dtype.type(beam_lw - 1) - mmaxf = ex_dtype.type(beam_mh - 1) - lmaxi = beam_lw - 1 - mmaxi = beam_mh - 1 + lscale = lmaxf / (upper_l - lower_l) + mscale = mmaxf / (upper_m - lower_m) - lscale = lmaxf / (upper_l - lower_l) - mscale = mmaxf / (upper_m - lower_m) + one = ex_dtype.type(1) + zero = ex_dtype.type(0) - one = ex_dtype.type(1) - zero = ex_dtype.type(0) + # Flatten the beam on correlation + fbeam = beam.reshape((beam_lw, beam_mh, beam_nud, ncorrs)) - # Flatten the beam on correlation - fbeam = beam.reshape((beam_lw, beam_mh, beam_nud, ncorrs)) + # Allocate output array with correlations flattened + fjones = np.empty((nsrc, ntime, nants, nchan, ncorrs), dtype=beam.dtype) - # Allocate output array with correlations flattened - fjones = np.empty((nsrc, ntime, nants, nchan, ncorrs), dtype=beam.dtype) + # Compute frequency interpolation stuff + freq_data = freq_grid_interp(frequency, beam_freq_map) - # Compute frequency interpolation stuff - freq_data = freq_grid_interp(frequency, beam_freq_map) + corr_sum = np.zeros((ncorrs,), dtype=beam.dtype) + absc_sum = np.zeros((ncorrs,), dtype=beam.real.dtype) + beam_scratch = np.zeros((ncorrs,), dtype=beam.dtype) - corr_sum = np.zeros((ncorrs,), dtype=beam.dtype) - absc_sum = np.zeros((ncorrs,), dtype=beam.real.dtype) - beam_scratch = np.zeros((ncorrs,), dtype=beam.dtype) + for t in rrange(ntime): + for a in rrange(nants): + sin_pa = np.sin(parallactic_angles[t, a]) + cos_pa = np.cos(parallactic_angles[t, a]) - for t in range(ntime): - for a in range(nants): - sin_pa = np.sin(parallactic_angles[t, a]) - cos_pa = np.cos(parallactic_angles[t, a]) + for s in srange(nsrc): + # Extract lm coordinates + l, m = lm[s] - for s in range(nsrc): - # Extract lm coordinates - l, m = lm[s] + for f in range(nchan): + # Unpack frequency data + freq_scale = freq_data[f, 0] + # lower and upper frequency weights + nud = freq_data[f, 1] + inv_nud = 1.0 - nud + # lower and upper frequency grid position + gc0 = np.int32(freq_data[f, 2]) + gc1 = gc0 + 1 - for f in range(nchan): - # Unpack frequency data - freq_scale = freq_data[f, 0] - # lower and upper frequency weights - nud = freq_data[f, 1] - inv_nud = 1.0 - nud - # lower and upper frequency grid position - gc0 = np.int32(freq_data[f, 2]) - gc1 = gc0 + 1 + # Apply any frequency scaling + sl = l * freq_scale + sm = m * freq_scale - # Apply any frequency scaling - sl = l * freq_scale - sm = m * freq_scale + # Add pointing errors + tl = sl + point_errors[t, a, f, 0] + tm = sm + point_errors[t, a, f, 1] - # Add pointing errors - tl = sl + point_errors[t, a, f, 0] - tm = sm + point_errors[t, a, f, 1] + # Rotate lm coordinate angle + vl = tl*cos_pa - tm*sin_pa + vm = tl*sin_pa + tm*cos_pa - # Rotate lm coordinate angle - vl = tl*cos_pa - tm*sin_pa - vm = tl*sin_pa + tm*cos_pa + # Scale by antenna scaling + vl *= antenna_scaling[a, f, 0] + vm *= antenna_scaling[a, f, 1] - # Scale by antenna scaling - vl *= antenna_scaling[a, f, 0] - vm *= antenna_scaling[a, f, 1] + # Shift into the cube coordinate system + vl = lscale*(vl - lower_l) + vm = mscale*(vm - lower_m) - # Shift into the cube coordinate system - vl = lscale*(vl - lower_l) - vm = mscale*(vm - lower_m) + # Clamp the coordinates to the edges of the cube + vl = max(zero, min(vl, lmaxf)) + vm = max(zero, min(vm, mmaxf)) - # Clamp the coordinates to the edges of the cube - vl = max(zero, min(vl, lmaxf)) - vm = max(zero, min(vm, mmaxf)) + # Snap to the lower grid coordinates + gl0 = np.int32(np.floor(vl)) + gm0 = np.int32(np.floor(vm)) - # Snap to the lower grid coordinates - gl0 = np.int32(np.floor(vl)) - gm0 = np.int32(np.floor(vm)) + # Snap to the upper grid coordinates + gl1 = min(gl0 + 1, lmaxi) + gm1 = min(gm0 + 1, mmaxi) - # Snap to the upper grid coordinates - gl1 = min(gl0 + 1, lmaxi) - gm1 = min(gm0 + 1, mmaxi) + # Difference between grid and offset coordinates + ld = vl - gl0 + md = vm - gm0 - # Difference between grid and offset coordinates - ld = vl - gl0 - md = vm - gm0 + # Zero accumulation arrays + corr_sum[:] = 0 + absc_sum[:] = 0 - # Zero accumulation arrays - corr_sum[:] = 0 - absc_sum[:] = 0 + # Accumulate lower cube correlations + beam_scratch[:] = fbeam[gl0, gm0, gc0, :] + weight = (one - ld)*(one - md)*nud - # Accumulate lower cube correlations - beam_scratch[:] = fbeam[gl0, gm0, gc0, :] - weight = (one - ld)*(one - md)*nud + for c in range(ncorrs): + absc_sum[c] += weight * np.abs(beam_scratch[c]) + corr_sum[c] += weight * beam_scratch[c] - for c in range(ncorrs): - absc_sum[c] += weight * np.abs(beam_scratch[c]) - corr_sum[c] += weight * beam_scratch[c] + beam_scratch[:] = fbeam[gl1, gm0, gc0, :] + weight = ld*(one - md)*nud - beam_scratch[:] = fbeam[gl1, gm0, gc0, :] - weight = ld*(one - md)*nud + for c in range(ncorrs): + absc_sum[c] += weight * np.abs(beam_scratch[c]) + corr_sum[c] += weight * beam_scratch[c] - for c in range(ncorrs): - absc_sum[c] += weight * np.abs(beam_scratch[c]) - corr_sum[c] += weight * beam_scratch[c] + beam_scratch[:] = fbeam[gl0, gm1, gc0, :] + weight = (one - ld)*md*nud - beam_scratch[:] = fbeam[gl0, gm1, gc0, :] - weight = (one - ld)*md*nud + for c in range(ncorrs): + absc_sum[c] += weight * np.abs(beam_scratch[c]) + corr_sum[c] += weight * beam_scratch[c] - for c in range(ncorrs): - absc_sum[c] += weight * np.abs(beam_scratch[c]) - corr_sum[c] += weight * beam_scratch[c] + beam_scratch[:] = fbeam[gl1, gm1, gc0, :] + weight = ld*md*nud - beam_scratch[:] = fbeam[gl1, gm1, gc0, :] - weight = ld*md*nud + for c in range(ncorrs): + absc_sum[c] += weight * np.abs(beam_scratch[c]) + corr_sum[c] += weight * beam_scratch[c] - for c in range(ncorrs): - absc_sum[c] += weight * np.abs(beam_scratch[c]) - corr_sum[c] += weight * beam_scratch[c] + # Accumulate upper cube correlations + beam_scratch[:] = fbeam[gl0, gm0, gc1, :] + weight = (one - ld)*(one - md)*inv_nud - # Accumulate upper cube correlations - beam_scratch[:] = fbeam[gl0, gm0, gc1, :] - weight = (one - ld)*(one - md)*inv_nud + for c in range(ncorrs): + absc_sum[c] += weight * np.abs(beam_scratch[c]) + corr_sum[c] += weight * beam_scratch[c] - for c in range(ncorrs): - absc_sum[c] += weight * np.abs(beam_scratch[c]) - corr_sum[c] += weight * beam_scratch[c] + beam_scratch[:] = fbeam[gl1, gm0, gc1, :] + weight = ld*(one - md)*inv_nud - beam_scratch[:] = fbeam[gl1, gm0, gc1, :] - weight = ld*(one - md)*inv_nud + for c in range(ncorrs): + absc_sum[c] += weight * np.abs(beam_scratch[c]) + corr_sum[c] += weight * beam_scratch[c] - for c in range(ncorrs): - absc_sum[c] += weight * np.abs(beam_scratch[c]) - corr_sum[c] += weight * beam_scratch[c] + beam_scratch[:] = fbeam[gl0, gm1, gc1, :] + weight = (one - ld)*md*inv_nud - beam_scratch[:] = fbeam[gl0, gm1, gc1, :] - weight = (one - ld)*md*inv_nud + for c in range(ncorrs): + absc_sum[c] += weight * np.abs(beam_scratch[c]) + corr_sum[c] += weight * beam_scratch[c] - for c in range(ncorrs): - absc_sum[c] += weight * np.abs(beam_scratch[c]) - corr_sum[c] += weight * beam_scratch[c] + beam_scratch[:] = fbeam[gl1, gm1, gc1, :] + weight = ld*md*inv_nud - beam_scratch[:] = fbeam[gl1, gm1, gc1, :] - weight = ld*md*inv_nud + for c in range(ncorrs): + absc_sum[c] += weight * np.abs(beam_scratch[c]) + corr_sum[c] += weight * beam_scratch[c] - for c in range(ncorrs): - absc_sum[c] += weight * np.abs(beam_scratch[c]) - corr_sum[c] += weight * beam_scratch[c] + for c in range(ncorrs): + # Added all correlations, normalise + div = np.abs(corr_sum[c]) - for c in range(ncorrs): - # Added all correlations, normalise - div = np.abs(corr_sum[c]) + if div == 0.0: + # This case probably works out to a zero assign + corr_sum[c] *= absc_sum[c] + else: + corr_sum[c] *= absc_sum[c] / div - if div == 0.0: - # This case probably works out to a zero assign - corr_sum[c] *= absc_sum[c] - else: - corr_sum[c] *= absc_sum[c] / div + # Assign normalised values + fjones[s, t, a, f, :] = corr_sum - # Assign normalised values - fjones[s, t, a, f, :] = corr_sum + if parallel and threads is not None: + set_num_threads(prev_threads) - return fjones.reshape((nsrc, ntime, nants, nchan) + corrs) + return fjones.reshape((nsrc, ntime, nants, nchan) + corrs) + return impl BEAM_CUBE_DOCS = DocstringTemplate( r""" diff --git a/africanus/rime/tests/test_fast_beams.py b/africanus/rime/tests/test_fast_beams.py index 4645bebf0..ae01ff365 100644 --- a/africanus/rime/tests/test_fast_beams.py +++ b/africanus/rime/tests/test_fast_beams.py @@ -3,9 +3,6 @@ import pytest -from africanus.rime.fast_beam_cubes import beam_cube_dde, freq_grid_interp - - def rf(*a, **kw): return np.random.random(*a, **kw) @@ -39,11 +36,19 @@ def freqs(): return np.array([.4, .5, .6, .7, .8, .9, 1.0, 1.1]) - -def test_fast_beam_small(): +@pytest.mark.parametrize("cfg_rime_parallel", [ + ("africanus.rime.fast_beam_cubes", {"rime.beam_cube_dde.parallel": True}), + ("africanus.rime.fast_beam_cubes", {"rime.beam_cube_dde.parallel": {'threads': 2}}), + ("africanus.rime.fast_beam_cubes", {"rime.beam_cube_dde.parallel": False}), + ], ids=["parallel", "parallel-2", "serial"], indirect=True) +def test_fast_beam_small(cfg_rime_parallel): """ Small beam test, interpolation of one soure at [0.1, 0.1] """ + from africanus.rime.fast_beam_cubes import beam_cube_dde + assert beam_cube_dde.targetoptions['parallel'] == cfg_rime_parallel + np.random.seed(42) + # One frequency, to the lower side of the beam frequency map freq = np.asarray([.3]) beam_freq_map = np.asarray([0.0, 1.0]) @@ -121,6 +126,8 @@ def test_fast_beam_small(): def test_grid_interpolate(freqs, beam_freq_map): + from africanus.rime.fast_beam_cubes import freq_grid_interp + freq_data = freq_grid_interp(freqs, beam_freq_map) freq_scale = freq_data[:, 0] @@ -153,6 +160,7 @@ def test_grid_interpolate(freqs, beam_freq_map): def test_dask_fast_beams(freqs, beam_freq_map): da = pytest.importorskip("dask.array") + from africanus.rime.fast_beam_cubes import beam_cube_dde from africanus.rime.dask import beam_cube_dde as dask_beam_cube_dde beam_lw = 10 From a60a65d01424e29912a69c5e125a1aee3cb7cc18 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 13 May 2020 09:19:06 +0200 Subject: [PATCH 17/26] Move fixture --- africanus/conftest.py | 30 +++++++++++++++++++ africanus/model/spectral/spec_model.py | 18 ++++++++++- .../spectral/tests/test_spectral_model.py | 18 ++++++++--- africanus/rime/tests/test_fast_beams.py | 6 ++-- africanus/rime/tests/test_predict.py | 6 ++-- africanus/rime/tests/test_rime.py | 12 ++++---- 6 files changed, 73 insertions(+), 17 deletions(-) diff --git a/africanus/conftest.py b/africanus/conftest.py index f04d969f5..a69770a9c 100644 --- a/africanus/conftest.py +++ b/africanus/conftest.py @@ -1,9 +1,39 @@ # -*- coding: utf-8 -*- +import importlib +import pytest from africanus.util.testing import mark_in_pytest +@pytest.fixture +def cfg_parallel(request): + """ Performs parallel configuration setting and module reloading """ + from africanus.config import config + + module, cfg = request.param + + assert isinstance(cfg, dict) and len(cfg) == 1 + + # Get module object, because importlib.reload doesn't take strings + mod = importlib.import_module(module) + + with config.set(cfg): + importlib.reload(mod) + + cfg = cfg.copy().popitem()[1] + + if isinstance(cfg, dict): + yield cfg['parallel'] + elif isinstance(cfg, bool): + yield cfg + else: + raise TypeError("Unhandled cfg type %s" % type(cfg)) + + importlib.reload(mod) + + + # content of conftest.py def pytest_configure(config): mark_in_pytest(True) diff --git a/africanus/model/spectral/spec_model.py b/africanus/model/spectral/spec_model.py index bdd3f1bb5..1af8c9aec 100644 --- a/africanus/model/spectral/spec_model.py +++ b/africanus/model/spectral/spec_model.py @@ -4,10 +4,15 @@ from numba import types import numpy as np +from africanus.config import config from africanus.util.numba import generated_jit, njit from africanus.util.docs import DocstringTemplate +cfg = config.numba_parallel('model.spectral.parallel') +parallel = cfg.get('parallel', False) +axes = cfg.get('axes', ('source', 'chan') if parallel else ()) + def numpy_spectral_model(stokes, spi, ref_freq, frequency, base): out_shape = (stokes.shape[0], frequency.shape[0]) + stokes.shape[1:] @@ -92,7 +97,7 @@ def impl(array): return njit(nogil=True, cache=True)(impl) -@generated_jit(nopython=True, nogil=True, cache=True) +@generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) def spectral_model(stokes, spi, ref_freq, frequency, base=0): arg_dtypes = tuple(np.dtype(a.dtype.name) for a in (stokes, spi, ref_freq, frequency)) @@ -139,7 +144,14 @@ def is_log10(base): if spi.ndim - 2 != npoldims: raise ValueError("Dimensions on stokes and spi don't agree") + from numba import prange, set_num_threads, get_num_threads + threads = cfg.get('threads', None) if parallel else None + def impl(stokes, spi, ref_freq, frequency, base=0): + if parallel and threads is not None: + prev_threads = get_num_threads() + set_num_threads(threads) + nsrc = stokes.shape[0] nchan = frequency.shape[0] nspi = spi.shape[1] @@ -206,7 +218,11 @@ def impl(stokes, spi, ref_freq, frequency, base=0): else: raise ValueError("Invalid base") + if parallel and threads is not None: + set_num_threads(prev_threads) + out_shape = (stokes.shape[0], frequency.shape[0]) + stokes.shape[1:] + return spectral_model.reshape(out_shape) return impl diff --git a/africanus/model/spectral/tests/test_spectral_model.py b/africanus/model/spectral/tests/test_spectral_model.py index d46ee3430..5b192c348 100644 --- a/africanus/model/spectral/tests/test_spectral_model.py +++ b/africanus/model/spectral/tests/test_spectral_model.py @@ -5,9 +5,6 @@ from numpy.testing import assert_array_almost_equal import pytest -from africanus.model.spectral.spec_model import (spectral_model, - numpy_spectral_model) - @pytest.fixture def flux(): @@ -44,7 +41,20 @@ def impl(shape): @pytest.mark.parametrize("base", [0, 1, 2, "std", "log", "log10", ["log", "std", "std", "std"]]) @pytest.mark.parametrize("npol", [0, 1, 2, 4]) -def test_spectral_model_multiple_spi(flux, ref_freq, frequency, base, npol): +@pytest.mark.parametrize("cfg_parallel", [ + ("africanus.model.spectral.spec_model", {"model.spectral.parallel": True}), + ("africanus.model.spectral.spec_model", {"model.spectral.parallel": {'threads': 2}}), + ("africanus.model.spectral.spec_model", {"model.spectral.parallel": False}), + ], ids=["parallel", "parallel-2", "serial"], indirect=True) +def test_spectral_model_multiple_spi(flux, ref_freq, frequency, + base, npol, cfg_parallel): + + from africanus.model.spectral.spec_model import (spectral_model, + numpy_spectral_model) + + assert spectral_model.targetoptions['parallel'] == cfg_parallel + + nsrc = 10 nchan = 16 nspi = 6 diff --git a/africanus/rime/tests/test_fast_beams.py b/africanus/rime/tests/test_fast_beams.py index ae01ff365..2bc6d54e1 100644 --- a/africanus/rime/tests/test_fast_beams.py +++ b/africanus/rime/tests/test_fast_beams.py @@ -36,15 +36,15 @@ def freqs(): return np.array([.4, .5, .6, .7, .8, .9, 1.0, 1.1]) -@pytest.mark.parametrize("cfg_rime_parallel", [ +@pytest.mark.parametrize("cfg_parallel", [ ("africanus.rime.fast_beam_cubes", {"rime.beam_cube_dde.parallel": True}), ("africanus.rime.fast_beam_cubes", {"rime.beam_cube_dde.parallel": {'threads': 2}}), ("africanus.rime.fast_beam_cubes", {"rime.beam_cube_dde.parallel": False}), ], ids=["parallel", "parallel-2", "serial"], indirect=True) -def test_fast_beam_small(cfg_rime_parallel): +def test_fast_beam_small(cfg_parallel): """ Small beam test, interpolation of one soure at [0.1, 0.1] """ from africanus.rime.fast_beam_cubes import beam_cube_dde - assert beam_cube_dde.targetoptions['parallel'] == cfg_rime_parallel + assert beam_cube_dde.targetoptions['parallel'] == cfg_parallel np.random.seed(42) diff --git a/africanus/rime/tests/test_predict.py b/africanus/rime/tests/test_predict.py index a2d795557..52e56512d 100644 --- a/africanus/rime/tests/test_predict.py +++ b/africanus/rime/tests/test_predict.py @@ -54,14 +54,14 @@ def rc(*a, **kw): @dde_presence_parametrization @die_presence_parametrization @chunk_parametrization -@pytest.mark.parametrize("cfg_rime_parallel", [ +@pytest.mark.parametrize("cfg_parallel", [ ("africanus.rime.predict", {"rime.predict_vis.parallel": True}), ("africanus.rime.predict", {"rime.predict_vis.parallel": {'threads': 2}}), ("africanus.rime.predict", {"rime.predict_vis.parallel": False}), ], ids=["parallel", "parallel-2", "serial"], indirect=True) def test_predict_vis(corr_shape, idm, einsum_sig1, einsum_sig2, a1j, blj, a2j, g1j, bvis, g2j, - chunks, cfg_rime_parallel): + chunks, cfg_parallel): from africanus.rime.predict import predict_vis s = sum(chunks['source']) @@ -84,7 +84,7 @@ def test_predict_vis(corr_shape, idm, einsum_sig1, einsum_sig2, assert ant1.size == r - assert predict_vis.targetoptions['parallel'] == cfg_rime_parallel + assert predict_vis.targetoptions['parallel'] == cfg_parallel model_vis = predict_vis(time_idx, ant1, ant2, a1_jones if a1j else None, diff --git a/africanus/rime/tests/test_rime.py b/africanus/rime/tests/test_rime.py index 56eed9c72..4256b7b5d 100644 --- a/africanus/rime/tests/test_rime.py +++ b/africanus/rime/tests/test_rime.py @@ -18,15 +18,15 @@ def rc(*a, **kw): ('fourier', 1), ('casa', -1) ]) -@pytest.mark.parametrize("cfg_rime_parallel", [ +@pytest.mark.parametrize("cfg_parallel", [ ("africanus.rime.phase", {"rime.phase_delay.parallel": True}), ("africanus.rime.phase", {"rime.phase_delay.parallel": {"threads": 2}}), ("africanus.rime.phase", {"rime.phase_delay.parallel": False}), ], ids=["parallel", "parallel-2", "serial"], indirect=True) -def test_phase_delay(convention, sign, cfg_rime_parallel): +def test_phase_delay(convention, sign, cfg_parallel): from africanus.rime.phase import phase_delay - assert phase_delay.targetoptions['parallel'] == cfg_rime_parallel + assert phase_delay.targetoptions['parallel'] == cfg_parallel uvw = np.random.random(size=(100, 3)) lm = np.random.random(size=(10, 2)) @@ -55,14 +55,14 @@ def test_phase_delay(convention, sign, cfg_rime_parallel): assert np.all(np.exp(1j*phase) == complex_phase[lm_i, uvw_i, freq_i]) -@pytest.mark.parametrize("cfg_rime_parallel", [ +@pytest.mark.parametrize("cfg_parallel", [ ("africanus.rime.feeds", {"rime.feed_rotation.parallel": True}), ("africanus.rime.feeds", {"rime.feed_rotation.parallel": False}), ], ids=["parallel", "serial"], indirect=True) -def test_feed_rotation(cfg_rime_parallel): +def test_feed_rotation(cfg_parallel): from africanus.rime.feeds import feed_rotation - assert feed_rotation.targetoptions['parallel'] == cfg_rime_parallel + assert feed_rotation.targetoptions['parallel'] == cfg_parallel parangles = np.random.random((10, 5)) pa_sin = np.sin(parangles) From a1f6f6d868ab859b228846af4d7b2cdba85624d0 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 13 May 2020 09:45:42 +0200 Subject: [PATCH 18/26] flake8 --- africanus/conftest.py | 1 - africanus/model/spectral/spec_model.py | 15 +++++++++------ .../model/spectral/tests/test_spectral_model.py | 7 ++++--- africanus/rime/fast_beam_cubes.py | 10 ++++++---- africanus/rime/feeds.py | 1 - africanus/rime/phase.py | 1 + africanus/rime/predict.py | 4 ++-- africanus/rime/tests/test_fast_beams.py | 6 ++++-- africanus/rime/tests/test_predict.py | 2 -- africanus/rime/tests/test_rime.py | 1 + 10 files changed, 27 insertions(+), 21 deletions(-) diff --git a/africanus/conftest.py b/africanus/conftest.py index a69770a9c..a1af6abc5 100644 --- a/africanus/conftest.py +++ b/africanus/conftest.py @@ -33,7 +33,6 @@ def cfg_parallel(request): importlib.reload(mod) - # content of conftest.py def pytest_configure(config): mark_in_pytest(True) diff --git a/africanus/model/spectral/spec_model.py b/africanus/model/spectral/spec_model.py index 1af8c9aec..796a6c45e 100644 --- a/africanus/model/spectral/spec_model.py +++ b/africanus/model/spectral/spec_model.py @@ -13,6 +13,7 @@ parallel = cfg.get('parallel', False) axes = cfg.get('axes', ('source', 'chan') if parallel else ()) + def numpy_spectral_model(stokes, spi, ref_freq, frequency, base): out_shape = (stokes.shape[0], frequency.shape[0]) + stokes.shape[1:] @@ -146,6 +147,8 @@ def is_log10(base): from numba import prange, set_num_threads, get_num_threads threads = cfg.get('threads', None) if parallel else None + srange = prange if 'source' in axes else range + crange = prange if 'chan' in axes else range def impl(stokes, spi, ref_freq, frequency, base=0): if parallel and threads is not None: @@ -174,10 +177,10 @@ def impl(stokes, spi, ref_freq, frequency, base=0): # The output cache patterns could be improved. for p, b in enumerate(list_base[:npol]): if is_std(b): - for s in range(nsrc): + for s in srange(nsrc): rf = ref_freq[s] - for f in range(nchan): + for f in crange(nchan): freq_ratio = (frequency[f] / rf) - 1.0 spec_model = estokes[s, p] @@ -188,10 +191,10 @@ def impl(stokes, spi, ref_freq, frequency, base=0): spectral_model[s, f, p] = spec_model elif is_log(b): - for s in range(nsrc): + for s in srange(nsrc): rf = ref_freq[s] - for f in range(nchan): + for f in crange(nchan): freq_ratio = np.log(frequency[f] / rf) spec_model = np.log(estokes[s, p]) @@ -202,10 +205,10 @@ def impl(stokes, spi, ref_freq, frequency, base=0): spectral_model[s, f, p] = np.exp(spec_model) elif is_log10(b): - for s in range(nsrc): + for s in srange(nsrc): rf = ref_freq[s] - for f in range(nchan): + for f in crange(nchan): freq_ratio = np.log10(frequency[f] / rf) spec_model = np.log10(estokes[s, p]) diff --git a/africanus/model/spectral/tests/test_spectral_model.py b/africanus/model/spectral/tests/test_spectral_model.py index 5b192c348..7dd1ff621 100644 --- a/africanus/model/spectral/tests/test_spectral_model.py +++ b/africanus/model/spectral/tests/test_spectral_model.py @@ -43,8 +43,10 @@ def impl(shape): @pytest.mark.parametrize("npol", [0, 1, 2, 4]) @pytest.mark.parametrize("cfg_parallel", [ ("africanus.model.spectral.spec_model", {"model.spectral.parallel": True}), - ("africanus.model.spectral.spec_model", {"model.spectral.parallel": {'threads': 2}}), - ("africanus.model.spectral.spec_model", {"model.spectral.parallel": False}), + ("africanus.model.spectral.spec_model", { + "model.spectral.parallel": {'threads': 2}}), + ("africanus.model.spectral.spec_model", + {"model.spectral.parallel": False}), ], ids=["parallel", "parallel-2", "serial"], indirect=True) def test_spectral_model_multiple_spi(flux, ref_freq, frequency, base, npol, cfg_parallel): @@ -54,7 +56,6 @@ def test_spectral_model_multiple_spi(flux, ref_freq, frequency, assert spectral_model.targetoptions['parallel'] == cfg_parallel - nsrc = 10 nchan = 16 nspi = 6 diff --git a/africanus/rime/fast_beam_cubes.py b/africanus/rime/fast_beam_cubes.py index 4fa59a051..a4095e1ae 100644 --- a/africanus/rime/fast_beam_cubes.py +++ b/africanus/rime/fast_beam_cubes.py @@ -10,7 +10,7 @@ cfg = config.numba_parallel("rime.beam_cube_dde.parallel") parallel = cfg.get('parallel', False) -axes = cfg.get("axes", set(('source','row')) if parallel else set()) +axes = cfg.get("axes", set(('source', 'row')) if parallel else set()) @njit(nogil=True, cache=True) @@ -73,8 +73,8 @@ def beam_cube_dde(beam, beam_lm_extents, beam_freq_map, threads = cfg.get('threads', None) if parallel else None def impl(beam, beam_lm_extents, beam_freq_map, - lm, parallactic_angles, point_errors, antenna_scaling, - frequency): + lm, parallactic_angles, point_errors, antenna_scaling, + frequency): if parallel and threads is not None: prev_threads = get_num_threads() @@ -114,7 +114,8 @@ def impl(beam, beam_lm_extents, beam_freq_map, fbeam = beam.reshape((beam_lw, beam_mh, beam_nud, ncorrs)) # Allocate output array with correlations flattened - fjones = np.empty((nsrc, ntime, nants, nchan, ncorrs), dtype=beam.dtype) + fjones = np.empty( + (nsrc, ntime, nants, nchan, ncorrs), dtype=beam.dtype) # Compute frequency interpolation stuff freq_data = freq_grid_interp(frequency, beam_freq_map) @@ -260,6 +261,7 @@ def impl(beam, beam_lm_extents, beam_freq_map, return impl + BEAM_CUBE_DOCS = DocstringTemplate( r""" Evaluates Direction Dependent Effects along a source's path diff --git a/africanus/rime/feeds.py b/africanus/rime/feeds.py index 4ef1d17f0..08889c45c 100644 --- a/africanus/rime/feeds.py +++ b/africanus/rime/feeds.py @@ -29,7 +29,6 @@ def impl(parallactic_angles, feed_type='linear'): # We know parangles.ndim == 1 though result = np.zeros((parangles.shape[0], 2, 2), dtype=dtype) - # Linear feeds if feed_type == 'linear': for i in numba.prange(parangles.shape[0]): diff --git a/africanus/rime/phase.py b/africanus/rime/phase.py index 44d25edcf..6868ff1d9 100644 --- a/africanus/rime/phase.py +++ b/africanus/rime/phase.py @@ -12,6 +12,7 @@ parallel = cfg.get('parallel', False) axes = cfg.get('axes', set(('source', 'row')) if parallel else ()) + @generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) def phase_delay(lm, uvw, frequency, convention='fourier'): # Bake constants in with the correct type diff --git a/africanus/rime/predict.py b/africanus/rime/predict.py index eafc03856..35ecd4d12 100644 --- a/africanus/rime/predict.py +++ b/africanus/rime/predict.py @@ -9,7 +9,7 @@ cfg = config.numba_parallel("rime.predict_vis.parallel") parallel = cfg.get('parallel', False) -axes = cfg.get("axes", set(('source','row')) if parallel else set()) +axes = cfg.get("axes", set(('source', 'row')) if parallel else set()) JONES_NOT_PRESENT = 0 JONES_1_OR_2 = 1 @@ -487,7 +487,7 @@ def predict_vis(time_index, antenna1, antenna2, apply_dies_fn = apply_dies_factory(have_dies, have_bvis, jones_type) add_coh_fn = add_coh_factory(have_bvis) - from numba import prange, set_num_threads, get_num_threads + from numba import set_num_threads, get_num_threads threads = cfg.get("threads", None) if parallel else None def _predict_vis_fn(time_index, antenna1, antenna2, diff --git a/africanus/rime/tests/test_fast_beams.py b/africanus/rime/tests/test_fast_beams.py index 2bc6d54e1..722842a79 100644 --- a/africanus/rime/tests/test_fast_beams.py +++ b/africanus/rime/tests/test_fast_beams.py @@ -36,9 +36,11 @@ def freqs(): return np.array([.4, .5, .6, .7, .8, .9, 1.0, 1.1]) + @pytest.mark.parametrize("cfg_parallel", [ ("africanus.rime.fast_beam_cubes", {"rime.beam_cube_dde.parallel": True}), - ("africanus.rime.fast_beam_cubes", {"rime.beam_cube_dde.parallel": {'threads': 2}}), + ("africanus.rime.fast_beam_cubes", { + "rime.beam_cube_dde.parallel": {'threads': 2}}), ("africanus.rime.fast_beam_cubes", {"rime.beam_cube_dde.parallel": False}), ], ids=["parallel", "parallel-2", "serial"], indirect=True) def test_fast_beam_small(cfg_parallel): @@ -48,7 +50,6 @@ def test_fast_beam_small(cfg_parallel): np.random.seed(42) - # One frequency, to the lower side of the beam frequency map freq = np.asarray([.3]) beam_freq_map = np.asarray([0.0, 1.0]) @@ -223,6 +224,7 @@ def test_fast_beams_vs_montblanc(freqs, beam_freq_map_montblanc, dtype): """ Test that the numba beam matches montblanc implementation """ mb_tf_mod = pytest.importorskip("montblanc.impl.rime.tensorflow") tf = pytest.importorskip("tensorflow") + from africanus.rime.fast_beam_cubes import beam_cube_dde freqs = freqs.astype(dtype) beam_freq_map = beam_freq_map_montblanc.astype(dtype) diff --git a/africanus/rime/tests/test_predict.py b/africanus/rime/tests/test_predict.py index 52e56512d..8a880e026 100644 --- a/africanus/rime/tests/test_predict.py +++ b/africanus/rime/tests/test_predict.py @@ -3,8 +3,6 @@ """Tests for `codex-africanus` package.""" -import importlib - import numpy as np from numpy.testing import assert_array_almost_equal diff --git a/africanus/rime/tests/test_rime.py b/africanus/rime/tests/test_rime.py index 4256b7b5d..73dbe6073 100644 --- a/africanus/rime/tests/test_rime.py +++ b/africanus/rime/tests/test_rime.py @@ -6,6 +6,7 @@ import numpy as np import pytest + def rf(*a, **kw): return np.random.random(*a, **kw) From 62a3b3c809d5af3c6eef7d74eb441e64998b878f Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 13 May 2020 12:06:02 +0200 Subject: [PATCH 19/26] swizzling --- africanus/model/spectral/spec_model.py | 35 ++++++++++++------- .../spectral/tests/test_spectral_model.py | 7 ++-- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/africanus/model/spectral/spec_model.py b/africanus/model/spectral/spec_model.py index 796a6c45e..ca33eabcd 100644 --- a/africanus/model/spectral/spec_model.py +++ b/africanus/model/spectral/spec_model.py @@ -9,9 +9,9 @@ from africanus.util.docs import DocstringTemplate -cfg = config.numba_parallel('model.spectral.parallel') +cfg = config.numba_parallel('model.spectral_model.parallel') parallel = cfg.get('parallel', False) -axes = cfg.get('axes', ('source', 'chan') if parallel else ()) +axes = cfg.get('axes', set(('source', )) if parallel else ()) def numpy_spectral_model(stokes, spi, ref_freq, frequency, base): @@ -62,9 +62,11 @@ def numpy_spectral_model(stokes, spi, ref_freq, frequency, base): def pol_getter_factory(npoldims): if npoldims == 0: + @njit def impl(pol_shape): return 1 else: + @njit def impl(pol_shape): npols = 1 @@ -73,30 +75,33 @@ def impl(pol_shape): return npols - return njit(nogil=True, cache=True)(impl) + return impl def promote_base_factory(is_base_list): if is_base_list: + @njit def impl(base, npol): return base + [base[-1]] * (npol - len(base)) else: + @njit def impl(base, npol): return [base] * npol - return njit(nogil=True, cache=True)(impl) + return impl def add_pol_dim_factory(have_pol_dim): if have_pol_dim: + @njit def impl(array): return array else: + @njit def impl(array): return array.reshape(array.shape + (1,)) - return njit(nogil=True, cache=True)(impl) - + return impl @generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) def spectral_model(stokes, spi, ref_freq, frequency, base=0): @@ -113,31 +118,33 @@ def spectral_model(stokes, spi, ref_freq, frequency, base=0): promote_base = promote_base_factory(is_base_list) if isinstance(base, types.scalars.Integer): + @njit def is_std(base): return base == 0 + @njit def is_log(base): return base == 1 + @njit def is_log10(base): return base == 2 elif isinstance(base, types.misc.UnicodeType): + @njit def is_std(base): return base == "std" + @njit def is_log(base): return base == "log" + @njit def is_log10(base): return base == "log10" else: raise TypeError("base '%s' should be a string or integer" % base) - is_std = njit(nogil=True, cache=True)(is_std) - is_log = njit(nogil=True, cache=True)(is_log) - is_log10 = njit(nogil=True, cache=True)(is_log10) - npoldims = stokes.ndim - 1 pol_get_fn = pol_getter_factory(npoldims) add_pol_dim = add_pol_dim_factory(npoldims > 0) @@ -147,8 +154,9 @@ def is_log10(base): from numba import prange, set_num_threads, get_num_threads threads = cfg.get('threads', None) if parallel else None - srange = prange if 'source' in axes else range - crange = prange if 'chan' in axes else range + srange = prange if parallel and 'source' in axes else range + crange = prange if parallel and 'chan' in axes else range + print("source", srange, "chan", crange) def impl(stokes, spi, ref_freq, frequency, base=0): if parallel and threads is not None: @@ -175,7 +183,8 @@ def impl(stokes, spi, ref_freq, frequency, base=0): # TODO(sjperkins) # Polarisation + associated base on the outer loop # The output cache patterns could be improved. - for p, b in enumerate(list_base[:npol]): + for p in range(npol): + b = list_base[p] if is_std(b): for s in srange(nsrc): rf = ref_freq[s] diff --git a/africanus/model/spectral/tests/test_spectral_model.py b/africanus/model/spectral/tests/test_spectral_model.py index 7dd1ff621..8acb10ff9 100644 --- a/africanus/model/spectral/tests/test_spectral_model.py +++ b/africanus/model/spectral/tests/test_spectral_model.py @@ -42,11 +42,12 @@ def impl(shape): ["log", "std", "std", "std"]]) @pytest.mark.parametrize("npol", [0, 1, 2, 4]) @pytest.mark.parametrize("cfg_parallel", [ - ("africanus.model.spectral.spec_model", {"model.spectral.parallel": True}), + ("africanus.model.spectral.spec_model", + {"model.spectral_model.parallel": True}), ("africanus.model.spectral.spec_model", { - "model.spectral.parallel": {'threads': 2}}), + "model.spectral_model.parallel": {'threads': 2}}), ("africanus.model.spectral.spec_model", - {"model.spectral.parallel": False}), + {"model.spectral_model.parallel": False}), ], ids=["parallel", "parallel-2", "serial"], indirect=True) def test_spectral_model_multiple_spi(flux, ref_freq, frequency, base, npol, cfg_parallel): From 850967335ba5eb5da07976208c53d7dfd7079c5f Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 13 May 2020 12:33:01 +0200 Subject: [PATCH 20/26] remove RIME conftest cfg_parallel --- africanus/rime/tests/conftest.py | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/africanus/rime/tests/conftest.py b/africanus/rime/tests/conftest.py index f44647d3a..b56c7821f 100644 --- a/africanus/rime/tests/conftest.py +++ b/africanus/rime/tests/conftest.py @@ -9,33 +9,6 @@ import pytest -@pytest.fixture -def cfg_rime_parallel(request): - """ Performs parallel configuration setting and module reloading """ - from africanus.config import config - - module, cfg = request.param - - assert isinstance(cfg, dict) and len(cfg) == 1 - - # Get module object, because importlib.reload doesn't take strings - mod = importlib.import_module(module) - - with config.set(cfg): - importlib.reload(mod) - - cfg = cfg.copy().popitem()[1] - - if isinstance(cfg, dict): - yield cfg['parallel'] - elif isinstance(cfg, bool): - yield cfg - else: - raise TypeError("Unhandled cfg type %s" % type(cfg)) - - importlib.reload(mod) - - @pytest.fixture def wsrt_ants(): """ Westerbork antenna positions """ From ac91656b6748c41774e39099699eb995fabcba53 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 13 May 2020 12:35:38 +0200 Subject: [PATCH 21/26] Gaussian shapes --- africanus/model/shape/gaussian_shape.py | 15 ++++++++++++--- .../model/shape/tests/test_gaussian_shape.py | 17 +++++++++++++---- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/africanus/model/shape/gaussian_shape.py b/africanus/model/shape/gaussian_shape.py index 8a5b09050..3b2c3343d 100644 --- a/africanus/model/shape/gaussian_shape.py +++ b/africanus/model/shape/gaussian_shape.py @@ -3,12 +3,17 @@ import numpy as np +from africanus.config import config from africanus.util.docs import DocstringTemplate from africanus.util.numba import generated_jit from africanus.constants import c as lightspeed +cfg = config.numba_parallel('model.shape.gaussian.parallel') +parallel = cfg.get('parallel', False) +axes = cfg.get('axes', set(('source', 'row')) if parallel else ()) -@generated_jit(nopython=True, nogil=True, cache=True) + +@generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) def gaussian(uvw, frequency, shape_params): # https://en.wikipedia.org/wiki/Full_width_at_half_maximum fwhm = 2.0 * np.sqrt(2.0 * np.log(2.0)) @@ -18,6 +23,10 @@ def gaussian(uvw, frequency, shape_params): dtype = np.result_type(*(np.dtype(a.dtype.name) for a in (uvw, frequency, shape_params))) + from numba import prange, set_num_threads, get_num_threads + srange = prange if parallel and 'source' in axes else range + rrange = prange if parallel and 'row' in axes else range + def impl(uvw, frequency, shape_params): nsrc = shape_params.shape[0] nrow = uvw.shape[0] @@ -30,7 +39,7 @@ def impl(uvw, frequency, shape_params): for f in range(frequency.shape[0]): scaled_freq[f] = frequency[f] * gauss_scale - for s in range(shape_params.shape[0]): + for s in srange(shape_params.shape[0]): emaj, emin, angle = shape_params[s] # Convert to l-projection, m-projection, ratio @@ -38,7 +47,7 @@ def impl(uvw, frequency, shape_params): em = emaj * np.cos(angle) er = emin / (1.0 if emaj == 0.0 else emaj) - for r in range(uvw.shape[0]): + for r in rrange(uvw.shape[0]): u, v, w = uvw[r] u1 = (u*em - v*el)*er diff --git a/africanus/model/shape/tests/test_gaussian_shape.py b/africanus/model/shape/tests/test_gaussian_shape.py index a3e7d0271..5fdd02aeb 100644 --- a/africanus/model/shape/tests/test_gaussian_shape.py +++ b/africanus/model/shape/tests/test_gaussian_shape.py @@ -5,13 +5,21 @@ from numpy.testing import assert_array_almost_equal import pytest -from africanus.model.shape import gaussian as np_gaussian - - -def test_gauss_shape(): +@pytest.mark.parametrize("cfg_parallel", [ + ("africanus.model.shape.gaussian_shape", + {"model.shape.gaussian.parallel": True}), + ("africanus.model.shape.gaussian_shape", { + "model.shape.gaussian.parallel": {'threads': 2}}), + ("africanus.model.shape.gaussian_shape", + {"model.shape.gaussian.parallel": False}), + ], ids=["parallel", "parallel-2", "serial"], indirect=True) +def test_gauss_shape(cfg_parallel): + from africanus.model.shape.gaussian_shape import gaussian as np_gaussian row = 10 chan = 16 + assert np_gaussian.targetoptions['parallel'] == cfg_parallel + shape_params = np.array([[.4, .3, .2], [.4, .3, .2]]) uvw = np.random.random((row, 3)) @@ -24,6 +32,7 @@ def test_gauss_shape(): def test_dask_gauss_shape(): da = pytest.importorskip('dask.array') + from africanus.model.shape import gaussian as np_gaussian from africanus.model.shape.dask import gaussian as da_gaussian row_chunks = (5, 5) From be3daf62786a39e9ef2b3026293004bc4b22a7c3 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 13 May 2020 15:19:30 +0200 Subject: [PATCH 22/26] Changes --- africanus/model/shape/gaussian_shape.py | 2 +- africanus/model/spectral/spec_model.py | 3 +-- africanus/rime/fast_beam_cubes.py | 15 +++++++++------ africanus/rime/feeds.py | 2 +- africanus/rime/phase.py | 2 +- africanus/rime/predict.py | 2 +- 6 files changed, 14 insertions(+), 12 deletions(-) diff --git a/africanus/model/shape/gaussian_shape.py b/africanus/model/shape/gaussian_shape.py index 3b2c3343d..479d52def 100644 --- a/africanus/model/shape/gaussian_shape.py +++ b/africanus/model/shape/gaussian_shape.py @@ -13,7 +13,7 @@ axes = cfg.get('axes', set(('source', 'row')) if parallel else ()) -@generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) +@generated_jit(nopython=True, nogil=True, cache=not parallel, parallel=parallel) def gaussian(uvw, frequency, shape_params): # https://en.wikipedia.org/wiki/Full_width_at_half_maximum fwhm = 2.0 * np.sqrt(2.0 * np.log(2.0)) diff --git a/africanus/model/spectral/spec_model.py b/africanus/model/spectral/spec_model.py index ca33eabcd..d0fb69ac9 100644 --- a/africanus/model/spectral/spec_model.py +++ b/africanus/model/spectral/spec_model.py @@ -103,7 +103,7 @@ def impl(array): return impl -@generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) +@generated_jit(nopython=True, nogil=True, cache=not parallel, parallel=parallel) def spectral_model(stokes, spi, ref_freq, frequency, base=0): arg_dtypes = tuple(np.dtype(a.dtype.name) for a in (stokes, spi, ref_freq, frequency)) @@ -156,7 +156,6 @@ def is_log10(base): threads = cfg.get('threads', None) if parallel else None srange = prange if parallel and 'source' in axes else range crange = prange if parallel and 'chan' in axes else range - print("source", srange, "chan", crange) def impl(stokes, spi, ref_freq, frequency, base=0): if parallel and threads is not None: diff --git a/africanus/rime/fast_beam_cubes.py b/africanus/rime/fast_beam_cubes.py index a4095e1ae..fb0e62af6 100644 --- a/africanus/rime/fast_beam_cubes.py +++ b/africanus/rime/fast_beam_cubes.py @@ -61,15 +61,15 @@ def freq_grid_interp(frequency, beam_freq_map): return freq_data -@generated_jit(nogil=True, cache=True, parallel=parallel) +@generated_jit(nopython=not parallel, nogil=True, cache=not parallel, parallel=parallel) def beam_cube_dde(beam, beam_lm_extents, beam_freq_map, lm, parallactic_angles, point_errors, antenna_scaling, frequency): - from numba import prange, set_num_threads, get_num_threads + from numba import prange, set_num_threads, get_num_threads, literal_unroll - srange = prange if 'source' in axes else range - rrange = prange if 'row' in axes else range + srange = prange if parallel and 'source' in axes else range + rrange = prange if parallel and 'row' in axes else range threads = cfg.get('threads', None) if parallel else None def impl(beam, beam_lm_extents, beam_freq_map, @@ -91,7 +91,10 @@ def impl(beam, beam_lm_extents, beam_freq_map, "must be >= 2") # Flatten correlations - ncorrs = reduce(lambda x, y: x*y, corrs, 1) + ncorrs = 1 + + for c in literal_unroll(corrs): + ncorrs *= c lower_l, upper_l = beam_lm_extents[0] lower_m, upper_m = beam_lm_extents[1] @@ -129,7 +132,7 @@ def impl(beam, beam_lm_extents, beam_freq_map, sin_pa = np.sin(parallactic_angles[t, a]) cos_pa = np.cos(parallactic_angles[t, a]) - for s in srange(nsrc): + for s in range(nsrc): # Extract lm coordinates l, m = lm[s] diff --git a/africanus/rime/feeds.py b/africanus/rime/feeds.py index 08889c45c..bcf50fbaf 100644 --- a/africanus/rime/feeds.py +++ b/africanus/rime/feeds.py @@ -10,7 +10,7 @@ parallel = cfg.get('parallel', False) -@generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) +@generated_jit(nopython=True, nogil=True, cache=not parallel, parallel=parallel) def feed_rotation(parallactic_angles, feed_type='linear'): pa_np_dtype = np.dtype(parallactic_angles.dtype.name) dtype = np.result_type(pa_np_dtype, np.complex64) diff --git a/africanus/rime/phase.py b/africanus/rime/phase.py index 6868ff1d9..377e7684e 100644 --- a/africanus/rime/phase.py +++ b/africanus/rime/phase.py @@ -13,7 +13,7 @@ axes = cfg.get('axes', set(('source', 'row')) if parallel else ()) -@generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) +@generated_jit(nopython=True, nogil=True, cache=not parallel, parallel=parallel) def phase_delay(lm, uvw, frequency, convention='fourier'): # Bake constants in with the correct type one = lm.dtype(1.0) diff --git a/africanus/rime/predict.py b/africanus/rime/predict.py index 35ecd4d12..c03b82e01 100644 --- a/africanus/rime/predict.py +++ b/africanus/rime/predict.py @@ -439,7 +439,7 @@ def predict_checks(time_index, antenna1, antenna2, have_dies1, have_bvis, have_dies2) -@generated_jit(nopython=True, nogil=True, cache=True, parallel=parallel) +@generated_jit(nopython=True, nogil=True, cache=not parallel, parallel=parallel) def predict_vis(time_index, antenna1, antenna2, dde1_jones=None, source_coh=None, dde2_jones=None, die1_jones=None, base_vis=None, die2_jones=None): From 1cdebe60ba03b9fbafa99448dbab1ca78bda9904 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 13 May 2020 15:22:44 +0200 Subject: [PATCH 23/26] flake8 --- africanus/model/shape/gaussian_shape.py | 11 ++++++++++- africanus/model/shape/tests/test_gaussian_shape.py | 1 + africanus/model/spectral/spec_model.py | 4 +++- africanus/rime/fast_beam_cubes.py | 6 ++---- africanus/rime/feeds.py | 3 ++- africanus/rime/phase.py | 3 ++- africanus/rime/predict.py | 3 ++- africanus/rime/tests/conftest.py | 2 -- 8 files changed, 22 insertions(+), 11 deletions(-) diff --git a/africanus/model/shape/gaussian_shape.py b/africanus/model/shape/gaussian_shape.py index 479d52def..af8bb254b 100644 --- a/africanus/model/shape/gaussian_shape.py +++ b/africanus/model/shape/gaussian_shape.py @@ -13,7 +13,8 @@ axes = cfg.get('axes', set(('source', 'row')) if parallel else ()) -@generated_jit(nopython=True, nogil=True, cache=not parallel, parallel=parallel) +@generated_jit(nopython=True, nogil=True, + cache=not parallel, parallel=parallel) def gaussian(uvw, frequency, shape_params): # https://en.wikipedia.org/wiki/Full_width_at_half_maximum fwhm = 2.0 * np.sqrt(2.0 * np.log(2.0)) @@ -26,8 +27,13 @@ def gaussian(uvw, frequency, shape_params): from numba import prange, set_num_threads, get_num_threads srange = prange if parallel and 'source' in axes else range rrange = prange if parallel and 'row' in axes else range + threads = cfg.get("threads", None) if parallel else None def impl(uvw, frequency, shape_params): + if parallel and threads is not None: + prev_threads = get_num_threads() + set_num_threads(threads) + nsrc = shape_params.shape[0] nrow = uvw.shape[0] nchan = frequency.shape[0] @@ -59,6 +65,9 @@ def impl(uvw, frequency, shape_params): shape[s, r, f] = np.exp(-(fu1*fu1 + fv1*fv1)) + if parallel and threads is not None: + set_num_threads(prev_threads) + return shape return impl diff --git a/africanus/model/shape/tests/test_gaussian_shape.py b/africanus/model/shape/tests/test_gaussian_shape.py index 5fdd02aeb..a1a40e2c5 100644 --- a/africanus/model/shape/tests/test_gaussian_shape.py +++ b/africanus/model/shape/tests/test_gaussian_shape.py @@ -5,6 +5,7 @@ from numpy.testing import assert_array_almost_equal import pytest + @pytest.mark.parametrize("cfg_parallel", [ ("africanus.model.shape.gaussian_shape", {"model.shape.gaussian.parallel": True}), diff --git a/africanus/model/spectral/spec_model.py b/africanus/model/spectral/spec_model.py index d0fb69ac9..7f3a91bde 100644 --- a/africanus/model/spectral/spec_model.py +++ b/africanus/model/spectral/spec_model.py @@ -103,7 +103,9 @@ def impl(array): return impl -@generated_jit(nopython=True, nogil=True, cache=not parallel, parallel=parallel) + +@generated_jit(nopython=True, nogil=True, + cache=not parallel, parallel=parallel) def spectral_model(stokes, spi, ref_freq, frequency, base=0): arg_dtypes = tuple(np.dtype(a.dtype.name) for a in (stokes, spi, ref_freq, frequency)) diff --git a/africanus/rime/fast_beam_cubes.py b/africanus/rime/fast_beam_cubes.py index fb0e62af6..9d887bf1c 100644 --- a/africanus/rime/fast_beam_cubes.py +++ b/africanus/rime/fast_beam_cubes.py @@ -1,7 +1,5 @@ # -*- coding: utf-8 -*- -from functools import reduce - import numpy as np from africanus.config import config @@ -61,14 +59,14 @@ def freq_grid_interp(frequency, beam_freq_map): return freq_data -@generated_jit(nopython=not parallel, nogil=True, cache=not parallel, parallel=parallel) +@generated_jit(nopython=not parallel, nogil=True, + cache=not parallel, parallel=parallel) def beam_cube_dde(beam, beam_lm_extents, beam_freq_map, lm, parallactic_angles, point_errors, antenna_scaling, frequency): from numba import prange, set_num_threads, get_num_threads, literal_unroll - srange = prange if parallel and 'source' in axes else range rrange = prange if parallel and 'row' in axes else range threads = cfg.get('threads', None) if parallel else None diff --git a/africanus/rime/feeds.py b/africanus/rime/feeds.py index bcf50fbaf..833e96b05 100644 --- a/africanus/rime/feeds.py +++ b/africanus/rime/feeds.py @@ -10,7 +10,8 @@ parallel = cfg.get('parallel', False) -@generated_jit(nopython=True, nogil=True, cache=not parallel, parallel=parallel) +@generated_jit(nopython=True, nogil=True, + cache=not parallel, parallel=parallel) def feed_rotation(parallactic_angles, feed_type='linear'): pa_np_dtype = np.dtype(parallactic_angles.dtype.name) dtype = np.result_type(pa_np_dtype, np.complex64) diff --git a/africanus/rime/phase.py b/africanus/rime/phase.py index 377e7684e..9eee72fa4 100644 --- a/africanus/rime/phase.py +++ b/africanus/rime/phase.py @@ -13,7 +13,8 @@ axes = cfg.get('axes', set(('source', 'row')) if parallel else ()) -@generated_jit(nopython=True, nogil=True, cache=not parallel, parallel=parallel) +@generated_jit(nopython=True, nogil=True, + cache=not parallel, parallel=parallel) def phase_delay(lm, uvw, frequency, convention='fourier'): # Bake constants in with the correct type one = lm.dtype(1.0) diff --git a/africanus/rime/predict.py b/africanus/rime/predict.py index c03b82e01..8369eaa53 100644 --- a/africanus/rime/predict.py +++ b/africanus/rime/predict.py @@ -439,7 +439,8 @@ def predict_checks(time_index, antenna1, antenna2, have_dies1, have_bvis, have_dies2) -@generated_jit(nopython=True, nogil=True, cache=not parallel, parallel=parallel) +@generated_jit(nopython=True, nogil=True, + cache=not parallel, parallel=parallel) def predict_vis(time_index, antenna1, antenna2, dde1_jones=None, source_coh=None, dde2_jones=None, die1_jones=None, base_vis=None, die2_jones=None): diff --git a/africanus/rime/tests/conftest.py b/africanus/rime/tests/conftest.py index b56c7821f..83bac2ae5 100644 --- a/africanus/rime/tests/conftest.py +++ b/africanus/rime/tests/conftest.py @@ -3,8 +3,6 @@ """Tests for `codex-africanus` package.""" -import importlib - import numpy as np import pytest From c1ee4496f29fd8f8f9a2756741aa5fd40cac333d Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 13 May 2020 15:26:57 +0200 Subject: [PATCH 24/26] [skip ci] Claim the PR --- HISTORY.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/HISTORY.rst b/HISTORY.rst index 40a883edc..0e62209d3 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -4,6 +4,7 @@ History 0.2.3 (YYYY-MM-DD) ------------------ +* Parallel Numba RIME Implementation (:pr:`186`) * MeqTrees Comparison Script Updates (:pr:`160`) * Improve requirements and work around broken pytest-flake8 (:pr:`187`) * Use python-casacore wheels for travis testing, instead of kernsuite packages (:pr:`185`) From 6ebc0c6bfc91ad8d47e1562756d3a158b62426b0 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 13 May 2020 15:42:44 +0200 Subject: [PATCH 25/26] Reintroduce donfig --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index c1f9000e4..1d8660894 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,8 @@ # Basic requirements containing no C extensions. # This is necessary for building on RTD requirements = ['appdirs >= 1.4.3', - 'decorator'] + 'decorator', + 'donfig'] if not on_rtd: requirements += [ From 346a84f1b1612876914e5fafde06f0851ef148e5 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 29 May 2020 11:46:03 +0200 Subject: [PATCH 26/26] [skip ci] Fix dodgy merge in HISTORY.rst --- HISTORY.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/HISTORY.rst b/HISTORY.rst index 28c0daa82..e20954b4e 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -16,7 +16,6 @@ X.Y.Z (YYYY-MM-DD) * Use github workflows (:pr:`196`, :pr:`197`, :pr:`198`, :pr:`199`) * Make CASA parallactic angles thread-safe (:pr:`195`) * Fix spectral model documentation (:pr:`190`), to match changes in (:pr:`189`) ->>>>>>> master 0.2.3 (2020-05-14) ------------------