Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscribe blocks endpoints #758

23 changes: 23 additions & 0 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,29 @@ func (c *Client) SubscribeEventsByBlockHeight(
return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, WithHeartbeatInterval(conf.heartbeatInterval))
}

func (c *Client) SubscribeBlocksFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromStartBlockID(ctx, startBlockID, blockStatus)
}

func (c *Client) SubscribeBlocksFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromStartHeight(ctx, startHeight, blockStatus)
}

func (c *Client) SubscribeBlocksFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromLatest(ctx, blockStatus)
}

func (c *Client) Close() error {
return c.grpc.Close()
}
Expand Down
11 changes: 11 additions & 0 deletions access/grpc/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,17 @@ func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) {
}, nil
}

func BlockStatusToEntity(blockStatus flow.BlockStatus) entities.BlockStatus {
switch blockStatus {
case flow.BlockStatusFinalized:
return entities.BlockStatus_BLOCK_FINALIZED
case flow.BlockStatusSealed:
return entities.BlockStatus_BLOCK_SEALED
default:
return entities.BlockStatus_BLOCK_UNKNOWN
}
}

func CadenceValueToMessage(value cadence.Value, encodingVersion flow.EventEncodingVersion) ([]byte, error) {
switch encodingVersion {
case flow.EventEncodingVersionCCF:
Expand Down
141 changes: 141 additions & 0 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ package grpc

import (
"context"
"errors"
"fmt"
"io"

"github.com/onflow/flow/protobuf/go/flow/entities"
"google.golang.org/grpc"

"github.com/onflow/cadence"
Expand Down Expand Up @@ -1155,3 +1157,142 @@ func (c *BaseClient) subscribeEvents(

return sub, errChan, nil
}

func (c *BaseClient) SubscribeBlocksFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.Block, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlocksFromStartBlockIDRequest{
StartBlockId: startBlockID.Bytes(),
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlocksFromStartBlockID(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.Block)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlocksFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func (c *BaseClient) SubscribeBlocksFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.Block, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlocksFromStartHeightRequest{
StartBlockHeight: startHeight,
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlocksFromStartHeight(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.Block)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlocksFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func (c *BaseClient) SubscribeBlocksFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.Block, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlocksFromLatestRequest{
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlocksFromLatest(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.Block)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlocksFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func receiveBlocksFromClient[Client interface {
Recv() (*access.SubscribeBlocksResponse, error)
}](
ctx context.Context,
client Client,
blocksChan chan<- flow.Block,
errChan chan<- error,
) {
sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

for {
// Receive the next block response
blockResponse, err := client.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}

sendErr(fmt.Errorf("error receiving block: %w", err))
return
}

block, err := convert.MessageToBlock(blockResponse.GetBlock())
if err != nil {
sendErr(fmt.Errorf("error converting message to block: %w", err))
return
}

select {
case <-ctx.Done():
return
case blocksChan <- block:
}
}
}
174 changes: 174 additions & 0 deletions access/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2207,3 +2207,177 @@ func (m *mockExecutionDataStream) Recv() (*executiondata.SubscribeExecutionDataR

return m.responses[m.offset], nil
}

func TestClient_SubscribeBlocks(t *testing.T) {
blocks := test.BlockGenerator()

generateBlockResponses := func(count uint64) []*access.SubscribeBlocksResponse {
var resBlocks []*access.SubscribeBlocksResponse

for i := uint64(0); i < count; i++ {
b, err := convert.BlockToMessage(*blocks.New())
require.NoError(t, err)

resBlocks = append(resBlocks, &access.SubscribeBlocksResponse{
Block: b,
})
}

return resBlocks
}

t.Run("Happy Path - from start height", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
startHeight := uint64(1)
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{
ctx: ctx,
responses: generateBlockResponses(responseCount),
}

rpc.
On("SubscribeBlocksFromStartHeight", ctx, mock.Anything).
Return(stream, nil)

blockCh, errCh, err := c.SubscribeBlocksFromStartHeight(ctx, startHeight, flow.BlockStatusFinalized)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualBlock := <-blockCh
expectedBlock, err := convert.MessageToBlock(stream.responses[i].GetBlock())
require.NoError(t, err)
require.Equal(t, expectedBlock, actualBlock)
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - from start block id", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{
ctx: ctx,
responses: generateBlockResponses(responseCount),
}

rpc.
On("SubscribeBlocksFromStartBlockID", ctx, mock.Anything).
Return(stream, nil)

startBlockID := convert.MessageToIdentifier(stream.responses[0].Block.Id)
blockCh, errCh, err := c.SubscribeBlocksFromStartBlockID(ctx, startBlockID, flow.BlockStatusFinalized)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualBlock := <-blockCh
expectedBlock, err := convert.MessageToBlock(stream.responses[i].GetBlock())
require.NoError(t, err)
require.Equal(t, expectedBlock, actualBlock)
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - from latest", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{
ctx: ctx,
responses: generateBlockResponses(responseCount),
}

rpc.
On("SubscribeBlocksFromLatest", ctx, mock.Anything).
Return(stream, nil)

blockCh, errCh, err := c.SubscribeBlocksFromLatest(ctx, flow.BlockStatusFinalized)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualBlock := <-blockCh
expectedBlock, err := convert.MessageToBlock(stream.responses[i].GetBlock())
require.NoError(t, err)
require.Equal(t, expectedBlock, actualBlock)
}
cancel()

wg.Wait()
}))

t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{
ctx: ctx,
err: status.Error(codes.Internal, "internal error"),
}

rpc.
On("SubscribeBlocksFromLatest", ctx, mock.Anything).
Return(stream, nil)

blockCh, errCh, err := c.SubscribeBlocksFromLatest(ctx, flow.BlockStatusFinalized)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoBlocks(t, blockCh, wg.Done)

errorCount := 0
for e := range errCh {
require.Error(t, e)
require.ErrorIs(t, e, stream.err)
errorCount += 1
}
require.Equalf(t, 1, errorCount, "only 1 error is expected")

wg.Wait()
}))
}

type mockBlockClientStream[SubscribeBlocksResponse any] struct {
grpc.ClientStream

ctx context.Context
err error
offset int
responses []*SubscribeBlocksResponse
}

func (s *mockBlockClientStream[SubscribeBlocksResponse]) Recv() (*SubscribeBlocksResponse, error) {
if s.err != nil {
return nil, s.err
}

if s.offset >= len(s.responses) {
<-s.ctx.Done()
return nil, io.EOF
}
defer func() { s.offset++ }()

return s.responses[s.offset], nil
}

func assertNoBlocks[T any](t *testing.T, blocksCh <-chan T, done func()) {
defer done()
for range blocksCh {
require.FailNow(t, "should not receive blocks")
}
}
Loading