Skip to content

Commit

Permalink
Merge pull request #764 from The-K-R-O-K/subscribe-block-digest-endpo…
Browse files Browse the repository at this point in the history
…ints

Subscribe block digest endpoints
  • Loading branch information
peterargue authored Oct 15, 2024
2 parents 853e844 + aa96690 commit 8b37d47
Show file tree
Hide file tree
Showing 5 changed files with 356 additions and 0 deletions.
23 changes: 23 additions & 0 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,29 @@ func (c *Client) SubscribeEventsByBlockHeight(
return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, WithHeartbeatInterval(conf.heartbeatInterval))
}

func (c *Client) SubscribeBlockDigestsFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, blockStatus)
}

func (c *Client) SubscribeBlockDigestsFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, blockStatus)
}

func (c *Client) SubscribeBlockDigestsFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromLatest(ctx, blockStatus)
}

func (c *Client) SubscribeBlocksFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
Expand Down
16 changes: 16 additions & 0 deletions access/grpc/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,22 @@ func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) {
}, nil
}

func MessageToBlockDigest(m *access.SubscribeBlockDigestsResponse) flow.BlockDigest {
return flow.BlockDigest{
BlockID: flow.BytesToID(m.GetBlockId()),
Height: m.GetBlockHeight(),
Timestamp: m.GetBlockTimestamp().AsTime(),
}
}

func BlockDigestToMessage(blockDigest flow.BlockDigest) *access.SubscribeBlockDigestsResponse {
return &access.SubscribeBlockDigestsResponse{
BlockId: IdentifierToMessage(blockDigest.BlockID),
BlockHeight: blockDigest.Height,
BlockTimestamp: timestamppb.New(blockDigest.Timestamp),
}
}

func BlockStatusToEntity(blockStatus flow.BlockStatus) entities.BlockStatus {
switch blockStatus {
case flow.BlockStatusFinalized:
Expand Down
135 changes: 135 additions & 0 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1516,3 +1516,138 @@ func receiveBlockHeadersFromClient[Client interface {
}
}
}

func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockDigest, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlockDigestsFromStartBlockIDRequest{
StartBlockId: startBlockID.Bytes(),
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromStartBlockID(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.BlockDigest)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func (c *BaseClient) SubscribeBlockDigestsFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockDigest, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlockDigestsFromStartHeightRequest{
StartBlockHeight: startHeight,
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromStartHeight(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.BlockDigest)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func (c *BaseClient) SubscribeBlockDigestsFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockDigest, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlockDigestsFromLatestRequest{
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromLatest(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.BlockDigest)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func receiveBlockDigestFromClient[Client interface {
Recv() (*access.SubscribeBlockDigestsResponse, error)
}](
ctx context.Context,
client Client,
blockDigestsChan chan<- flow.BlockDigest,
errChan chan<- error,
) {
sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

for {
// Receive the next blockDigest response
blockDigestResponse, err := client.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}

sendErr(fmt.Errorf("error receiving blockDigest: %w", err))
return
}

blockDigest := convert.MessageToBlockDigest(blockDigestResponse)

select {
case <-ctx.Done():
return
case blockDigestsChan <- blockDigest:
}
}
}
175 changes: 175 additions & 0 deletions access/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2713,3 +2713,178 @@ func assertNoBlockHeaders[BlockHeader any](t *testing.T, blockHeadersChan <-chan
require.FailNow(t, "should not receive block headers")
}
}

func TestClient_SubscribeBlockDigest(t *testing.T) {
blockHeaders := test.BlockHeaderGenerator()

generateBlockDigestResponses := func(count uint64) []*access.SubscribeBlockDigestsResponse {
var resBlockDigests []*access.SubscribeBlockDigestsResponse

for i := uint64(0); i < count; i++ {
blockHeader := blockHeaders.New()

digest := flow.BlockDigest{
BlockID: blockHeader.ID,
Height: blockHeader.Height,
Timestamp: blockHeader.Timestamp,
}

resBlockDigests = append(resBlockDigests, convert.BlockDigestToMessage(digest))
}

return resBlockDigests
}

t.Run("Happy Path - from start height", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
startHeight := uint64(1)
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
ctx: ctx,
responses: generateBlockDigestResponses(responseCount),
}

rpc.
On("SubscribeBlockDigestsFromStartHeight", ctx, mock.Anything).
Return(stream, nil)

blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, flow.BlockStatusSealed)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualDigest := <-blockDigestsCh
expectedDigest := convert.MessageToBlockDigest(stream.responses[i])
require.Equal(t, expectedDigest, actualDigest)
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - from start block id", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
ctx: ctx,
responses: generateBlockDigestResponses(responseCount),
}

rpc.
On("SubscribeBlockDigestsFromStartBlockID", ctx, mock.Anything).
Return(stream, nil)

startBlockID := convert.MessageToIdentifier(stream.responses[0].BlockId)
blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, flow.BlockStatusSealed)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualDigest := <-blockDigestsCh
expectedDigest := convert.MessageToBlockDigest(stream.responses[i])
require.Equal(t, expectedDigest, actualDigest)
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - from latest", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
ctx: ctx,
responses: generateBlockDigestResponses(responseCount),
}

rpc.
On("SubscribeBlockDigestsFromLatest", ctx, mock.Anything).
Return(stream, nil)

blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusSealed)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualDigest := <-blockDigestsCh
expectedDigest := convert.MessageToBlockDigest(stream.responses[i])
require.Equal(t, expectedDigest, actualDigest)
}
cancel()

wg.Wait()
}))

t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
ctx: ctx,
err: status.Error(codes.Internal, "internal error"),
}

rpc.
On("SubscribeBlockDigestsFromLatest", ctx, mock.Anything).
Return(stream, nil)

blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusSealed)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoBlockDigests(t, blockDigestsCh, wg.Done)

errorCount := 0
for e := range errCh {
require.Error(t, e)
require.ErrorIs(t, e, stream.err)
errorCount += 1
}
cancel()

require.Equalf(t, 1, errorCount, "only 1 error is expected")

wg.Wait()
}))
}

type mockBlockDigestClientStream[SubscribeBlockDigestsResponse any] struct {
grpc.ClientStream

ctx context.Context
err error
offset int
responses []*SubscribeBlockDigestsResponse
}

func (s *mockBlockDigestClientStream[SubscribeBlockDigestsResponse]) Recv() (*SubscribeBlockDigestsResponse, error) {
if s.err != nil {
return nil, s.err
}

if s.offset >= len(s.responses) {
<-s.ctx.Done()
return nil, io.EOF
}
defer func() { s.offset++ }()

return s.responses[s.offset], nil
}

func assertNoBlockDigests[BlockDigest any](t *testing.T, blockDigestsChan <-chan BlockDigest, done func()) {
defer done()
for range blockDigestsChan {
require.FailNow(t, "should not receive block digests")
}
}
7 changes: 7 additions & 0 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,10 @@ type BlockSeal struct {
// block produces the same receipt among all verifying nodes
ExecutionReceiptID Identifier
}

// BlockDigest holds lightweight block information which includes only block id, block height and block timestamp
type BlockDigest struct {
BlockID Identifier
Height uint64
Timestamp time.Time
}

0 comments on commit 8b37d47

Please sign in to comment.