diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index aba3223a9..6b68c4a30 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -167,15 +167,8 @@ def __init__(self, **kwargs): self.control_handlers[msg_type] = getattr(self, msg_type) @gen.coroutine - def dispatch_control(self, msg): + def dispatch_control(self, msg, ident, stream=None): """dispatch control requests""" - idents, msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except: - self.log.error("Invalid Control Message", exc_info=True) - return - self.log.debug("Control received: %s", msg) # Set the parent message for side effects. @@ -215,15 +208,8 @@ def should_handle(self, stream, msg, idents): return True @gen.coroutine - def dispatch_shell(self, stream, msg): + def dispatch_shell(self, msg, ident, stream): """dispatch shell requests""" - idents, msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except: - self.log.error("Invalid Message", exc_info=True) - return - # Set the parent message for side effects. self.set_parent(idents, msg) self._publish_status('busy') @@ -385,8 +371,35 @@ def dispatch_queue(self): def _message_counter_default(self): return itertools.count() - def schedule_dispatch(self, priority, dispatch, *args): + def should_dispatch_immediately( + self, msg, ident, stream, priority, dispatch + ): + """ + This provides a hook for dispatching incoming messages + from the frontend immediately, and out of order. + + It could be used to allow asynchronous messages from + GUIs to be processed. + """ + return False + + def schedule_dispatch(self, msg, priority, dispatch, stream=None): """schedule a message for dispatch""" + + idents, msg = self.session.feed_identities(msg, copy=False) + try: + msg = self.session.deserialize(msg, content=True, copy=False) + except: + self.log.error("Invalid Message", exc_info=True) + return + + new_args = (msg, idents, stream) + + if self.should_dispatch_immediately( + msg, ident, stream, priority, dispatch, stream + ): + return self.io_loop.add_callback(dispatch, *new_args) + idx = next(self._message_counter) self.msg_queue.put_nowait( @@ -394,7 +407,7 @@ def schedule_dispatch(self, priority, dispatch, *args): priority, idx, dispatch, - args, + new_args, ) ) # ensure the eventloop wakes up @@ -411,8 +424,8 @@ def start(self): self.control_stream.on_recv( partial( self.schedule_dispatch, - CONTROL_PRIORITY, - self.dispatch_control, + priority=CONTROL_PRIORITY, + dispatch=self.dispatch_control, ), copy=False, ) @@ -423,9 +436,9 @@ def start(self): s.on_recv( partial( self.schedule_dispatch, - SHELL_PRIORITY, - self.dispatch_shell, - s, + priority=SHELL_PRIORITY, + dispatch=self.dispatch_shell, + stream=s, ), copy=False, )