Skip to content

Commit

Permalink
Use channel of pointers to objects for consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
illia-malachyn committed Nov 5, 2024
1 parent 5b17d2d commit 68ee152
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 83 deletions.
8 changes: 4 additions & 4 deletions access/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,16 @@ type Client interface {
GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*flow.ExecutionData, error)

// SubscribeExecutionDataByBlockID subscribes to execution data updates starting at the given block ID.
SubscribeExecutionDataByBlockID(ctx context.Context, startBlockID flow.Identifier) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error)
SubscribeExecutionDataByBlockID(ctx context.Context, startBlockID flow.Identifier) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error)

// SubscribeExecutionDataByBlockHeight subscribes to execution data updates starting at the given block height.
SubscribeExecutionDataByBlockHeight(ctx context.Context, startHeight uint64) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error)
SubscribeExecutionDataByBlockHeight(ctx context.Context, startHeight uint64) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error)

// SubscribeEventsByBlockID subscribes to events starting at the given block ID.
SubscribeEventsByBlockID(ctx context.Context, startBlockID flow.Identifier, filter flow.EventFilter, opts ...SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error)
SubscribeEventsByBlockID(ctx context.Context, startBlockID flow.Identifier, filter flow.EventFilter, opts ...SubscribeOption) (<-chan *flow.BlockEvents, <-chan error, error)

// SubscribeEventsByBlockHeight subscribes to events starting at the given block height.
SubscribeEventsByBlockHeight(ctx context.Context, startHeight uint64, filter flow.EventFilter, opts ...SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error)
SubscribeEventsByBlockHeight(ctx context.Context, startHeight uint64, filter flow.EventFilter, opts ...SubscribeOption) (<-chan *flow.BlockEvents, <-chan error, error)

// Close stops the client connection to the access node.
Close() error
Expand Down
34 changes: 17 additions & 17 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (c *Client) GetTransactionResultsByBlockID(ctx context.Context, blockID flo
func (c *Client) SendAndSubscribeTransactionStatuses(
ctx context.Context,
tx flow.Transaction,
) (<-chan flow.TransactionResult, <-chan error, error) {
) (<-chan *flow.TransactionResult, <-chan error, error) {
return c.grpc.SendAndSubscribeTransactionStatuses(ctx, tx)
}

Expand Down Expand Up @@ -298,14 +298,14 @@ func (c *Client) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Ide
func (c *Client) SubscribeExecutionDataByBlockID(
ctx context.Context,
startBlockID flow.Identifier,
) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) {
return c.grpc.SubscribeExecutionDataByBlockID(ctx, startBlockID)
}

func (c *Client) SubscribeExecutionDataByBlockHeight(
ctx context.Context,
startHeight uint64,
) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) {
return c.grpc.SubscribeExecutionDataByBlockHeight(ctx, startHeight)
}

Expand All @@ -314,7 +314,7 @@ func (c *Client) SubscribeEventsByBlockID(
startBlockID flow.Identifier,
filter flow.EventFilter,
opts ...access.SubscribeOption,
) (<-chan flow.BlockEvents, <-chan error, error) {
) (<-chan *flow.BlockEvents, <-chan error, error) {
conf := convertSubscribeOptions(opts...)
return c.grpc.SubscribeEventsByBlockID(ctx, startBlockID, filter, WithHeartbeatInterval(conf.heartbeatInterval))
}
Expand All @@ -324,7 +324,7 @@ func (c *Client) SubscribeEventsByBlockHeight(
startHeight uint64,
filter flow.EventFilter,
opts ...access.SubscribeOption,
) (<-chan flow.BlockEvents, <-chan error, error) {
) (<-chan *flow.BlockEvents, <-chan error, error) {
conf := convertSubscribeOptions(opts...)
return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, WithHeartbeatInterval(conf.heartbeatInterval))
}
Expand All @@ -333,68 +333,68 @@ func (c *Client) SubscribeBlockDigestsFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
) (<-chan *flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, blockStatus)
}

func (c *Client) SubscribeBlockDigestsFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
) (<-chan *flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, blockStatus)
}

func (c *Client) SubscribeBlockDigestsFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
) (<-chan *flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromLatest(ctx, blockStatus)
}

func (c *Client) SubscribeBlocksFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
) (<-chan *flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromStartBlockID(ctx, startBlockID, blockStatus)
}

func (c *Client) SubscribeBlocksFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
) (<-chan *flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromStartHeight(ctx, startHeight, blockStatus)
}

func (c *Client) SubscribeBlocksFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
) (<-chan *flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromLatest(ctx, blockStatus)
}

func (c *Client) SubscribeBlockHeadersFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockHeader, <-chan error, error) {
) (<-chan *flow.BlockHeader, <-chan error, error) {
return c.grpc.SubscribeBlockHeadersFromStartBlockID(ctx, startBlockID, blockStatus)
}

func (c *Client) SubscribeBlockHeadersFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockHeader, <-chan error, error) {
) (<-chan *flow.BlockHeader, <-chan error, error) {
return c.grpc.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, blockStatus)
}

func (c *Client) SubscribeBlocksHeadersFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockHeader, <-chan error, error) {
) (<-chan *flow.BlockHeader, <-chan error, error) {
return c.grpc.SubscribeBlockHeadersFromLatest(ctx, blockStatus)
}

Expand All @@ -418,21 +418,21 @@ func (c *Client) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startBlockHeight uint64,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
) (<-chan *flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromStartHeight(ctx, startBlockHeight, filter)
}

func (c *Client) SubscribeAccountStatusesFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
) (<-chan *flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromStartBlockID(ctx, startBlockID, filter)
}

func (c *Client) SubscribeAccountStatusesFromLatestBlock(
ctx context.Context,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
) (<-chan *flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromLatestBlock(ctx, filter)
}
24 changes: 12 additions & 12 deletions access/grpc/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func BlockToMessage(b flow.Block) (*entities.Block, error) {
}, nil
}

func MessageToBlock(m *entities.Block) (flow.Block, error) {
func MessageToBlock(m *entities.Block) (*flow.Block, error) {
var timestamp time.Time
var err error

Expand All @@ -193,7 +193,7 @@ func MessageToBlock(m *entities.Block) (flow.Block, error) {

tc, err := MessageToTimeoutCertificate(m.BlockHeader.GetLastViewTc())
if err != nil {
return flow.Block{}, err
return &flow.Block{}, err
}

header := &flow.BlockHeader{
Expand All @@ -214,20 +214,20 @@ func MessageToBlock(m *entities.Block) (flow.Block, error) {

guarantees, err := MessagesToCollectionGuarantees(m.GetCollectionGuarantees())
if err != nil {
return flow.Block{}, err
return &flow.Block{}, err
}

seals, err := MessagesToBlockSeals(m.GetBlockSeals())
if err != nil {
return flow.Block{}, err
return &flow.Block{}, err
}

payload := &flow.BlockPayload{
CollectionGuarantees: guarantees,
Seals: seals,
}

return flow.Block{
return &flow.Block{
BlockHeader: *header,
BlockPayload: *payload,
}, nil
Expand Down Expand Up @@ -257,9 +257,9 @@ func BlockHeaderToMessage(b flow.BlockHeader) (*entities.BlockHeader, error) {
}, nil
}

func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) {
func MessageToBlockHeader(m *entities.BlockHeader) (*flow.BlockHeader, error) {
if m == nil {
return flow.BlockHeader{}, ErrEmptyMessage
return &flow.BlockHeader{}, ErrEmptyMessage
}

var timestamp time.Time
Expand All @@ -270,10 +270,10 @@ func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) {

timeoutCertificate, err := MessageToTimeoutCertificate(m.GetLastViewTc())
if err != nil {
return flow.BlockHeader{}, fmt.Errorf("error converting timeout certificate: %w", err)
return &flow.BlockHeader{}, fmt.Errorf("error converting timeout certificate: %w", err)
}

return flow.BlockHeader{
return &flow.BlockHeader{
ID: flow.HashToID(m.GetId()),
ParentID: flow.HashToID(m.GetParentId()),
Height: m.GetHeight(),
Expand Down Expand Up @@ -347,12 +347,12 @@ func QuorumCertificateToMessage(qc flow.QuorumCertificate) (*entities.QuorumCert
}, nil
}

func MessageToBlockDigest(m *access.SubscribeBlockDigestsResponse) (flow.BlockDigest, error) {
func MessageToBlockDigest(m *access.SubscribeBlockDigestsResponse) (*flow.BlockDigest, error) {
if m == nil {
return flow.BlockDigest{}, ErrEmptyMessage
return &flow.BlockDigest{}, ErrEmptyMessage
}

return flow.BlockDigest{
return &flow.BlockDigest{
BlockID: flow.BytesToID(m.GetBlockId()),
Height: m.GetBlockHeight(),
Timestamp: m.GetBlockTimestamp().AsTime(),
Expand Down
4 changes: 2 additions & 2 deletions access/grpc/convert/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestConvert_Block(t *testing.T) {
blockB, err := MessageToBlock(msg)
require.NoError(t, err)

assert.Equal(t, *blockA, blockB)
assert.Equal(t, blockA, blockB)

t.Run("Without timestamp", func(t *testing.T) {
blockA := test.BlockGenerator().New()
Expand All @@ -87,7 +87,7 @@ func TestConvert_BlockHeader(t *testing.T) {
headerB, err := MessageToBlockHeader(msg)
require.NoError(t, err)

assert.Equal(t, headerA, headerB)
assert.Equal(t, headerA, *headerB)

t.Run("Without timestamp", func(t *testing.T) {
headerA := test.BlockHeaderGenerator().New()
Expand Down
Loading

0 comments on commit 68ee152

Please sign in to comment.