From a9530b3a94baca3c07000669139808e4df12f684 Mon Sep 17 00:00:00 2001
From: gchlebus
Date: Tue, 21 Jan 2020 16:11:04 +0100
Subject: [PATCH 01/10] - Add nathost and natport arguments to the Worker's
 constructor.

---
 hpbandster/core/worker.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/hpbandster/core/worker.py b/hpbandster/core/worker.py
index 9e0ffc8..d5b984c 100644
--- a/hpbandster/core/worker.py
+++ b/hpbandster/core/worker.py
@@ -19,7 +19,8 @@ class Worker(object):
     The first allows to perform inital computations, e.g. loading the dataset, when the worker is started, while the
     latter is repeatedly called during the optimization and evaluates a given configuration yielding the associated loss.
     """
-    def __init__(self, run_id, nameserver=None, nameserver_port=None, logger=None, host=None, id=None, timeout=None):
+    def __init__(self, run_id, nameserver=None, nameserver_port=None, logger=None, host=None, nathost=None,
+                 natport=None, id=None, timeout=None):
         """

         Parameters
@@ -34,6 +35,10 @@ def __init__(self, run_id, nameserver=None, nameserver_port=None, logger=None, h
             logger used for debugging output
         host: str
             hostname for this worker process
+        nathost: str
+            external hostname for this worker process
+        natport: int
+            external port for this worker process
         id: anything with a __str__method
             if multiple workers are started in the same process, you MUST provide a unique id for each one of them using the `id` argument.
         timeout: int or float
@@ -44,6 +49,8 @@ def __init__(self, run_id, nameserver=None, nameserver_port=None, logger=None, h
         """
         self.run_id = run_id
         self.host = host
+        self.nathost = nathost
+        self.natport = natport
         self.nameserver = nameserver
         self.nameserver_port = nameserver_port
         self.worker_id = "hpbandster.run_%s.worker.%s.%i"%(self.run_id, socket.gethostname(), os.getpid())
@@ -149,7 +156,7 @@ def _run(self):

         self.logger.info('WORKER: start listening for jobs')

-        self.pyro_daemon = Pyro4.core.Daemon(host=self.host)
+        self.pyro_daemon = Pyro4.core.Daemon(host=self.host, nathost=self.nathost, natport=self.natport)

         with Pyro4.locateNS(self.nameserver, port=self.nameserver_port) as ns:
             uri = self.pyro_daemon.register(self, self.worker_id)

From 8dce80b2d57a2ba18215fa7521805c0e49d9b380 Mon Sep 17 00:00:00 2001
From: gchlebus
Date: Tue, 21 Jan 2020 16:15:43 +0100
Subject: [PATCH 02/10] - Add nathost and natport arguments to the
 NameServer's constructor.

---
 hpbandster/core/nameserver.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/hpbandster/core/nameserver.py b/hpbandster/core/nameserver.py
index aa64883..9be7f1f 100644
--- a/hpbandster/core/nameserver.py
+++ b/hpbandster/core/nameserver.py
@@ -19,7 +19,7 @@ class NameServer(object):
     can work in parallel and register their results without creating racing conditions. The implementation uses
     `PYRO4 `_ as a backend and this class is basically a wrapper.
     """
-    def __init__(self, run_id, working_directory=None, host=None, port=0, nic_name=None):
+    def __init__(self, run_id, working_directory=None, host=None, port=0, nic_name=None, nathost=None, natport=None):
         """
         Parameters
         ----------
@@ -34,11 +34,17 @@ def __init__(self, run_id, working_directory=None, host=None, port=0, nic_name=N
             the port to be used. Default (=0) means a random port
         nic_name: str
             name of the network interface to use (only used if host is not given)
+        nathost: str
+            external hostname for this nameserver process
+        natport: int
+            external port for this nameserver process
         """
         self.run_id = run_id
         self.host = host
         self.nic_name = nic_name
         self.port = port
+        self.nathost = nathost
+        self.natport = natport
         self.dir = working_directory
         self.conf_fn = None
         self.pyro_ns = None
@@ -61,7 +67,7 @@ def start(self):
         else:
             self.host = nic_name_to_host(self.nic_name)

-        uri, self.pyro_ns, _ = Pyro4.naming.startNS(host=self.host, port=self.port)
+        uri, self.pyro_ns, _ = Pyro4.naming.startNS(host=self.host, port=self.port, nathost=self.nathost, natport=self.natport)
         self.host, self.port = self.pyro_ns.locationStr.split(':')
         self.port = int(self.port)

From a2acd125f5630fe39431c33b964ef02d51288e98 Mon Sep 17 00:00:00 2001
From: gchlebus
Date: Tue, 21 Jan 2020 16:29:02 +0100
Subject: [PATCH 03/10] - Write the nathost and natport to the pickle file if
 natLocationStr is not None.

---
 hpbandster/core/nameserver.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/hpbandster/core/nameserver.py b/hpbandster/core/nameserver.py
index 9be7f1f..ba8b49c 100644
--- a/hpbandster/core/nameserver.py
+++ b/hpbandster/core/nameserver.py
@@ -69,7 +69,10 @@ def start(self):

         uri, self.pyro_ns, _ = Pyro4.naming.startNS(host=self.host, port=self.port, nathost=self.nathost, natport=self.natport)

-        self.host, self.port = self.pyro_ns.locationStr.split(':')
+        if self.pyro_ns.natLocationStr:
+            self.host, self.port = self.pyro_ns.natLocationStr.split(':')
+        else:
+            self.host, self.port = self.pyro_ns.locationStr.split(':')
         self.port = int(self.port)

         thread = threading.Thread(target=self.pyro_ns.requestLoop, name='Pyro4 nameserver started by HpBandSter')

From a8ef957661f09e81e375d2a9ccbe28c07e6083df Mon Sep 17 00:00:00 2001
From: gchlebus
Date: Tue, 21 Jan 2020 16:40:47 +0100
Subject: [PATCH 04/10] - Add the possibility to specify the worker port.

---
 hpbandster/core/worker.py | 39 +++++++++++++++++++++------------------
 1 file changed, 21 insertions(+), 18 deletions(-)

diff --git a/hpbandster/core/worker.py b/hpbandster/core/worker.py
index d5b984c..8e55660 100644
--- a/hpbandster/core/worker.py
+++ b/hpbandster/core/worker.py
@@ -19,10 +19,10 @@ class Worker(object):
     The first allows to perform inital computations, e.g. loading the dataset, when the worker is started, while the
     latter is repeatedly called during the optimization and evaluates a given configuration yielding the associated loss.
""" - def __init__(self, run_id, nameserver=None, nameserver_port=None, logger=None, host=None, nathost=None, + def __init__(self, run_id, nameserver=None, nameserver_port=None, logger=None, host=None, port=0, nathost=None, natport=None, id=None, timeout=None): """ - + Parameters ---------- run_id: anything with a __str__ method @@ -35,6 +35,8 @@ def __init__(self, run_id, nameserver=None, nameserver_port=None, logger=None, h logger used for debugging output host: str hostname for this worker process + port: int + port for this worker process nathost: str external hostname for this worker process natport: int @@ -49,16 +51,17 @@ def __init__(self, run_id, nameserver=None, nameserver_port=None, logger=None, h """ self.run_id = run_id self.host = host + self.port = port self.nathost = nathost self.natport = natport self.nameserver = nameserver self.nameserver_port = nameserver_port self.worker_id = "hpbandster.run_%s.worker.%s.%i"%(self.run_id, socket.gethostname(), os.getpid()) - + self.timeout = timeout self.timer = None - - + + if not id is None: self.worker_id +='.%s'%str(id) @@ -88,7 +91,7 @@ def load_nameserver_credentials(self, working_directory, num_tries=60, interval= waiting period between the attempts """ fn = os.path.join(working_directory, 'HPB_run_%s_pyro.pkl'%self.run_id) - + for i in range(num_tries): try: with open(fn, 'rb') as fh: @@ -105,7 +108,7 @@ def load_nameserver_credentials(self, working_directory, num_tries=60, interval= def run(self, background=False): """ Method to start the worker. - + Parameters ---------- background: bool @@ -124,7 +127,7 @@ def run(self, background=False): def _run(self): # initial ping to the dispatcher to register the worker - + try: with Pyro4.locateNS(host=self.nameserver, port=self.nameserver_port) as ns: self.logger.debug('WORKER: Connected to nameserver %s'%(str(ns))) @@ -137,8 +140,8 @@ def _run(self): exit(1) except: raise - - + + for dn, uri in dispatchers.items(): try: self.logger.debug('WORKER: found dispatcher %s'%dn) @@ -156,22 +159,22 @@ def _run(self): self.logger.info('WORKER: start listening for jobs') - self.pyro_daemon = Pyro4.core.Daemon(host=self.host, nathost=self.nathost, natport=self.natport) + self.pyro_daemon = Pyro4.core.Daemon(host=self.host, port=self.port, nathost=self.nathost, natport=self.natport) with Pyro4.locateNS(self.nameserver, port=self.nameserver_port) as ns: uri = self.pyro_daemon.register(self, self.worker_id) ns.register(self.worker_id, uri) - + self.pyro_daemon.requestLoop() with Pyro4.locateNS(self.nameserver, port=self.nameserver_port) as ns: ns.remove(self.worker_id) - - + + def compute(self, config_id, config, budget, working_directory): """ The function you have to overload implementing your computation. - + Parameters ---------- config_id: tuple @@ -193,7 +196,7 @@ def compute(self, config_id, config, budget, working_directory): - 'loss': a numerical value that is MINIMIZED - 'info': This can be pretty much any build in python type, e.g. a dict with lists as value. Due to Pyro4 handling the remote function calls, 3rd party types like numpy arrays are not supported! 
""" - + raise NotImplementedError("Subclass hpbandster.distributed.worker and overwrite the compute method in your worker script") @Pyro4.expose @@ -228,10 +231,10 @@ def start_computation(self, callback, id, *args, **kwargs): self.timer.start() return(result) - @Pyro4.expose + @Pyro4.expose def is_busy(self): return(self.busy) - + @Pyro4.expose @Pyro4.oneway def shutdown(self): From 274bfa9f00f232d132cc103c7450ea3c3ce66885 Mon Sep 17 00:00:00 2001 From: gchlebus Date: Tue, 21 Jan 2020 16:59:28 +0100 Subject: [PATCH 05/10] - Add possibility to specify the port, nathost and natport to master and dispatcher. --- hpbandster/core/dispatcher.py | 15 ++++++++++++--- hpbandster/core/master.py | 14 +++++++++++++- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/hpbandster/core/dispatcher.py b/hpbandster/core/dispatcher.py index e9d8b48..dd61588 100644 --- a/hpbandster/core/dispatcher.py +++ b/hpbandster/core/dispatcher.py @@ -73,8 +73,8 @@ class Dispatcher(object): """ def __init__(self, new_result_callback, run_id='0', ping_interval=10, nameserver='localhost', - nameserver_port=None, - host=None, logger=None, queue_callback=None): + nameserver_port=None, host=None, port=0, nathost=None, + natport=None, logger=None, queue_callback=None): """ Parameters ---------- @@ -91,6 +91,12 @@ def __init__(self, new_result_callback, run_id='0', port of Pyro4 nameserver host: str ip (or name that resolves to that) of the network interface to use + port: int + port for this dispatcher process + nathost: str + external hostname for this dispatcher process + natport: int + external port for this dispatcher process logger: logging.Logger logger-instance for info and debug queue_callback: function @@ -103,6 +109,9 @@ def __init__(self, new_result_callback, run_id='0', self.nameserver = nameserver self.nameserver_port = nameserver_port self.host = host + self.port = port + self.nathost = nathost + self.natport = natport self.ping_interval = int(ping_interval) self.shutdown_all_threads = False @@ -136,7 +145,7 @@ def run(self): self.logger.info('DISPATCHER: started the \'job_runner\' thread') - self.pyro_daemon = Pyro4.core.Daemon(host=self.host) + self.pyro_daemon = Pyro4.core.Daemon(host=self.host, port=self.port, nathost=self.nathost, natport=self.natport) with Pyro4.locateNS(host=self.nameserver, port=self.nameserver_port) as ns: uri = self.pyro_daemon.register(self, self.pyro_id) diff --git a/hpbandster/core/master.py b/hpbandster/core/master.py index e9703b1..0241f8d 100644 --- a/hpbandster/core/master.py +++ b/hpbandster/core/master.py @@ -25,6 +25,9 @@ def __init__(self, nameserver='127.0.0.1', nameserver_port=None, host=None, + port=None, + nathost=None, + natport=None, shutdown_workers=True, job_queue_sizes=(-1,0), dynamic_queue_size=True, @@ -64,6 +67,12 @@ def __init__(self, port of Pyro4 nameserver host: str ip (or name that resolves to that) of the network interface to use + port: int + port for this master process + nathost: str + external hostname for this master process + natport: int + external port for this master process shutdown_workers: bool flag to control whether the workers are shutdown after the computation is done job_queue_size: tuple of ints @@ -120,7 +129,10 @@ def __init__(self, 'time_ref' : self.time_ref } - self.dispatcher = Dispatcher( self.job_callback, queue_callback=self.adjust_queue_size, run_id=run_id, ping_interval=ping_interval, nameserver=nameserver, nameserver_port=nameserver_port, host=host) + self.dispatcher = Dispatcher( self.job_callback, 
+                                      ping_interval=ping_interval, nameserver=nameserver,
+                                      nameserver_port=nameserver_port, host=host, port=port, nathost=nathost,
+                                      natport=natport)

         self.dispatcher_thread = threading.Thread(target=self.dispatcher.run)
         self.dispatcher_thread.start()

From bbe43050a81f559e93d6e03741bd80316fb5bb96 Mon Sep 17 00:00:00 2001
From: gchlebus
Date: Wed, 22 Jan 2020 09:34:49 +0100
Subject: [PATCH 06/10] Enhance comments.

---
 hpbandster/core/master.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/hpbandster/core/master.py b/hpbandster/core/master.py
index 0241f8d..fb08fae 100644
--- a/hpbandster/core/master.py
+++ b/hpbandster/core/master.py
@@ -68,11 +68,11 @@ def __init__(self,
         host: str
             ip (or name that resolves to that) of the network interface to use
         port: int
-            port for this master process
+            port for the job dispatcher process
         nathost: str
-            external hostname for this master process
+            external hostname for the job dispatcher process
         natport: int
-            external port for this master process
+            external port for the job dispatcher process
         shutdown_workers: bool
             flag to control whether the workers are shutdown after the computation is done
         job_queue_size: tuple of ints

From f5775069bbe0cfc3980adeb4f3723dc48e2316f4 Mon Sep 17 00:00:00 2001
From: gchlebus
Date: Fri, 24 Jan 2020 13:40:21 +0100
Subject: [PATCH 07/10] Add flag to choose whether results from workers that
 died should be registered.

---
 hpbandster/core/dispatcher.py | 674 +++++++++++++++++-----------------
 1 file changed, 338 insertions(+), 336 deletions(-)

diff --git a/hpbandster/core/dispatcher.py b/hpbandster/core/dispatcher.py
index dd61588..0dc35cd 100644
--- a/hpbandster/core/dispatcher.py
+++ b/hpbandster/core/dispatcher.py
@@ -7,351 +7,353 @@

 class Job(object):
-    def __init__(self, id, **kwargs):
-        self.id = id
-
-        self.kwargs = kwargs
-
-        self.timestamps = {}
+    def __init__(self, id, **kwargs):
+        self.id = id

-        self.result = None
-        self.exception = None
+        self.kwargs = kwargs

-        self.worker_name = None
+        self.timestamps = {}

-    def time_it(self, which_time):
-        self.timestamps[which_time] = time.time()
+        self.result = None
+        self.exception = None

-    def __repr__(self):
-        return(\
-            "job_id: " +str(self.id) + "\n" + \
-            "kwargs: " + str(self.kwargs) + "\n" + \
-            "result: " + str(self.result)+ "\n" +\
-            "exception: "+ str(self.exception) + "\n"
-        )
-    def recreate_from_run(self, run):
-
-        run.config_id
-        run.budget
-        run.error_logs
-        run.loss
-        run.info
-        run.time_stamps
+        self.worker_name = None

+    def time_it(self, which_time):
+        self.timestamps[which_time] = time.time()

+    def __repr__(self):
+        return ( \
+            "job_id: " + str(self.id) + "\n" + \
+            "kwargs: " + str(self.kwargs) + "\n" + \
+            "result: " + str(self.result) + "\n" + \
+            "exception: " + str(self.exception) + "\n"
+        )
+
+    def recreate_from_run(self, run):
+        run.config_id
+        run.budget
+        run.error_logs
+        run.loss
+        run.info
+        run.time_stamps

 class Worker(object):
-    def __init__(self, name, uri):
-        self.name = name
-        self.proxy = Pyro4.Proxy(uri)
-        self.runs_job = None
-
-    def is_alive(self):
-        try:
-            self.proxy._pyroReconnect(1)
-        except Pyro4.errors.ConnectionClosedError:
-            return False
-        except:
-            raise
-        return(True)
-
-    def shutdown(self):
-        self.proxy.shutdown()
-
-    def is_busy(self):
-        return(self.proxy.is_busy())
-
-    def __repr__(self):
-        return(self.name)
+    def __init__(self, name, uri):
+        self.name = name
+        self.proxy = Pyro4.Proxy(uri)
+        self.runs_job = None
+
+    def is_alive(self):
+        try:
+            self.proxy._pyroReconnect(1)
+        except Pyro4.errors.ConnectionClosedError:
+            return False
+        except:
+            raise
+        return (True)
+
+    def shutdown(self):
+        self.proxy.shutdown()
+
+    def is_busy(self):
+        return (self.proxy.is_busy())
+
+    def __repr__(self):
+        return (self.name)

 class Dispatcher(object):
-    """
-    The dispatcher is responsible for assigning tasks to free workers, report results back to the master and
-    communicate to the nameserver.
-    """
-    def __init__(self, new_result_callback, run_id='0',
-                    ping_interval=10, nameserver='localhost',
-                    nameserver_port=None, host=None, port=0, nathost=None,
-                    natport=None, logger=None, queue_callback=None):
-        """
-        Parameters
-        ----------
-        new_result_callback: function
-            function that will be called with a `Job instance `_ as argument.
-            From the `Job` the result can be read and e.g. logged.
-        run_id: str
-            unique run_id associated with the HPB run
-        ping_interval: int
-            how often to ping for workers (in seconds)
-        nameserver: str
-            address of the Pyro4 nameserver
-        nameserver_port: int
-            port of Pyro4 nameserver
-        host: str
-            ip (or name that resolves to that) of the network interface to use
-        port: int
-            port for this dispatcher process
-        nathost: str
-            external hostname for this dispatcher process
-        natport: int
-            external port for this dispatcher process
-        logger: logging.Logger
-            logger-instance for info and debug
-        queue_callback: function
-            gets called with the number of workers in the pool on every update-cycle
-        """
-
-        self.new_result_callback = new_result_callback
-        self.queue_callback = queue_callback
-        self.run_id = run_id
-        self.nameserver = nameserver
-        self.nameserver_port = nameserver_port
-        self.host = host
-        self.port = port
-        self.nathost = nathost
-        self.natport = natport
-        self.ping_interval = int(ping_interval)
-        self.shutdown_all_threads = False
-
-
-        if logger is None:
-            self.logger = logging.getLogger('hpbandster')
-        else:
-            self.logger = logger
-
-        self.worker_pool = {}
-
-        self.waiting_jobs = queue.Queue()
-        self.running_jobs = {}
-        self.idle_workers = set()
-
-
-        self.thread_lock = threading.Lock()
-        self.runner_cond = threading.Condition(self.thread_lock)
-        self.discover_cond = threading.Condition(self.thread_lock)
-
-        self.pyro_id="hpbandster.run_%s.dispatcher"%self.run_id
-
-
-    def run(self):
-        with self.discover_cond:
-            t1 = threading.Thread(target=self.discover_workers, name='discover_workers')
-            t1.start()
-            self.logger.info('DISPATCHER: started the \'discover_worker\' thread')
-            t2 = threading.Thread(target=self.job_runner, name='job_runner')
-            t2.start()
-            self.logger.info('DISPATCHER: started the \'job_runner\' thread')
-
-
-        self.pyro_daemon = Pyro4.core.Daemon(host=self.host, port=self.port, nathost=self.nathost, natport=self.natport)
-
-        with Pyro4.locateNS(host=self.nameserver, port=self.nameserver_port) as ns:
-            uri = self.pyro_daemon.register(self, self.pyro_id)
-            ns.register(self.pyro_id, uri)
-
-        self.logger.info("DISPATCHER: Pyro daemon running on %s"%(self.pyro_daemon.locationStr))
-
-
-        self.pyro_daemon.requestLoop()
-
-
-        with self.discover_cond:
-            self.shutdown_all_threads = True
-            self.logger.info('DISPATCHER: Dispatcher shutting down')
-
-            self.runner_cond.notify_all()
-            self.discover_cond.notify_all()
-
-
-
-        with Pyro4.locateNS(self.nameserver, port=self.nameserver_port) as ns:
-            ns.remove(self.pyro_id)
-
-        t1.join()
-        self.logger.debug('DISPATCHER: \'discover_worker\' thread exited')
-        t2.join()
-        self.logger.debug('DISPATCHER: \'job_runner\' thread exited')
-        self.logger.info('DISPATCHER: shut down complete')
-
-    def shutdown_all_workers(self, rediscover=False):
-        with self.discover_cond:
-            for worker in self.worker_pool.values():
-                worker.shutdown()
-            if rediscover:
-                time.sleep(1)
-                self.discover_cond.notify()
-
-    def shutdown(self, shutdown_workers=False):
-        if shutdown_workers:
-            self.shutdown_all_workers()
-
-        with self.runner_cond:
-            self.pyro_daemon.shutdown()
-
-    @Pyro4.expose
-    @Pyro4.oneway
-    def trigger_discover_worker(self):
-        #time.sleep(1)
-        self.logger.info("DISPATCHER: A new worker triggered discover_worker")
-        with self.discover_cond:
-            self.discover_cond.notify()
-
-
-    def discover_workers(self):
-        self.discover_cond.acquire()
-        sleep_interval = 1
-
-        while True:
-            self.logger.debug('DISPATCHER: Starting worker discovery')
-            update = False
-
-            with Pyro4.locateNS(host=self.nameserver, port=self.nameserver_port) as ns:
-                worker_names = ns.list(prefix="hpbandster.run_%s.worker."%self.run_id)
-                self.logger.debug("DISPATCHER: Found %i potential workers, %i currently in the pool."%(len(worker_names), len(self.worker_pool)))
-
-            for wn, uri in worker_names.items():
-                if not wn in self.worker_pool:
-                    w = Worker(wn, uri)
-                    if not w.is_alive():
-                        self.logger.debug('DISPATCHER: skipping dead worker, %s'%wn)
-                        continue
-                    update = True
-                    self.logger.info('DISPATCHER: discovered new worker, %s'%wn)
-                    self.worker_pool[wn] = w
-
-            # check the current list of workers
-            crashed_jobs = set()
-
-            all_workers = list(self.worker_pool.keys())
-            for wn in all_workers:
-                # remove dead entries from the nameserver
-                if not self.worker_pool[wn].is_alive():
-                    self.logger.info('DISPATCHER: removing dead worker, %s'%wn)
-                    update = True
-                    # todo check if there were jobs running on that that need to be rescheduled
-
-                    current_job = self.worker_pool[wn].runs_job
-
-                    if not current_job is None:
-                        self.logger.info('Job %s was not completed'%str(current_job))
-                        crashed_jobs.add(current_job)
-
-                    del self.worker_pool[wn]
-                    self.idle_workers.discard(wn)
-                    continue
-
-                if not self.worker_pool[wn].is_busy():
-                    self.idle_workers.add(wn)
-
-
-            # try to submit more jobs if something changed
-            if update:
-                if not self.queue_callback is None:
-                    self.discover_cond.release()
-                    self.queue_callback(len(self.worker_pool))
-                    self.discover_cond.acquire()
-                self.runner_cond.notify()
-
-            for crashed_job in crashed_jobs:
-                self.discover_cond.release()
-                self.register_result(crashed_job, {'result': None, 'exception': 'Worker died unexpectedly.'})
-                self.discover_cond.acquire()
-
-            self.logger.debug('DISPATCHER: Finished worker discovery')
-
-            #if (len(self.worker_pool) == 0 ): # ping for new workers if no workers are currently available
-            #    self.logger.debug('No workers available! Keep pinging')
-            #    self.discover_cond.wait(sleep_interval)
-            #    sleep_interval *= 2
-            #else:
-            self.discover_cond.wait(self.ping_interval)
-
-            if self.shutdown_all_threads:
-                self.logger.debug('DISPATCHER: discover_workers shutting down')
-                self.runner_cond.notify()
-                self.discover_cond.release()
-                return
-
-    def number_of_workers(self):
-        with self.discover_cond:
-            return(len(self.worker_pool))
-
-    def job_runner(self):
-
-        self.runner_cond.acquire()
-        while True:
-
-            while self.waiting_jobs.empty() or len(self.idle_workers) == 0:
-                self.logger.debug('DISPATCHER: jobs to submit = %i, number of idle workers = %i -> waiting!'%(self.waiting_jobs.qsize(), len(self.idle_workers) ))
-                self.runner_cond.wait()
-                self.logger.debug('DISPATCHER: Trying to submit another job.')
-                if self.shutdown_all_threads:
-                    self.logger.debug('DISPATCHER: job_runner shutting down')
-                    self.discover_cond.notify()
-                    self.runner_cond.release()
-                    return
-
-            job = self.waiting_jobs.get()
-            wn = self.idle_workers.pop()
-
-            worker = self.worker_pool[wn]
-            self.logger.debug('DISPATCHER: starting job %s on %s'%(str(job.id),worker.name))
-
-            job.time_it('started')
-            worker.runs_job = job.id
-
-            worker.proxy.start_computation(self, job.id, **job.kwargs)
-
-            job.worker_name = wn
-            self.running_jobs[job.id] = job
-
-            self.logger.debug('DISPATCHER: job %s dispatched on %s'%(str(job.id),worker.name))
-
-
-    def submit_job(self, id, **kwargs):
-        self.logger.debug('DISPATCHER: trying to submit job %s'%str(id))
-        with self.runner_cond:
-            job = Job(id, **kwargs)
-            job.time_it('submitted')
-            self.waiting_jobs.put(job)
-            self.logger.debug('DISPATCHER: trying to notify the job_runner thread.')
-            self.runner_cond.notify()
-
-    @Pyro4.expose
-    @Pyro4.callback
-    @Pyro4.oneway
-    def register_result(self, id=None, result=None):
-        self.logger.debug('DISPATCHER: job %s finished'%(str(id)))
-        with self.runner_cond:
-            self.logger.debug('DISPATCHER: register_result: lock acquired')
-            # fill in missing information
-            job = self.running_jobs[id]
-            job.time_it('finished')
-            job.result = result['result']
-            job.exception = result['exception']
-
-            self.logger.debug('DISPATCHER: job %s on %s finished'%(str(job.id),job.worker_name))
-            self.logger.debug(str(job))
-
-            # delete job
-            del self.running_jobs[id]
-
-            # label worker as idle again
-            try:
-                self.worker_pool[job.worker_name].runs_job = None
-                self.worker_pool[job.worker_name].proxy._pyroRelease()
-                self.idle_workers.add(job.worker_name)
-                # notify the job_runner to check for more jobs to run
-                self.runner_cond.notify()
-            except KeyError:
-                # happens for crashed workers, but we can just continue
-                pass
-            except:
-                raise
-
-        # call users callback function to register the result
-        # needs to be with the condition released, as the master can call
-        # submit_job quickly enough to cause a dead-lock
-        self.new_result_callback(job)
+    """
+    The dispatcher is responsible for assigning tasks to free workers, reporting results back to the master and
+    communicating with the nameserver.
+    """
+
+    def __init__(self, new_result_callback, run_id='0',
+                 ping_interval=10, nameserver='localhost',
+                 nameserver_port=None, host=None, port=0, nathost=None,
+                 natport=None, logger=None, queue_callback=None,
+                 register_results_from_died_workers=True):
+        """
+        Parameters
+        ----------
+        new_result_callback: function
+            function that will be called with a `Job instance `_ as argument.
+            From the `Job` the result can be read and e.g. logged.
+        run_id: str
+            unique run_id associated with the HPB run
+        ping_interval: int
+            how often to ping for workers (in seconds)
+        nameserver: str
+            address of the Pyro4 nameserver
+        nameserver_port: int
+            port of Pyro4 nameserver
+        host: str
+            ip (or name that resolves to that) of the network interface to use
+        port: int
+            port for this dispatcher process
+        nathost: str
+            external hostname for this dispatcher process
+        natport: int
+            external port for this dispatcher process
+        logger: logging.Logger
+            logger-instance for info and debug
+        queue_callback: function
+            gets called with the number of workers in the pool on every update-cycle
+        register_results_from_died_workers: bool
+            whether to register results from workers that died. If False, the job the dead worker was processing is
+            resubmitted to the next idle worker.
+        """
+
+        self.new_result_callback = new_result_callback
+        self.queue_callback = queue_callback
+        self.run_id = run_id
+        self.nameserver = nameserver
+        self.nameserver_port = nameserver_port
+        self.host = host
+        self.port = port
+        self.nathost = nathost
+        self.natport = natport
+        self.ping_interval = int(ping_interval)
+        self.shutdown_all_threads = False
+
+        if logger is None:
+            self.logger = logging.getLogger('hpbandster')
+        else:
+            self.logger = logger
+
+        self.worker_pool = {}
+
+        self.register_results_from_died_workers = register_results_from_died_workers
+        self.id_to_job_dict = dict()
+        self.waiting_jobs = queue.Queue()
+        self.running_jobs = {}
+        self.idle_workers = set()
+
+        self.thread_lock = threading.Lock()
+        self.runner_cond = threading.Condition(self.thread_lock)
+        self.discover_cond = threading.Condition(self.thread_lock)
+
+        self.pyro_id = "hpbandster.run_%s.dispatcher" % self.run_id
+
+    def run(self):
+        with self.discover_cond:
+            t1 = threading.Thread(target=self.discover_workers, name='discover_workers')
+            t1.start()
+            self.logger.info('DISPATCHER: started the \'discover_worker\' thread')
+            t2 = threading.Thread(target=self.job_runner, name='job_runner')
+            t2.start()
+            self.logger.info('DISPATCHER: started the \'job_runner\' thread')
+
+        self.pyro_daemon = Pyro4.core.Daemon(host=self.host, port=self.port, nathost=self.nathost, natport=self.natport)
+
+        with Pyro4.locateNS(host=self.nameserver, port=self.nameserver_port) as ns:
+            uri = self.pyro_daemon.register(self, self.pyro_id)
+            ns.register(self.pyro_id, uri)
+
+        self.logger.info("DISPATCHER: Pyro daemon running on %s" % (self.pyro_daemon.locationStr))
+
+        self.pyro_daemon.requestLoop()
+
+        with self.discover_cond:
+            self.shutdown_all_threads = True
+            self.logger.info('DISPATCHER: Dispatcher shutting down')
+
+            self.runner_cond.notify_all()
+            self.discover_cond.notify_all()
+
+        with Pyro4.locateNS(self.nameserver, port=self.nameserver_port) as ns:
+            ns.remove(self.pyro_id)
+
+        t1.join()
+        self.logger.debug('DISPATCHER: \'discover_worker\' thread exited')
+        t2.join()
+        self.logger.debug('DISPATCHER: \'job_runner\' thread exited')
+        self.logger.info('DISPATCHER: shut down complete')
+
+    def shutdown_all_workers(self, rediscover=False):
+        with self.discover_cond:
+            for worker in self.worker_pool.values():
+                worker.shutdown()
+            if rediscover:
+                time.sleep(1)
+                self.discover_cond.notify()
+
+    def shutdown(self, shutdown_workers=False):
+        if shutdown_workers:
+            self.shutdown_all_workers()
+
+        with self.runner_cond:
+            self.pyro_daemon.shutdown()
+
+    @Pyro4.expose
+    @Pyro4.oneway
+    def trigger_discover_worker(self):
+        # time.sleep(1)
+        self.logger.info("DISPATCHER: A new worker triggered discover_worker")
+        with self.discover_cond:
+            self.discover_cond.notify()
+
+    def discover_workers(self):
+        self.discover_cond.acquire()
+        sleep_interval = 1
+
+        while True:
+            self.logger.debug('DISPATCHER: Starting worker discovery')
+            update = False
+
+            with Pyro4.locateNS(host=self.nameserver, port=self.nameserver_port) as ns:
+                worker_names = ns.list(prefix="hpbandster.run_%s.worker." % self.run_id)
+                self.logger.debug("DISPATCHER: Found %i potential workers, %i currently in the pool." % (
+                    len(worker_names), len(self.worker_pool)))
+
+            for wn, uri in worker_names.items():
+                if not wn in self.worker_pool:
+                    w = Worker(wn, uri)
+                    if not w.is_alive():
+                        self.logger.debug('DISPATCHER: skipping dead worker, %s' % wn)
+                        continue
+                    update = True
+                    self.logger.info('DISPATCHER: discovered new worker, %s' % wn)
+                    self.worker_pool[wn] = w
+
+            # check the current list of workers
+            crashed_jobs = set()
+
+            all_workers = list(self.worker_pool.keys())
+            for wn in all_workers:
+                # remove dead entries from the nameserver
+                if not self.worker_pool[wn].is_alive():
+                    self.logger.info('DISPATCHER: removing dead worker, %s' % wn)
+                    update = True
+                    # todo check if there were jobs running on that that need to be rescheduled
+
+                    current_job = self.worker_pool[wn].runs_job
+
+                    if not current_job is None:
+                        self.logger.info('Job %s was not completed' % str(current_job))
+                        crashed_jobs.add(current_job)
+
+                    del self.worker_pool[wn]
+                    self.idle_workers.discard(wn)
+                    continue
+
+                if not self.worker_pool[wn].is_busy():
+                    self.idle_workers.add(wn)
+
+            # try to submit more jobs if something changed
+            if update:
+                if not self.queue_callback is None:
+                    self.discover_cond.release()
+                    self.queue_callback(len(self.worker_pool))
+                    self.discover_cond.acquire()
+                self.runner_cond.notify()
+
+            for crashed_job in crashed_jobs:
+                self.discover_cond.release()
+                if self.register_results_from_died_workers:
+                    self.register_result(crashed_job, {'result': None, 'exception': 'Worker died unexpectedly.'})
+                else:
+                    job = self.id_to_job_dict[crashed_job]
+                    self.submit_job(job.id, **job.kwargs)
+                self.discover_cond.acquire()
+
+            self.logger.debug('DISPATCHER: Finished worker discovery')
+
+            # if (len(self.worker_pool) == 0 ): # ping for new workers if no workers are currently available
+            #     self.logger.debug('No workers available! Keep pinging')
+            #     self.discover_cond.wait(sleep_interval)
+            #     sleep_interval *= 2
+            # else:
+            self.discover_cond.wait(self.ping_interval)
+
+            if self.shutdown_all_threads:
+                self.logger.debug('DISPATCHER: discover_workers shutting down')
+                self.runner_cond.notify()
+                self.discover_cond.release()
+                return
+
+    def number_of_workers(self):
+        with self.discover_cond:
+            return (len(self.worker_pool))
+
+    def job_runner(self):
+
+        self.runner_cond.acquire()
+        while True:
+
+            while self.waiting_jobs.empty() or len(self.idle_workers) == 0:
+                self.logger.debug('DISPATCHER: jobs to submit = %i, number of idle workers = %i -> waiting!' % (
+                    self.waiting_jobs.qsize(), len(self.idle_workers)))
+                self.runner_cond.wait()
+                self.logger.debug('DISPATCHER: Trying to submit another job.')
+                if self.shutdown_all_threads:
+                    self.logger.debug('DISPATCHER: job_runner shutting down')
+                    self.discover_cond.notify()
+                    self.runner_cond.release()
+                    return
+
+            job = self.waiting_jobs.get()
+            wn = self.idle_workers.pop()
+
+            worker = self.worker_pool[wn]
+            self.logger.debug('DISPATCHER: starting job %s on %s' % (str(job.id), worker.name))
+
+            job.time_it('started')
+            worker.runs_job = job.id
+
+            worker.proxy.start_computation(self, job.id, **job.kwargs)
+
+            job.worker_name = wn
+            self.running_jobs[job.id] = job
+
+            self.logger.debug('DISPATCHER: job %s dispatched on %s' % (str(job.id), worker.name))
+
+    def submit_job(self, id, **kwargs):
+        self.logger.debug('DISPATCHER: trying to submit job %s' % str(id))
+        with self.runner_cond:
+            job = Job(id, **kwargs)
+            job.time_it('submitted')
+            self.waiting_jobs.put(job)
+            if not self.register_results_from_died_workers:
+                self.id_to_job_dict[job.id] = job
+            self.logger.debug('DISPATCHER: trying to notify the job_runner thread.')
+            self.runner_cond.notify()
+
+    @Pyro4.expose
+    @Pyro4.callback
+    @Pyro4.oneway
+    def register_result(self, id=None, result=None):
+        self.logger.debug('DISPATCHER: job %s finished' % (str(id)))
+        with self.runner_cond:
+            self.logger.debug('DISPATCHER: register_result: lock acquired')
+            # fill in missing information
+            job = self.running_jobs[id]
+            job.time_it('finished')
+            job.result = result['result']
+            job.exception = result['exception']
+
+            self.logger.debug('DISPATCHER: job %s on %s finished' % (str(job.id), job.worker_name))
+            self.logger.debug(str(job))
+
+            # delete job
+            del self.running_jobs[id]
+
+            # label worker as idle again
+            try:
+                self.worker_pool[job.worker_name].runs_job = None
+                self.worker_pool[job.worker_name].proxy._pyroRelease()
+                self.idle_workers.add(job.worker_name)
+                # notify the job_runner to check for more jobs to run
+                self.runner_cond.notify()
+            except KeyError:
+                # happens for crashed workers, but we can just continue
+                pass
+            except:
+                raise
+
+        # call users callback function to register the result
+        # needs to be with the condition released, as the master can call
+        # submit_job quickly enough to cause a dead-lock
+        self.new_result_callback(job)

From 07cde007cc7a0bf0e1ceb33695d0c271b55b7ace Mon Sep 17 00:00:00 2001
From: gchlebus
Date: Fri, 24 Jan 2020 13:51:34 +0100
Subject: [PATCH 08/10] Forward register_results_from_died_workers to Master.

---
 hpbandster/core/master.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/hpbandster/core/master.py b/hpbandster/core/master.py
index fb08fae..fa1ad90 100644
--- a/hpbandster/core/master.py
+++ b/hpbandster/core/master.py
@@ -34,6 +34,7 @@ def __init__(self,
                 logger=None,
                 result_logger=None,
                 previous_result = None,
+                register_results_from_died_workers=True
                 ):
         """The Master class is responsible for the book keeping and to decide what to run next. Optimizers are
         instantiations of Master, that handle the important steps of deciding what configurations to run on what
@@ -87,6 +88,9 @@ def __init__(self,
             a result logger that writes live results to disk
         previous_result: hpbandster.core.result.Result object
             previous run to warmstart the run
+        register_results_from_died_workers: bool
+            whether to register results from workers that died. If False, the job the dead worker was processing is
+            resubmitted to the next idle worker.
""" self.working_directory = working_directory @@ -132,7 +136,7 @@ def __init__(self, self.dispatcher = Dispatcher( self.job_callback, queue_callback=self.adjust_queue_size, run_id=run_id, ping_interval=ping_interval, nameserver=nameserver, nameserver_port=nameserver_port, host=host, port=port, nathost=nathost, - natport=natport) + natport=natport, register_results_from_died_workers=register_results_from_died_workers) self.dispatcher_thread = threading.Thread(target=self.dispatcher.run) self.dispatcher_thread.start() From c343bd4976e4a64297463b75121db1a2f9472936 Mon Sep 17 00:00:00 2001 From: gchlebus Date: Tue, 28 Jan 2020 09:34:46 +0100 Subject: [PATCH 09/10] Ignore ValueError `spearmanr` needs at least 2 variables to compare. --- hpbandster/visualization.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/hpbandster/visualization.py b/hpbandster/visualization.py index f51fe78..d6bb0cc 100644 --- a/hpbandster/visualization.py +++ b/hpbandster/visualization.py @@ -170,9 +170,14 @@ def correlation_across_budgets(results_object, show=False): for i in range(len(budgets)-1): for j in range(i+1,len(budgets)): - spr = sps.spearmanr(loss_pairs[budgets[i]][budgets[j]]) - rhos[i][j-1] = spr.correlation - ps[i][j-1] = spr.pvalue + if loss_pairs[budgets[i]][budgets[j]]: + try: + spr = sps.spearmanr(loss_pairs[budgets[i]][budgets[j]]) + rhos[i][j-1] = spr.correlation + ps[i][j-1] = spr.pvalue + except ValueError: + # ignore `spearmanr` needs at least 2 variables to compare + pass fig, ax = plt.subplots() From 99f24ab49dcc94561f156a84aa931bcab58dd8eb Mon Sep 17 00:00:00 2001 From: gchlebus Date: Tue, 28 Jan 2020 09:35:03 +0100 Subject: [PATCH 10/10] Remove if statement. --- hpbandster/visualization.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/hpbandster/visualization.py b/hpbandster/visualization.py index d6bb0cc..c3f2a39 100644 --- a/hpbandster/visualization.py +++ b/hpbandster/visualization.py @@ -170,14 +170,13 @@ def correlation_across_budgets(results_object, show=False): for i in range(len(budgets)-1): for j in range(i+1,len(budgets)): - if loss_pairs[budgets[i]][budgets[j]]: - try: - spr = sps.spearmanr(loss_pairs[budgets[i]][budgets[j]]) - rhos[i][j-1] = spr.correlation - ps[i][j-1] = spr.pvalue - except ValueError: - # ignore `spearmanr` needs at least 2 variables to compare - pass + try: + spr = sps.spearmanr(loss_pairs[budgets[i]][budgets[j]]) + rhos[i][j-1] = spr.correlation + ps[i][j-1] = spr.pvalue + except ValueError: + # ignore `spearmanr` needs at least 2 variables to compare + pass fig, ax = plt.subplots()