Skip to content

Commit

Permalink
Merge pull request #417 from onflow/navid/event-streaming
Browse files Browse the repository at this point in the history
Add support for event streaming API
  • Loading branch information
peterargue authored Dec 20, 2023
2 parents 39e7e65 + 035da59 commit 58fb441
Show file tree
Hide file tree
Showing 18 changed files with 1,873 additions and 9 deletions.
15 changes: 15 additions & 0 deletions access/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,21 @@ type Client interface {
// GetExecutionResultForBlockID gets the execution results at the block ID.
GetExecutionResultForBlockID(ctx context.Context, blockID flow.Identifier) (*flow.ExecutionResult, error)

// GetExecutionDataByBlockID returns execution data for a specific block ID.
GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*flow.ExecutionData, error)

// SubscribeExecutionDataByBlockID subscribes to execution data updates starting at the given block ID.
SubscribeExecutionDataByBlockID(ctx context.Context, startBlockID flow.Identifier) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error)

// SubscribeExecutionDataByBlockHeight subscribes to execution data updates starting at the given block height.
SubscribeExecutionDataByBlockHeight(ctx context.Context, startHeight uint64) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error)

// SubscribeEventsByBlockID subscribes to events starting at the given block ID.
SubscribeEventsByBlockID(ctx context.Context, startBlockID flow.Identifier, filter flow.EventFilter) (<-chan flow.BlockEvents, <-chan error, error)

// SubscribeEventsByBlockHeight subscribes to events starting at the given block height.
SubscribeEventsByBlockHeight(ctx context.Context, startHeight uint64, filter flow.EventFilter) (<-chan flow.BlockEvents, <-chan error, error)

// Close stops the client connection to the access node.
Close() error
}
20 changes: 20 additions & 0 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,26 @@ func (c *Client) GetExecutionResultForBlockID(ctx context.Context, blockID flow.
return c.grpc.GetExecutionResultForBlockID(ctx, blockID)
}

func (c *Client) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*flow.ExecutionData, error) {
return c.grpc.GetExecutionDataByBlockID(ctx, blockID)
}

func (c *Client) SubscribeExecutionDataByBlockID(ctx context.Context, startBlockID flow.Identifier) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
return c.grpc.SubscribeExecutionDataByBlockID(ctx, startBlockID)
}

func (c *Client) SubscribeExecutionDataByBlockHeight(ctx context.Context, startHeight uint64) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
return c.grpc.SubscribeExecutionDataByBlockHeight(ctx, startHeight)
}

func (c *Client) SubscribeEventsByBlockID(ctx context.Context, startBlockID flow.Identifier, filter flow.EventFilter) (<-chan flow.BlockEvents, <-chan error, error) {
return c.grpc.SubscribeEventsByBlockID(ctx, startBlockID, filter)
}

func (c *Client) SubscribeEventsByBlockHeight(ctx context.Context, startHeight uint64, filter flow.EventFilter) (<-chan flow.BlockEvents, <-chan error, error) {
return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter)
}

func (c *Client) Close() error {
return c.grpc.Close()
}
250 changes: 250 additions & 0 deletions access/grpc/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,18 @@ func messageToEvent(m *entities.Event, options []jsoncdc.Option) (flow.Event, er
}, nil
}

func messagesToEvents(m []*entities.Event, options []jsoncdc.Option) ([]flow.Event, error) {
events := make([]flow.Event, 0, len(m))
for _, ev := range m {
res, err := messageToEvent(ev, options)
if err != nil {
return nil, fmt.Errorf("convert: %w", err)
}
events = append(events, res)
}
return events, nil
}

func identifierToMessage(i flow.Identifier) []byte {
return i.Bytes()
}
Expand Down Expand Up @@ -552,3 +564,241 @@ func messageToTransactionResult(m *access.TransactionResultResponse, options []j
CollectionID: flow.BytesToID(m.GetCollectionId()),
}, nil
}

func blockExecutionDataToMessage(
execData *flow.ExecutionData,
) (*entities.BlockExecutionData, error) {
chunks := make([]*entities.ChunkExecutionData, len(execData.ChunkExecutionData))
for i, chunk := range execData.ChunkExecutionData {
convertedChunk, err := chunkExecutionDataToMessage(chunk)
if err != nil {
return nil, err
}
chunks[i] = convertedChunk
}

return &entities.BlockExecutionData{
BlockId: identifierToMessage(execData.BlockID),
ChunkExecutionData: chunks,
}, nil
}

func messageToBlockExecutionData(
m *entities.BlockExecutionData,
) (*flow.ExecutionData, error) {
if m == nil {
return nil, errEmptyMessage
}

chunks := make([]*flow.ChunkExecutionData, len(m.ChunkExecutionData))
for i, chunk := range m.GetChunkExecutionData() {
convertedChunk, err := messageToChunkExecutionData(chunk)
if err != nil {
return nil, err
}
chunks[i] = convertedChunk
}

return &flow.ExecutionData{
BlockID: messageToIdentifier(m.GetBlockId()),
ChunkExecutionData: chunks,
}, nil
}

func chunkExecutionDataToMessage(
chunk *flow.ChunkExecutionData,
) (*entities.ChunkExecutionData, error) {

transactions, err := executionDataCollectionToMessage(chunk.Transactions)
if err != nil {
return nil, err
}

var trieUpdate *entities.TrieUpdate
if chunk.TrieUpdate != nil {
trieUpdate, err = trieUpdateToMessage(chunk.TrieUpdate)
if err != nil {
return nil, err
}
}

events := make([]*entities.Event, len(chunk.Events))
for i, ev := range chunk.Events {
res, err := eventToMessage(*ev)
if err != nil {
return nil, err
}

// execution data uses CCF encoding
res.Payload, err = ccf.Encode(ev.Value)
if err != nil {
return nil, fmt.Errorf("ccf convert: %w", err)
}

events[i] = res
}

results := make([]*entities.ExecutionDataTransactionResult, len(chunk.TransactionResults))
for i, res := range chunk.TransactionResults {
result := lightTransactionResultToMessage(res)
results[i] = result
}

return &entities.ChunkExecutionData{
Collection: transactions,
Events: events,
TrieUpdate: trieUpdate,
TransactionResults: results,
}, nil
}

func messageToChunkExecutionData(
m *entities.ChunkExecutionData,
) (*flow.ChunkExecutionData, error) {

transactions, err := messageToExecutionDataCollection(m.GetCollection())
if err != nil {
return nil, err
}

var trieUpdate *flow.TrieUpdate
if m.GetTrieUpdate() != nil {
trieUpdate, err = messageToTrieUpdate(m.GetTrieUpdate())
if err != nil {
return nil, err
}
}

events := make([]*flow.Event, len(m.GetEvents()))
for i, ev := range m.GetEvents() {
res, err := messageToEvent(ev, nil)
if err != nil {
return nil, err
}
events[i] = &res
}

results := make([]*flow.LightTransactionResult, len(m.GetTransactionResults()))
for i, res := range m.GetTransactionResults() {
result := messageToLightTransactionResult(res)
results[i] = &result
}

return &flow.ChunkExecutionData{
Transactions: transactions,
Events: events,
TrieUpdate: trieUpdate,
TransactionResults: results,
}, nil
}

func executionDataCollectionToMessage(
txs []*flow.Transaction,
) (*entities.ExecutionDataCollection, error) {
transactions := make([]*entities.Transaction, len(txs))
for i, tx := range txs {
transaction, err := transactionToMessage(*tx)
if err != nil {
return nil, fmt.Errorf("could not convert transaction %d: %w", i, err)
}
transactions[i] = transaction
}

return &entities.ExecutionDataCollection{
Transactions: transactions,
}, nil
}

func messageToExecutionDataCollection(
m *entities.ExecutionDataCollection,
) ([]*flow.Transaction, error) {
messages := m.GetTransactions()
transactions := make([]*flow.Transaction, len(messages))
for i, message := range messages {
transaction, err := messageToTransaction(message)
if err != nil {
return nil, fmt.Errorf("could not convert transaction %d: %w", i, err)
}
transactions[i] = &transaction
}

if len(transactions) == 0 {
return nil, nil
}

return transactions, nil
}

func trieUpdateToMessage(
update *flow.TrieUpdate,
) (*entities.TrieUpdate, error) {

payloads := make([]*entities.Payload, len(update.Payloads))
for i, payload := range update.Payloads {
keyParts := make([]*entities.KeyPart, len(payload.KeyPart))
for j, keypart := range payload.KeyPart {
keyParts[j] = &entities.KeyPart{
Type: uint32(keypart.Type),
Value: keypart.Value,
}
}
payloads[i] = &entities.Payload{
KeyPart: keyParts,
Value: payload.Value,
}
}

return &entities.TrieUpdate{
RootHash: update.RootHash,
Paths: update.Paths,
Payloads: payloads,
}, nil
}

func messageToTrieUpdate(
m *entities.TrieUpdate,
) (*flow.TrieUpdate, error) {
rootHash := m.GetRootHash()
paths := m.GetPaths()

payloads := make([]*flow.Payload, len(m.Payloads))
for i, payload := range m.GetPayloads() {
keyParts := make([]*flow.KeyPart, len(payload.GetKeyPart()))
for j, keypart := range payload.GetKeyPart() {
keyParts[j] = &flow.KeyPart{
Type: uint16(keypart.GetType()),
Value: keypart.GetValue(),
}
}
payloads[i] = &flow.Payload{
KeyPart: keyParts,
Value: payload.GetValue(),
}
}

return &flow.TrieUpdate{
RootHash: rootHash,
Paths: paths,
Payloads: payloads,
}, nil
}

func lightTransactionResultToMessage(
result *flow.LightTransactionResult,
) *entities.ExecutionDataTransactionResult {
return &entities.ExecutionDataTransactionResult{
TransactionId: identifierToMessage(result.TransactionID),
Failed: result.Failed,
ComputationUsed: result.ComputationUsed,
}
}

func messageToLightTransactionResult(
m *entities.ExecutionDataTransactionResult,
) flow.LightTransactionResult {
return flow.LightTransactionResult{
TransactionID: messageToIdentifier(m.GetTransactionId()),
Failed: m.Failed,
ComputationUsed: m.GetComputationUsed(),
}
}
23 changes: 23 additions & 0 deletions access/grpc/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,3 +316,26 @@ func TestConvert_TransactionResult(t *testing.T) {

assert.Equal(t, resultA, resultB)
}

func TestConvert_ExecutionData(t *testing.T) {
executionDataA := test.ExecutionDataGenerator().New()

msg, err := blockExecutionDataToMessage(executionDataA)
require.NoError(t, err)

executionDataB, err := messageToBlockExecutionData(msg)
require.NoError(t, err)

assert.Equal(t, executionDataA.BlockID[:], executionDataB.BlockID[:])
require.NotEmpty(t, executionDataA.ChunkExecutionData)

// Force evaluation of type ID, which is cached in type.
// Necessary for equality check below, otherwise the typeID will be empty
for _, chunk := range executionDataB.ChunkExecutionData {
for _, event := range chunk.Events {
_ = event.Value.Type().ID()
}
}

assert.Equal(t, executionDataA, executionDataB)
}
Loading

0 comments on commit 58fb441

Please sign in to comment.