Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscribe account statuses endpoint #762

Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,3 +367,26 @@ func convertSubscribeOptions(opts ...access.SubscribeOption) *SubscribeConfig {
}
return subsConf
}

func (c *Client) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startBlockHeight uint64,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromStartHeight(ctx, startBlockHeight, filter)
}

func (c *Client) SubscribeAccountStatusesFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromStartBlockID(ctx, startBlockID, filter)
}

func (c *Client) SubscribeAccountStatusesFromLatestBlock(
ctx context.Context,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromLatestBlock(ctx, filter)
}
38 changes: 38 additions & 0 deletions access/grpc/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"time"

"github.com/onflow/flow/protobuf/go/flow/executiondata"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/onflow/cadence"
Expand Down Expand Up @@ -76,6 +77,43 @@ func MessageToAccount(m *entities.Account) (flow.Account, error) {
}, nil
}

func MessageToAccountStatus(m *executiondata.SubscribeAccountStatusesResponse) (flow.AccountStatus, error) {
if m == nil {
return flow.AccountStatus{}, ErrEmptyMessage
}

results, err := MessageToAccountStatusResults(m.GetResults())
if err != nil {
return flow.AccountStatus{}, fmt.Errorf("error converting results: %w", err)
}

return flow.AccountStatus{
BlockID: MessageToIdentifier(m.GetBlockId()),
BlockHeight: m.GetBlockHeight(),
MessageIndex: m.GetMessageIndex(),
Results: results,
}, nil
}

func MessageToAccountStatusResults(m []*executiondata.SubscribeAccountStatusesResponse_Result) ([]*flow.AccountStatusResult, error) {
results := make([]*flow.AccountStatusResult, len(m))
var emptyOptions []jsoncdc.Option

for i, r := range m {
events, err := MessagesToEvents(r.GetEvents(), emptyOptions)
if err != nil {
return nil, fmt.Errorf("error converting events: %w", err)
}

results[i] = &flow.AccountStatusResult{
Address: flow.BytesToAddress(r.GetAddress()),
Events: events,
}
}

return results, nil
}

func AccountKeyToMessage(a *flow.AccountKey) *entities.AccountKey {
return &entities.AccountKey{
Index: uint32(a.Index),
Expand Down
142 changes: 142 additions & 0 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1368,3 +1368,145 @@ func receiveBlocksFromClient[Client interface {
}
}
}

func (c *BaseClient) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startHeight uint64,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromStartHeightRequest{
StartBlockHeight: startHeight,
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromStartHeight(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan)
}()

return accountStatutesChan, errChan, nil
}

func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromStartBlockIDRequest{
StartBlockId: startBlockID.Bytes(),
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromStartBlockID(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan)
}()

return accountStatutesChan, errChan, nil
}

func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock(
ctx context.Context,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromLatestBlockRequest{
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromLatestBlock(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan)
}()

return accountStatutesChan, errChan, nil
}

func receiveAccountStatusesFromStream[Stream interface {
Recv() (*executiondata.SubscribeAccountStatusesResponse, error)
}](
ctx context.Context,
stream Stream,
accountStatutesChan chan<- flow.AccountStatus,
errChan chan<- error,
) {
sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

var nextExpectedMsgIndex uint64
for {
accountStatusResponse, err := stream.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}

sendErr(fmt.Errorf("error receiving account status: %w", err))
return
}

accountStatus, err := convert.MessageToAccountStatus(accountStatusResponse)
if err != nil {
sendErr(fmt.Errorf("error converting message to account status: %w", err))
return
}

illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
if accountStatus.MessageIndex != nextExpectedMsgIndex {
sendErr(fmt.Errorf("messages are not ordered"))
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
return
}
nextExpectedMsgIndex = accountStatus.MessageIndex + 1

select {
case <-ctx.Done():
return
case accountStatutesChan <- accountStatus:
}
}
}
Loading
Loading