diff --git a/access/client.go b/access/client.go index fe0220675..60c671884 100644 --- a/access/client.go +++ b/access/client.go @@ -115,10 +115,10 @@ type Client interface { GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*flow.ExecutionData, error) // SubscribeExecutionDataByBlockID subscribes to execution data updates starting at the given block ID. - SubscribeExecutionDataByBlockID(ctx context.Context, startBlockID flow.Identifier) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) + SubscribeExecutionDataByBlockID(ctx context.Context, startBlockID flow.Identifier) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) // SubscribeExecutionDataByBlockHeight subscribes to execution data updates starting at the given block height. - SubscribeExecutionDataByBlockHeight(ctx context.Context, startHeight uint64) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) + SubscribeExecutionDataByBlockHeight(ctx context.Context, startHeight uint64) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) // SubscribeEventsByBlockID subscribes to events starting at the given block ID. SubscribeEventsByBlockID(ctx context.Context, startBlockID flow.Identifier, filter flow.EventFilter, opts ...SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error) diff --git a/access/grpc/client.go b/access/grpc/client.go index c9d308dfb..e479ad6f5 100644 --- a/access/grpc/client.go +++ b/access/grpc/client.go @@ -207,7 +207,7 @@ func (c *Client) GetTransactionResultsByBlockID(ctx context.Context, blockID flo func (c *Client) SendAndSubscribeTransactionStatuses( ctx context.Context, tx flow.Transaction, -) (<-chan flow.TransactionResult, <-chan error, error) { +) (<-chan *flow.TransactionResult, <-chan error, error) { return c.grpc.SendAndSubscribeTransactionStatuses(ctx, tx) } @@ -298,14 +298,14 @@ func (c *Client) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Ide func (c *Client) SubscribeExecutionDataByBlockID( ctx context.Context, startBlockID flow.Identifier, -) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) { +) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) { return c.grpc.SubscribeExecutionDataByBlockID(ctx, startBlockID) } func (c *Client) SubscribeExecutionDataByBlockHeight( ctx context.Context, startHeight uint64, -) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) { +) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) { return c.grpc.SubscribeExecutionDataByBlockHeight(ctx, startHeight) } @@ -333,7 +333,7 @@ func (c *Client) SubscribeBlockDigestsFromStartBlockID( ctx context.Context, startBlockID flow.Identifier, blockStatus flow.BlockStatus, -) (<-chan flow.BlockDigest, <-chan error, error) { +) (<-chan *flow.BlockDigest, <-chan error, error) { return c.grpc.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, blockStatus) } @@ -341,14 +341,14 @@ func (c *Client) SubscribeBlockDigestsFromStartHeight( ctx context.Context, startHeight uint64, blockStatus flow.BlockStatus, -) (<-chan flow.BlockDigest, <-chan error, error) { +) (<-chan *flow.BlockDigest, <-chan error, error) { return c.grpc.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, blockStatus) } func (c *Client) SubscribeBlockDigestsFromLatest( ctx context.Context, blockStatus flow.BlockStatus, -) (<-chan flow.BlockDigest, <-chan error, error) { +) (<-chan *flow.BlockDigest, <-chan error, error) { return c.grpc.SubscribeBlockDigestsFromLatest(ctx, blockStatus) } @@ -356,7 +356,7 @@ func (c *Client) SubscribeBlocksFromStartBlockID( ctx context.Context, startBlockID flow.Identifier, blockStatus flow.BlockStatus, -) (<-chan flow.Block, <-chan error, error) { +) (<-chan *flow.Block, <-chan error, error) { return c.grpc.SubscribeBlocksFromStartBlockID(ctx, startBlockID, blockStatus) } @@ -364,14 +364,14 @@ func (c *Client) SubscribeBlocksFromStartHeight( ctx context.Context, startHeight uint64, blockStatus flow.BlockStatus, -) (<-chan flow.Block, <-chan error, error) { +) (<-chan *flow.Block, <-chan error, error) { return c.grpc.SubscribeBlocksFromStartHeight(ctx, startHeight, blockStatus) } func (c *Client) SubscribeBlocksFromLatest( ctx context.Context, blockStatus flow.BlockStatus, -) (<-chan flow.Block, <-chan error, error) { +) (<-chan *flow.Block, <-chan error, error) { return c.grpc.SubscribeBlocksFromLatest(ctx, blockStatus) } @@ -379,7 +379,7 @@ func (c *Client) SubscribeBlockHeadersFromStartBlockID( ctx context.Context, startBlockID flow.Identifier, blockStatus flow.BlockStatus, -) (<-chan flow.BlockHeader, <-chan error, error) { +) (<-chan *flow.BlockHeader, <-chan error, error) { return c.grpc.SubscribeBlockHeadersFromStartBlockID(ctx, startBlockID, blockStatus) } @@ -387,14 +387,14 @@ func (c *Client) SubscribeBlockHeadersFromStartHeight( ctx context.Context, startHeight uint64, blockStatus flow.BlockStatus, -) (<-chan flow.BlockHeader, <-chan error, error) { +) (<-chan *flow.BlockHeader, <-chan error, error) { return c.grpc.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, blockStatus) } func (c *Client) SubscribeBlocksHeadersFromLatest( ctx context.Context, blockStatus flow.BlockStatus, -) (<-chan flow.BlockHeader, <-chan error, error) { +) (<-chan *flow.BlockHeader, <-chan error, error) { return c.grpc.SubscribeBlockHeadersFromLatest(ctx, blockStatus) } @@ -418,7 +418,7 @@ func (c *Client) SubscribeAccountStatusesFromStartHeight( ctx context.Context, startBlockHeight uint64, filter flow.AccountStatusFilter, -) (<-chan flow.AccountStatus, <-chan error, error) { +) (<-chan *flow.AccountStatus, <-chan error, error) { return c.grpc.SubscribeAccountStatusesFromStartHeight(ctx, startBlockHeight, filter) } @@ -426,13 +426,13 @@ func (c *Client) SubscribeAccountStatusesFromStartBlockID( ctx context.Context, startBlockID flow.Identifier, filter flow.AccountStatusFilter, -) (<-chan flow.AccountStatus, <-chan error, error) { +) (<-chan *flow.AccountStatus, <-chan error, error) { return c.grpc.SubscribeAccountStatusesFromStartBlockID(ctx, startBlockID, filter) } func (c *Client) SubscribeAccountStatusesFromLatestBlock( ctx context.Context, filter flow.AccountStatusFilter, -) (<-chan flow.AccountStatus, <-chan error, error) { +) (<-chan *flow.AccountStatus, <-chan error, error) { return c.grpc.SubscribeAccountStatusesFromLatestBlock(ctx, filter) } diff --git a/access/grpc/convert/convert.go b/access/grpc/convert/convert.go index 1165c5f9c..5318b9b7a 100644 --- a/access/grpc/convert/convert.go +++ b/access/grpc/convert/convert.go @@ -197,7 +197,7 @@ func BlockToMessage(b flow.Block) (*entities.Block, error) { }, nil } -func MessageToBlock(m *entities.Block) (flow.Block, error) { +func MessageToBlock(m *entities.Block) (*flow.Block, error) { var timestamp time.Time var err error @@ -207,7 +207,7 @@ func MessageToBlock(m *entities.Block) (flow.Block, error) { tc, err := MessageToTimeoutCertificate(m.BlockHeader.GetLastViewTc()) if err != nil { - return flow.Block{}, fmt.Errorf("error converting timeout certificate: %w", err) + return nil, fmt.Errorf("error converting timeout certificate: %w", err) } header := &flow.BlockHeader{ @@ -228,22 +228,22 @@ func MessageToBlock(m *entities.Block) (flow.Block, error) { guarantees, err := MessagesToCollectionGuarantees(m.GetCollectionGuarantees()) if err != nil { - return flow.Block{}, fmt.Errorf("error converting collection guarantees: %w", err) + return nil, fmt.Errorf("error converting collection guarantees: %w", err) } seals, err := MessagesToBlockSeals(m.GetBlockSeals()) if err != nil { - return flow.Block{}, fmt.Errorf("error converting block seals: %w", err) + return nil, fmt.Errorf("error converting block seals: %w", err) } executionReceiptsMeta, err := MessageToExecutionReceiptMetaList(m.GetExecutionReceiptMetaList()) if err != nil { - return flow.Block{}, fmt.Errorf("error converting execution receipt meta list: %w", err) + return nil, fmt.Errorf("error converting execution receipt meta list: %w", err) } executionResults, err := MessageToExecutionResults(m.GetExecutionResultList()) if err != nil { - return flow.Block{}, fmt.Errorf("error converting execution results: %w", err) + return nil, fmt.Errorf("error converting execution results: %w", err) } payload := &flow.BlockPayload{ @@ -255,7 +255,7 @@ func MessageToBlock(m *entities.Block) (flow.Block, error) { ProtocolStateID: flow.HashToID(m.GetProtocolStateId()), } - return flow.Block{ + return &flow.Block{ BlockHeader: *header, BlockPayload: *payload, }, nil @@ -333,9 +333,9 @@ func BlockHeaderToMessage(b flow.BlockHeader) (*entities.BlockHeader, error) { }, nil } -func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) { +func MessageToBlockHeader(m *entities.BlockHeader) (*flow.BlockHeader, error) { if m == nil { - return flow.BlockHeader{}, ErrEmptyMessage + return nil, ErrEmptyMessage } var timestamp time.Time @@ -346,10 +346,10 @@ func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) { timeoutCertificate, err := MessageToTimeoutCertificate(m.GetLastViewTc()) if err != nil { - return flow.BlockHeader{}, fmt.Errorf("error converting timeout certificate: %w", err) + return nil, fmt.Errorf("error converting timeout certificate: %w", err) } - return flow.BlockHeader{ + return &flow.BlockHeader{ ID: flow.HashToID(m.GetId()), ParentID: flow.HashToID(m.GetParentId()), Height: m.GetHeight(), @@ -423,12 +423,12 @@ func QuorumCertificateToMessage(qc flow.QuorumCertificate) (*entities.QuorumCert }, nil } -func MessageToBlockDigest(m *access.SubscribeBlockDigestsResponse) (flow.BlockDigest, error) { +func MessageToBlockDigest(m *access.SubscribeBlockDigestsResponse) (*flow.BlockDigest, error) { if m == nil { - return flow.BlockDigest{}, ErrEmptyMessage + return nil, ErrEmptyMessage } - return flow.BlockDigest{ + return &flow.BlockDigest{ BlockID: flow.BytesToID(m.GetBlockId()), Height: m.GetBlockHeight(), Timestamp: m.GetBlockTimestamp().AsTime(), diff --git a/access/grpc/convert/convert_test.go b/access/grpc/convert/convert_test.go index 2c255a301..7a8f9f9eb 100644 --- a/access/grpc/convert/convert_test.go +++ b/access/grpc/convert/convert_test.go @@ -61,7 +61,7 @@ func TestConvert_Block(t *testing.T) { blockB, err := MessageToBlock(msg) require.NoError(t, err) - assert.Equal(t, *blockA, blockB) + assert.Equal(t, blockA, blockB) t.Run("Without timestamp", func(t *testing.T) { blockA := test.BlockGenerator().New() @@ -87,7 +87,7 @@ func TestConvert_BlockHeader(t *testing.T) { headerB, err := MessageToBlockHeader(msg) require.NoError(t, err) - assert.Equal(t, headerA, headerB) + assert.Equal(t, headerA, *headerB) t.Run("Without timestamp", func(t *testing.T) { headerA := test.BlockHeaderGenerator().New() diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index a033daed1..ad81a66bb 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -245,7 +245,7 @@ func getBlockHeaderResult(res *access.BlockHeaderResponse) (*flow.BlockHeader, e return nil, newMessageToEntityError(entityBlockHeader, err) } header.Status = flow.BlockStatus(res.GetBlockStatus()) - return &header, nil + return header, nil } func (c *BaseClient) GetLatestBlock( @@ -305,7 +305,7 @@ func getBlockResult(res *access.BlockResponse) (*flow.Block, error) { return nil, newMessageToEntityError(entityBlock, err) } block.BlockHeader.Status = flow.BlockStatus(res.GetBlockStatus()) - return &block, nil + return block, nil } func (c *BaseClient) GetCollection( @@ -992,7 +992,7 @@ func (c *BaseClient) SubscribeExecutionDataByBlockID( ctx context.Context, startBlockID flow.Identifier, opts ...grpc.CallOption, -) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) { +) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) { req := executiondata.SubscribeExecutionDataRequest{ StartBlockId: startBlockID[:], EventEncodingVersion: c.eventEncoding, @@ -1004,7 +1004,7 @@ func (c *BaseClient) SubscribeExecutionDataByBlockHeight( ctx context.Context, startHeight uint64, opts ...grpc.CallOption, -) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) { +) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) { req := executiondata.SubscribeExecutionDataRequest{ StartBlockHeight: startHeight, EventEncodingVersion: c.eventEncoding, @@ -1016,13 +1016,13 @@ func (c *BaseClient) subscribeExecutionData( ctx context.Context, req *executiondata.SubscribeExecutionDataRequest, opts ...grpc.CallOption, -) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) { +) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) { stream, err := c.executionDataClient.SubscribeExecutionData(ctx, req, opts...) if err != nil { return nil, nil, err } - sub := make(chan flow.ExecutionDataStreamResponse) + sub := make(chan *flow.ExecutionDataStreamResponse) errChan := make(chan error) sendErr := func(err error) { @@ -1062,7 +1062,7 @@ func (c *BaseClient) subscribeExecutionData( select { case <-ctx.Done(): return - case sub <- response: + case sub <- &response: } } }() @@ -1173,7 +1173,7 @@ func (c *BaseClient) SubscribeBlocksFromStartBlockID( startBlockID flow.Identifier, blockStatus flow.BlockStatus, opts ...grpc.CallOption, -) (<-chan flow.Block, <-chan error, error) { +) (<-chan *flow.Block, <-chan error, error) { status := convert.BlockStatusToEntity(blockStatus) if status == entities.BlockStatus_BLOCK_UNKNOWN { return nil, nil, newRPCError(errors.New("unknown block status")) @@ -1189,7 +1189,7 @@ func (c *BaseClient) SubscribeBlocksFromStartBlockID( return nil, nil, newRPCError(err) } - convertBlockResponse := func(response *access.SubscribeBlocksResponse) (flow.Block, error) { + convertBlockResponse := func(response *access.SubscribeBlocksResponse) (*flow.Block, error) { return convert.MessageToBlock(response.GetBlock()) } @@ -1201,7 +1201,7 @@ func (c *BaseClient) SubscribeBlocksFromStartHeight( startHeight uint64, blockStatus flow.BlockStatus, opts ...grpc.CallOption, -) (<-chan flow.Block, <-chan error, error) { +) (<-chan *flow.Block, <-chan error, error) { status := convert.BlockStatusToEntity(blockStatus) if status == entities.BlockStatus_BLOCK_UNKNOWN { return nil, nil, newRPCError(errors.New("unknown block status")) @@ -1217,7 +1217,7 @@ func (c *BaseClient) SubscribeBlocksFromStartHeight( return nil, nil, newRPCError(err) } - convertBlockResponse := func(response *access.SubscribeBlocksResponse) (flow.Block, error) { + convertBlockResponse := func(response *access.SubscribeBlocksResponse) (*flow.Block, error) { return convert.MessageToBlock(response.GetBlock()) } @@ -1228,7 +1228,7 @@ func (c *BaseClient) SubscribeBlocksFromLatest( ctx context.Context, blockStatus flow.BlockStatus, opts ...grpc.CallOption, -) (<-chan flow.Block, <-chan error, error) { +) (<-chan *flow.Block, <-chan error, error) { status := convert.BlockStatusToEntity(blockStatus) if status == entities.BlockStatus_BLOCK_UNKNOWN { return nil, nil, newRPCError(errors.New("unknown block status")) @@ -1243,7 +1243,7 @@ func (c *BaseClient) SubscribeBlocksFromLatest( return nil, nil, newRPCError(err) } - convertBlockResponse := func(response *access.SubscribeBlocksResponse) (flow.Block, error) { + convertBlockResponse := func(response *access.SubscribeBlocksResponse) (*flow.Block, error) { return convert.MessageToBlock(response.GetBlock()) } @@ -1254,7 +1254,7 @@ func (c *BaseClient) SendAndSubscribeTransactionStatuses( ctx context.Context, tx flow.Transaction, opts ...grpc.CallOption, -) (<-chan flow.TransactionResult, <-chan error, error) { +) (<-chan *flow.TransactionResult, <-chan error, error) { txMsg, err := convert.TransactionToMessage(tx) if err != nil { return nil, nil, newEntityToMessageError(entityTransaction, err) @@ -1270,7 +1270,7 @@ func (c *BaseClient) SendAndSubscribeTransactionStatuses( return nil, nil, newRPCError(err) } - txStatusChan := make(chan flow.TransactionResult) + txStatusChan := make(chan *flow.TransactionResult) errChan := make(chan error) sendErr := func(err error) { @@ -1313,7 +1313,7 @@ func (c *BaseClient) SendAndSubscribeTransactionStatuses( select { case <-ctx.Done(): return - case txStatusChan <- txResult: + case txStatusChan <- &txResult: } } }() @@ -1326,7 +1326,7 @@ func (c *BaseClient) SubscribeBlockHeadersFromStartBlockID( startBlockID flow.Identifier, blockStatus flow.BlockStatus, opts ...grpc.CallOption, -) (<-chan flow.BlockHeader, <-chan error, error) { +) (<-chan *flow.BlockHeader, <-chan error, error) { status := convert.BlockStatusToEntity(blockStatus) if status == entities.BlockStatus_BLOCK_UNKNOWN { return nil, nil, newRPCError(errors.New("unknown block status")) @@ -1342,7 +1342,7 @@ func (c *BaseClient) SubscribeBlockHeadersFromStartBlockID( return nil, nil, newRPCError(err) } - convertBlockHeaderResponse := func(response *access.SubscribeBlockHeadersResponse) (flow.BlockHeader, error) { + convertBlockHeaderResponse := func(response *access.SubscribeBlockHeadersResponse) (*flow.BlockHeader, error) { return convert.MessageToBlockHeader(response.GetHeader()) } @@ -1354,7 +1354,7 @@ func (c *BaseClient) SubscribeBlockHeadersFromStartHeight( startHeight uint64, blockStatus flow.BlockStatus, opts ...grpc.CallOption, -) (<-chan flow.BlockHeader, <-chan error, error) { +) (<-chan *flow.BlockHeader, <-chan error, error) { status := convert.BlockStatusToEntity(blockStatus) if status == entities.BlockStatus_BLOCK_UNKNOWN { return nil, nil, newRPCError(errors.New("unknown block status")) @@ -1370,7 +1370,7 @@ func (c *BaseClient) SubscribeBlockHeadersFromStartHeight( return nil, nil, newRPCError(err) } - convertBlockHeaderResponse := func(response *access.SubscribeBlockHeadersResponse) (flow.BlockHeader, error) { + convertBlockHeaderResponse := func(response *access.SubscribeBlockHeadersResponse) (*flow.BlockHeader, error) { return convert.MessageToBlockHeader(response.GetHeader()) } @@ -1381,7 +1381,7 @@ func (c *BaseClient) SubscribeBlockHeadersFromLatest( ctx context.Context, blockStatus flow.BlockStatus, opts ...grpc.CallOption, -) (<-chan flow.BlockHeader, <-chan error, error) { +) (<-chan *flow.BlockHeader, <-chan error, error) { status := convert.BlockStatusToEntity(blockStatus) if status == entities.BlockStatus_BLOCK_UNKNOWN { return nil, nil, newRPCError(errors.New("unknown block status")) @@ -1396,7 +1396,7 @@ func (c *BaseClient) SubscribeBlockHeadersFromLatest( return nil, nil, newRPCError(err) } - convertBlockHeaderResponse := func(response *access.SubscribeBlockHeadersResponse) (flow.BlockHeader, error) { + convertBlockHeaderResponse := func(response *access.SubscribeBlockHeadersResponse) (*flow.BlockHeader, error) { return convert.MessageToBlockHeader(response.GetHeader()) } @@ -1408,7 +1408,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartHeight( startHeight uint64, filter flow.AccountStatusFilter, opts ...grpc.CallOption, -) (<-chan flow.AccountStatus, <-chan error, error) { +) (<-chan *flow.AccountStatus, <-chan error, error) { request := &executiondata.SubscribeAccountStatusesFromStartHeightRequest{ StartBlockHeight: startHeight, EventEncodingVersion: c.eventEncoding, @@ -1435,7 +1435,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID( startBlockID flow.Identifier, filter flow.AccountStatusFilter, opts ...grpc.CallOption, -) (<-chan flow.AccountStatus, <-chan error, error) { +) (<-chan *flow.AccountStatus, <-chan error, error) { request := &executiondata.SubscribeAccountStatusesFromStartBlockIDRequest{ StartBlockId: startBlockID.Bytes(), EventEncodingVersion: c.eventEncoding, @@ -1461,7 +1461,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock( ctx context.Context, filter flow.AccountStatusFilter, opts ...grpc.CallOption, -) (<-chan flow.AccountStatus, <-chan error, error) { +) (<-chan *flow.AccountStatus, <-chan error, error) { request := &executiondata.SubscribeAccountStatusesFromLatestBlockRequest{ EventEncodingVersion: c.eventEncoding, } @@ -1487,7 +1487,7 @@ func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID( startBlockID flow.Identifier, blockStatus flow.BlockStatus, opts ...grpc.CallOption, -) (<-chan flow.BlockDigest, <-chan error, error) { +) (<-chan *flow.BlockDigest, <-chan error, error) { status := convert.BlockStatusToEntity(blockStatus) if status == entities.BlockStatus_BLOCK_UNKNOWN { return nil, nil, newRPCError(errors.New("unknown block status")) @@ -1503,7 +1503,7 @@ func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID( return nil, nil, newRPCError(err) } - convertBlockDigestResponse := func(response *access.SubscribeBlockDigestsResponse) (flow.BlockDigest, error) { + convertBlockDigestResponse := func(response *access.SubscribeBlockDigestsResponse) (*flow.BlockDigest, error) { return convert.MessageToBlockDigest(response) } @@ -1515,7 +1515,7 @@ func (c *BaseClient) SubscribeBlockDigestsFromStartHeight( startHeight uint64, blockStatus flow.BlockStatus, opts ...grpc.CallOption, -) (<-chan flow.BlockDigest, <-chan error, error) { +) (<-chan *flow.BlockDigest, <-chan error, error) { status := convert.BlockStatusToEntity(blockStatus) if status == entities.BlockStatus_BLOCK_UNKNOWN { return nil, nil, newRPCError(errors.New("unknown block status")) @@ -1531,7 +1531,7 @@ func (c *BaseClient) SubscribeBlockDigestsFromStartHeight( return nil, nil, newRPCError(err) } - convertBlockDigestResponse := func(response *access.SubscribeBlockDigestsResponse) (flow.BlockDigest, error) { + convertBlockDigestResponse := func(response *access.SubscribeBlockDigestsResponse) (*flow.BlockDigest, error) { return convert.MessageToBlockDigest(response) } @@ -1542,7 +1542,7 @@ func (c *BaseClient) SubscribeBlockDigestsFromLatest( ctx context.Context, blockStatus flow.BlockStatus, opts ...grpc.CallOption, -) (<-chan flow.BlockDigest, <-chan error, error) { +) (<-chan *flow.BlockDigest, <-chan error, error) { status := convert.BlockStatusToEntity(blockStatus) if status == entities.BlockStatus_BLOCK_UNKNOWN { return nil, nil, newRPCError(errors.New("unknown block status")) @@ -1557,7 +1557,7 @@ func (c *BaseClient) SubscribeBlockDigestsFromLatest( return nil, nil, newRPCError(err) } - convertBlockDigestResponse := func(response *access.SubscribeBlockDigestsResponse) (flow.BlockDigest, error) { + convertBlockDigestResponse := func(response *access.SubscribeBlockDigestsResponse) (*flow.BlockDigest, error) { return convert.MessageToBlockDigest(response) } @@ -1574,9 +1574,9 @@ func (c *BaseClient) SubscribeBlockDigestsFromLatest( func subscribe[Response any, ClientResponse any]( ctx context.Context, receive func() (*ClientResponse, error), - convertResponse func(*ClientResponse) (Response, error), -) (<-chan Response, <-chan error, error) { - subChan := make(chan Response) + convertResponse func(*ClientResponse) (*Response, error), +) (<-chan *Response, <-chan error, error) { + subChan := make(chan *Response) errChan := make(chan error) sendErr := func(err error) { @@ -1632,8 +1632,8 @@ func subscribeContinuouslyIndexed[Response IndexedMessage, ClientResponse any]( ctx context.Context, receive func() (*ClientResponse, error), convertResponse func(*ClientResponse) (Response, error), -) (<-chan Response, <-chan error, error) { - subChan := make(chan Response) +) (<-chan *Response, <-chan error, error) { + subChan := make(chan *Response) errChan := make(chan error) sendErr := func(err error) { @@ -1675,7 +1675,7 @@ func subscribeContinuouslyIndexed[Response IndexedMessage, ClientResponse any]( select { case <-ctx.Done(): return - case subChan <- response: + case subChan <- &response: } } }() diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index fc0df4a8b..5690461bb 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -2579,7 +2579,7 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { actualTxResult := <-txResultCh expectedTxResult, err := convert.MessageToTransactionResult(stream.responses[i].GetTransactionResults(), DefaultClientOptions().jsonOptions) require.NoError(t, err) - require.Equal(t, expectedTxResult, actualTxResult) + require.Equal(t, expectedTxResult, *actualTxResult) require.Equal(t, expectedCounter, stream.responses[i].MessageIndex) expectedCounter++ @@ -2613,7 +2613,7 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { actualTxResult := <-txResultCh expectedTxResult, err := convert.MessageToTransactionResult(stream.responses[i].GetTransactionResults(), DefaultClientOptions().jsonOptions) require.NoError(t, err) - require.Equal(t, expectedTxResult, actualTxResult) + require.Equal(t, expectedTxResult, *actualTxResult) require.Equal(t, expectedCounter, stream.responses[i].MessageIndex) expectedCounter++ diff --git a/access/http/client.go b/access/http/client.go index 4e25212bd..e309a502d 100644 --- a/access/http/client.go +++ b/access/http/client.go @@ -272,11 +272,11 @@ func (c *Client) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Ide return nil, fmt.Errorf("not implemented") } -func (c *Client) SubscribeExecutionDataByBlockID(ctx context.Context, startBlockID flow.Identifier) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) { +func (c *Client) SubscribeExecutionDataByBlockID(ctx context.Context, startBlockID flow.Identifier) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) { return nil, nil, fmt.Errorf("not implemented") } -func (c *Client) SubscribeExecutionDataByBlockHeight(ctx context.Context, startHeight uint64) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) { +func (c *Client) SubscribeExecutionDataByBlockHeight(ctx context.Context, startHeight uint64) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) { return nil, nil, fmt.Errorf("not implemented") } diff --git a/access/mocks/Client.go b/access/mocks/Client.go index 7a39bb053..fbf354409 100644 --- a/access/mocks/Client.go +++ b/access/mocks/Client.go @@ -691,15 +691,15 @@ func (_m *Client) SubscribeEventsByBlockID(ctx context.Context, startBlockID flo } // SubscribeExecutionDataByBlockHeight provides a mock function with given fields: ctx, startHeight -func (_m *Client) SubscribeExecutionDataByBlockHeight(ctx context.Context, startHeight uint64) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) { +func (_m *Client) SubscribeExecutionDataByBlockHeight(ctx context.Context, startHeight uint64) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) { ret := _m.Called(ctx, startHeight) - var r0 <-chan flow.ExecutionDataStreamResponse - if rf, ok := ret.Get(0).(func(context.Context, uint64) <-chan flow.ExecutionDataStreamResponse); ok { + var r0 <-chan *flow.ExecutionDataStreamResponse + if rf, ok := ret.Get(0).(func(context.Context, uint64) <-chan *flow.ExecutionDataStreamResponse); ok { r0 = rf(ctx, startHeight) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan flow.ExecutionDataStreamResponse) + r0 = ret.Get(0).(<-chan *flow.ExecutionDataStreamResponse) } } @@ -723,15 +723,15 @@ func (_m *Client) SubscribeExecutionDataByBlockHeight(ctx context.Context, start } // SubscribeExecutionDataByBlockID provides a mock function with given fields: ctx, startBlockID -func (_m *Client) SubscribeExecutionDataByBlockID(ctx context.Context, startBlockID flow.Identifier) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) { +func (_m *Client) SubscribeExecutionDataByBlockID(ctx context.Context, startBlockID flow.Identifier) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) { ret := _m.Called(ctx, startBlockID) - var r0 <-chan flow.ExecutionDataStreamResponse - if rf, ok := ret.Get(0).(func(context.Context, flow.Identifier) <-chan flow.ExecutionDataStreamResponse); ok { + var r0 <-chan *flow.ExecutionDataStreamResponse + if rf, ok := ret.Get(0).(func(context.Context, flow.Identifier) <-chan *flow.ExecutionDataStreamResponse); ok { r0 = rf(ctx, startBlockID) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan flow.ExecutionDataStreamResponse) + r0 = ret.Get(0).(<-chan *flow.ExecutionDataStreamResponse) } }