Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols and generic non-unit base classes for signal transformers. #71

Draft
wants to merge 12 commits into
base: dev
Choose a base branch
from

Conversation

cboulay
Copy link
Contributor

@cboulay cboulay commented Jan 5, 2025

See #70 for motivation.

So far in this draft PR, I added protocols and base classes for non-ezmsg-unit transformers as well as the minimal ez.Unit wrapper around the transformer. These are intended to replace the generator methods and GenAxisArray-child units that we have currently.

I also refactored Downsample to this new design. All of its tests pass.

There's at least one imperfection here that I don't know how to solve. For example, in Downsample, I need to provide DownsampleSettings both as a generic type in the class definition as well as a class variable: SETTINGS = DownsampleSettings. I don't know a way around this because ez.Unit's init requires the concrete type. This is a minor gripe that I can live with.

I'd love some feedback on this before I start converting all the other sigproc nodes. @griffinmilsap @pperanich

Also, I kept the downsample method for backwards compatibility though it isn't strictly a generator method anymore.

@cboulay
Copy link
Contributor Author

cboulay commented Jan 6, 2025

In the 3rd most-recent commit, I added a WIP protocol for adaptive transformers with a partial_fit method. I think these kinds of transformers would better fit in a new ezmsg-learn module, but I'm including them here (for now) because we probably want the protocol and base classes for ezmsg-sigproc to be extensible to something like ezmsg-learn. BTW I have a few such transformers that need a little cleaning and standardization then I can create ezmsg-learn; indeed that is why I'm working on this issue now and it has become semi-urgent.

In the 2nd most-recent commit, I added a WIP protocol for transformers where the core processing logic should really be asynchronous. I can't think of any in ezmsg-sigproc, but this could be desirable for transformers that do store-and-passthrough (loggers, some sinks), or transformers that require retrieving data from disk/network/etc. I think this pattern will be more useful on sources and sinks rather than transformers.

@cboulay
Copy link
Contributor Author

cboulay commented Jan 6, 2025

I don't think I put this in a doctstring, but one of my requirements is that I can use these transformers in stateless (cloud) frameworks. e.g., in Bytewax this would look like:

flow = Dataflow("my_flow")
inp = op.input("inp", flow, TestingSource(src_items))
keyed_inp = op.key_on("key", inp, lambda msg: str(msg["user_id"]))
some_keyed_result = op.map_value("pick_amount", keyed_inp, lambda msg: msg["txn_amount"])
downsamp_result = op.stateful_map(
    "downsample_step",
    some_keyed_result,
    DownsampleTransformer(axis="time", target_rate=200.0).stateful_op
)
# ...

Bytewax will take care of passing (state, input) to stateful_op and storing the updated state.
I'm pretty sure that Bytewax wants the state to be a dict, so I will have to modify my stateful_op to create the transformer state from the input dict, and likewise convert the updated state to a dict before returning. I also don't know if Bytewax can handle passing AxisArray objects between its nodes so I might have to do some serialization there too. Still some testing to do there.

Flink will probably require a separate submodule because it needs its own classes to wrap the operator.

I have 0 familiarity with Beam but that looks like it will require custom classes too.

I'm starting with Bytewax because it has the friendliest API for Python users.

@cboulay
Copy link
Contributor Author

cboulay commented Jan 7, 2025

In the most recent 2 commits, within the base class I use typing.get_args(self.__orig_bases__[0]) to get the concrete class's types. This eliminates the need for the state_type and transformer_type class variables that I had before.

I still can't eliminate (e.g.) SETTINGS = DownsampleSettings because of how ez.Unit looks for that class.

@cboulay cboulay force-pushed the 70-use-protocols-for-axisarray-transformers branch from 3e0ed20 to 8205137 Compare January 7, 2025 21:26
@cboulay
Copy link
Contributor Author

cboulay commented Jan 7, 2025

2nd most-recent commit enabled using *args, **kwargs in transformer constructor instead of providing Settings instance.

Most recent commit aded StatefulProcessor base above SignalTransformer for processors that don't return a value in their main .process method. Use it with sinks or with processors whose output is independent of an input.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant