Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use channel of pointers to objects for consistency #797

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
8 changes: 4 additions & 4 deletions access/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,16 @@ type Client interface {
GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*flow.ExecutionData, error)

// SubscribeExecutionDataByBlockID subscribes to execution data updates starting at the given block ID.
SubscribeExecutionDataByBlockID(ctx context.Context, startBlockID flow.Identifier) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error)
SubscribeExecutionDataByBlockID(ctx context.Context, startBlockID flow.Identifier) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error)
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved

// SubscribeExecutionDataByBlockHeight subscribes to execution data updates starting at the given block height.
SubscribeExecutionDataByBlockHeight(ctx context.Context, startHeight uint64) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error)
SubscribeExecutionDataByBlockHeight(ctx context.Context, startHeight uint64) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error)

// SubscribeEventsByBlockID subscribes to events starting at the given block ID.
SubscribeEventsByBlockID(ctx context.Context, startBlockID flow.Identifier, filter flow.EventFilter, opts ...SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error)
SubscribeEventsByBlockID(ctx context.Context, startBlockID flow.Identifier, filter flow.EventFilter, opts ...SubscribeOption) (<-chan *flow.BlockEvents, <-chan error, error)
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved

// SubscribeEventsByBlockHeight subscribes to events starting at the given block height.
SubscribeEventsByBlockHeight(ctx context.Context, startHeight uint64, filter flow.EventFilter, opts ...SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error)
SubscribeEventsByBlockHeight(ctx context.Context, startHeight uint64, filter flow.EventFilter, opts ...SubscribeOption) (<-chan *flow.BlockEvents, <-chan error, error)

// Close stops the client connection to the access node.
Close() error
Expand Down
34 changes: 17 additions & 17 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (c *Client) GetTransactionResultsByBlockID(ctx context.Context, blockID flo
func (c *Client) SendAndSubscribeTransactionStatuses(
ctx context.Context,
tx flow.Transaction,
) (<-chan flow.TransactionResult, <-chan error, error) {
) (<-chan *flow.TransactionResult, <-chan error, error) {
return c.grpc.SendAndSubscribeTransactionStatuses(ctx, tx)
}

Expand Down Expand Up @@ -298,14 +298,14 @@ func (c *Client) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Ide
func (c *Client) SubscribeExecutionDataByBlockID(
ctx context.Context,
startBlockID flow.Identifier,
) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) {
return c.grpc.SubscribeExecutionDataByBlockID(ctx, startBlockID)
}

func (c *Client) SubscribeExecutionDataByBlockHeight(
ctx context.Context,
startHeight uint64,
) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
) (<-chan *flow.ExecutionDataStreamResponse, <-chan error, error) {
return c.grpc.SubscribeExecutionDataByBlockHeight(ctx, startHeight)
}

Expand All @@ -314,7 +314,7 @@ func (c *Client) SubscribeEventsByBlockID(
startBlockID flow.Identifier,
filter flow.EventFilter,
opts ...access.SubscribeOption,
) (<-chan flow.BlockEvents, <-chan error, error) {
) (<-chan *flow.BlockEvents, <-chan error, error) {
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
conf := convertSubscribeOptions(opts...)
return c.grpc.SubscribeEventsByBlockID(ctx, startBlockID, filter, WithHeartbeatInterval(conf.heartbeatInterval))
}
Expand All @@ -324,7 +324,7 @@ func (c *Client) SubscribeEventsByBlockHeight(
startHeight uint64,
filter flow.EventFilter,
opts ...access.SubscribeOption,
) (<-chan flow.BlockEvents, <-chan error, error) {
) (<-chan *flow.BlockEvents, <-chan error, error) {
conf := convertSubscribeOptions(opts...)
return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, WithHeartbeatInterval(conf.heartbeatInterval))
}
Expand All @@ -333,68 +333,68 @@ func (c *Client) SubscribeBlockDigestsFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
) (<-chan *flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, blockStatus)
}

func (c *Client) SubscribeBlockDigestsFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
) (<-chan *flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, blockStatus)
}

func (c *Client) SubscribeBlockDigestsFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
) (<-chan *flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromLatest(ctx, blockStatus)
}

func (c *Client) SubscribeBlocksFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
) (<-chan *flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromStartBlockID(ctx, startBlockID, blockStatus)
}

func (c *Client) SubscribeBlocksFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
) (<-chan *flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromStartHeight(ctx, startHeight, blockStatus)
}

func (c *Client) SubscribeBlocksFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
) (<-chan *flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromLatest(ctx, blockStatus)
}

func (c *Client) SubscribeBlockHeadersFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockHeader, <-chan error, error) {
) (<-chan *flow.BlockHeader, <-chan error, error) {
return c.grpc.SubscribeBlockHeadersFromStartBlockID(ctx, startBlockID, blockStatus)
}

func (c *Client) SubscribeBlockHeadersFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockHeader, <-chan error, error) {
) (<-chan *flow.BlockHeader, <-chan error, error) {
return c.grpc.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, blockStatus)
}

func (c *Client) SubscribeBlocksHeadersFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockHeader, <-chan error, error) {
) (<-chan *flow.BlockHeader, <-chan error, error) {
return c.grpc.SubscribeBlockHeadersFromLatest(ctx, blockStatus)
}

Expand All @@ -418,21 +418,21 @@ func (c *Client) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startBlockHeight uint64,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
) (<-chan *flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromStartHeight(ctx, startBlockHeight, filter)
}

func (c *Client) SubscribeAccountStatusesFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
) (<-chan *flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromStartBlockID(ctx, startBlockID, filter)
}

func (c *Client) SubscribeAccountStatusesFromLatestBlock(
ctx context.Context,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
) (<-chan *flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromLatestBlock(ctx, filter)
}
24 changes: 12 additions & 12 deletions access/grpc/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func BlockToMessage(b flow.Block) (*entities.Block, error) {
}, nil
}

func MessageToBlock(m *entities.Block) (flow.Block, error) {
func MessageToBlock(m *entities.Block) (*flow.Block, error) {
var timestamp time.Time
var err error

Expand All @@ -193,7 +193,7 @@ func MessageToBlock(m *entities.Block) (flow.Block, error) {

tc, err := MessageToTimeoutCertificate(m.BlockHeader.GetLastViewTc())
if err != nil {
return flow.Block{}, err
return &flow.Block{}, err
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
}

header := &flow.BlockHeader{
Expand All @@ -214,20 +214,20 @@ func MessageToBlock(m *entities.Block) (flow.Block, error) {

guarantees, err := MessagesToCollectionGuarantees(m.GetCollectionGuarantees())
if err != nil {
return flow.Block{}, err
return &flow.Block{}, err
}

seals, err := MessagesToBlockSeals(m.GetBlockSeals())
if err != nil {
return flow.Block{}, err
return &flow.Block{}, err
}

payload := &flow.BlockPayload{
CollectionGuarantees: guarantees,
Seals: seals,
}

return flow.Block{
return &flow.Block{
BlockHeader: *header,
BlockPayload: *payload,
}, nil
Expand Down Expand Up @@ -257,9 +257,9 @@ func BlockHeaderToMessage(b flow.BlockHeader) (*entities.BlockHeader, error) {
}, nil
}

func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) {
func MessageToBlockHeader(m *entities.BlockHeader) (*flow.BlockHeader, error) {
if m == nil {
return flow.BlockHeader{}, ErrEmptyMessage
return &flow.BlockHeader{}, ErrEmptyMessage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can also be nil

Suggested change
return &flow.BlockHeader{}, ErrEmptyMessage
return nil, ErrEmptyMessage

}

var timestamp time.Time
Expand All @@ -270,10 +270,10 @@ func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) {

timeoutCertificate, err := MessageToTimeoutCertificate(m.GetLastViewTc())
if err != nil {
return flow.BlockHeader{}, fmt.Errorf("error converting timeout certificate: %w", err)
return &flow.BlockHeader{}, fmt.Errorf("error converting timeout certificate: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

}

return flow.BlockHeader{
return &flow.BlockHeader{
ID: flow.HashToID(m.GetId()),
ParentID: flow.HashToID(m.GetParentId()),
Height: m.GetHeight(),
Expand Down Expand Up @@ -347,12 +347,12 @@ func QuorumCertificateToMessage(qc flow.QuorumCertificate) (*entities.QuorumCert
}, nil
}

func MessageToBlockDigest(m *access.SubscribeBlockDigestsResponse) (flow.BlockDigest, error) {
func MessageToBlockDigest(m *access.SubscribeBlockDigestsResponse) (*flow.BlockDigest, error) {
if m == nil {
return flow.BlockDigest{}, ErrEmptyMessage
return &flow.BlockDigest{}, ErrEmptyMessage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here

}

return flow.BlockDigest{
return &flow.BlockDigest{
BlockID: flow.BytesToID(m.GetBlockId()),
Height: m.GetBlockHeight(),
Timestamp: m.GetBlockTimestamp().AsTime(),
Expand Down
4 changes: 2 additions & 2 deletions access/grpc/convert/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestConvert_Block(t *testing.T) {
blockB, err := MessageToBlock(msg)
require.NoError(t, err)

assert.Equal(t, *blockA, blockB)
assert.Equal(t, blockA, blockB)

t.Run("Without timestamp", func(t *testing.T) {
blockA := test.BlockGenerator().New()
Expand All @@ -87,7 +87,7 @@ func TestConvert_BlockHeader(t *testing.T) {
headerB, err := MessageToBlockHeader(msg)
require.NoError(t, err)

assert.Equal(t, headerA, headerB)
assert.Equal(t, headerA, *headerB)

t.Run("Without timestamp", func(t *testing.T) {
headerA := test.BlockHeaderGenerator().New()
Expand Down
Loading
Loading