-
Notifications
You must be signed in to change notification settings - Fork 85
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add subscribe blocks endpoints #758
Add subscribe blocks endpoints #758
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #758 +/- ##
==========================================
+ Coverage 52.92% 53.71% +0.79%
==========================================
Files 35 35
Lines 5580 5667 +87
==========================================
+ Hits 2953 3044 +91
+ Misses 2231 2205 -26
- Partials 396 418 +22
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a lot of repeated boilerplate happening here.
How about using a generic method like this
func genericSubscribe[Receive, Response any](
ctx context.Context,
receive func() (Receive, error),
convert func(Receive) (Response, error),
topicNameForErrors string,
) (<-chan Response, <-chan error) {
sub := make(chan Response)
errChan := make(chan error)
sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}
go func() {
defer close(sub)
defer close(errChan)
for {
resp, err := receive()
if err != nil {
if err == io.EOF {
return
}
sendErr(fmt.Errorf("error receiving %s: %w", topicNameForErrors, err))
return
}
response, err := convert(resp)
if err != nil {
sendErr(fmt.Errorf("error converting %s: %w", topicNameForErrors, err))
return
}
select {
case <-ctx.Done():
return
case sub <- response:
}
}
}()
return sub, errChan
}
Using it would look like this in the case of subscribeExecutionData
:
func (c *BaseClient) subscribeExecutionData(
ctx context.Context,
req *executiondata.SubscribeExecutionDataRequest,
opts ...grpc.CallOption,
) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
stream, err := c.executionDataClient.SubscribeExecutionData(ctx, req, opts...)
if err != nil {
return nil, nil, err
}
sub, errChan := genericSubscribe(
ctx,
stream.Recv,
func(resp *executiondata.SubscribeExecutionDataResponse) (flow.ExecutionDataStreamResponse, error) {
execData, err := convert.MessageToBlockExecutionData(resp.GetBlockExecutionData())
if err != nil {
return flow.ExecutionDataStreamResponse{}, fmt.Errorf("error converting execution data for block %d: %w", resp.GetBlockHeight(), err)
}
response := flow.ExecutionDataStreamResponse{
Height: resp.BlockHeight,
ExecutionData: execData,
BlockTimestamp: resp.BlockTimestamp.AsTime(),
}
return response, nil
},
"execution data",
)
return sub, errChan, nil
}
Another question I have is how do we feel about changing the return type to (<-chan flow.ExecutionDataStreamResponse, <-chan error)
from (<-chan flow.ExecutionDataStreamResponse, <-chan error, error)
by simply sending that first error on the error channel. This might be easier to handle.
cc: @peterargue
@janezpodhostnik Good catch! I also noticed it and created a separate issue for this. I want to unify all code duplication at one place after we merge them in |
There's a number of different issues that share similar code/functionality, so we better "deduplicate" it after we merge all of them. |
@illia-malachyn sounds good thanks |
@illia-malachyn can you merge master. |
Part of #746