-
Notifications
You must be signed in to change notification settings - Fork 0
/
streamz_nodes.py
32 lines (24 loc) · 1.11 KB
/
streamz_nodes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
"""Module containing useful custom streamz nodes"""
from types import MethodType
from logging import Logger
from streamz import Stream, Sink
@Stream.register_api()
class on_exception(Sink):  # pylint: disable=invalid-name
    """Sink that reroutes exceptions raised by its upstream node.

    Constructing this node monkey patches ``upstream.update``: the original
    method is still invoked for every payload, but when it raises
    ``exception`` the error is caught and a ``(payload, exc)`` tuple is
    emitted from this node instead of the exception propagating to the
    caller.
    """

    def __init__(
        self, upstream: Stream, exception=Exception, logger: Logger = None, **kwargs
    ):
        """Attach to ``upstream`` and replace its ``update`` with a guarded one.

        Args:
            upstream: Node whose ``update`` method gets wrapped.
            exception: Exception class (or tuple of classes) to intercept;
                anything that does not match still propagates.
            logger: Optional logger. When truthy, every intercepted exception
                is reported through ``logger.exception``.
            **kwargs: Passed through unchanged to ``Sink.__init__``.
        """
        super().__init__(upstream, **kwargs)

        # Grab the bound method *before* it is replaced, so the wrapper below
        # delegates to the real implementation rather than to itself.
        wrapped_update = upstream.update

        def guarded_update(_node, payload, who=None, metadata=None):
            """Call the original ``update``; divert matching errors to this sink."""
            try:
                outcome = wrapped_update(payload, who, metadata)
            except exception as exc:  # pylint: disable=broad-exception-caught
                if logger:
                    logger.exception("Caught the following exception in %s", upstream)
                # Continue down the branch rooted at this node, handing over
                # both the offending payload and the exception it triggered.
                return self._emit((payload, exc), metadata)
            return outcome

        # Install the wrapper on this one upstream instance only.
        upstream.update = MethodType(guarded_update, upstream)

    def update(self, x, who=None, metadata=None):
        """Ignore regular upstream data; this node only emits caught errors."""
        return None