Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add s3 batch consumer #43

Draft
wants to merge 36 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
be0d884
add localstack s3 support for local testing
cortze Jan 16, 2025
7508c93
add first working draft of a s3 trace submitter + tests
cortze Jan 16, 2025
ebdec2b
WIP - extesion of hermes cmd to s3 config
cortze Jan 16, 2025
68e8573
update dependencies adding parquet-go and s3
cortze Jan 16, 2025
bd69e76
address feedback from Dennis
cortze Jan 22, 2025
4a0ff35
update root config for s3 extension
cortze Jan 22, 2025
1952a05
add: parquet and s3 benchmarks cmd
cortze Jan 22, 2025
042ae7f
add: localstack s3 instance to CI tests
cortze Jan 22, 2025
e8c8fb2
update: ignore .json files
cortze Jan 22, 2025
e036050
fix: make ci happy
cortze Jan 22, 2025
2b6e786
feat: add s3conf part of the eth-node struct + siplify init
cortze Jan 22, 2025
18fb939
update: s3 config init + loggin on s3 datastream creation
cortze Jan 22, 2025
ccedfc2
fix: spotted bug on logging level flag
cortze Jan 22, 2025
865b786
fix: add non-locking batcher methods + remove possible race-conditions
cortze Jan 22, 2025
9686d6a
ensure error on datastream init failure + apply s3 credentials only w…
cortze Jan 23, 2025
3b03b39
updt: s3key format to only producer
cortze Jan 23, 2025
db2dd04
update Docs
cortze Jan 23, 2025
cfcca1f
update CI test format
cortze Jan 23, 2025
920b266
add s3 tag
cortze Jan 27, 2025
626dfb0
WIP: not done yet
cortze Jan 27, 2025
4609b85
fix: bug on event type
cortze Jan 27, 2025
50f7182
fix: new tag
cortze Jan 27, 2025
1a68153
first working version of the specific event formating (still WIp, cod…
cortze Jan 28, 2025
6411a26
Merge branch 'upgrade-event-parquet-formatting' into add-s3-batch-con…
cortze Jan 28, 2025
35cf7b6
standarize fields on kinesis payloads
cortze Jan 29, 2025
81e19ef
add: parquet format
cortze Jan 29, 2025
a0e350c
add: event specific parquet structs
cortze Jan 29, 2025
afd00ce
adapt: s3-related code to independet parquet formats
cortze Jan 29, 2025
e61a36b
add: snappy compression by default
cortze Jan 29, 2025
672867d
fmt code
cortze Jan 29, 2025
2d05de7
rm: analysis py.file from repo
cortze Jan 29, 2025
7598eb7
correct: wrong pointer reference on connection renderer
cortze Jan 29, 2025
d026f40
make .env functional at docker-compose
cortze Jan 30, 2025
4f0106d
remove sync.Map from the eventStore
cortze Jan 30, 2025
18f4b3c
address comments
cortze Jan 30, 2025
52c1831
add timeout to s3 healthcheck on github actions' workflow
cortze Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
ensure error on datastream init failure + apply s3 credentials only w…
…hen needed
cortze committed Jan 23, 2025
commit 9686d6a5a3e6b8a417c9f3260fd2877ac8c4357f
2 changes: 1 addition & 1 deletion eth/node.go
Original file line number Diff line number Diff line change
@@ -522,7 +522,7 @@ func (n *Node) startDataStream(ctx context.Context) (func(), error) {

go func() {
if err := n.ds.Start(backgroundCtx); err != nil {
slog.Warn("Failed to start data stream", tele.LogAttrError(err))
slog.Error("Failed to start data stream", tele.LogAttrError(err))
}
}()

15 changes: 6 additions & 9 deletions host/s3.go
Original file line number Diff line number Diff line change
@@ -414,12 +414,6 @@ func (s3cfg *S3DSConfig) CheckValidity() error {
if len(s3cfg.Bucket) <= 0 {
return fmt.Errorf("no s3 bucket was provided")
}
if len(s3cfg.AccessKeyID) <= 0 {
return fmt.Errorf("no s3 access-key was provided")
}
if len(s3cfg.SecretKey) <= 0 {
return fmt.Errorf("no s3 secret access key was provided")
}
if len(s3cfg.Region) <= 0 {
return fmt.Errorf("no s3 region was provided")
}
@@ -438,12 +432,15 @@ func (s3cfg S3DSConfig) ToAWSconfig() (*aws.Config, error) {
cfg, err := config.LoadDefaultConfig(
context.TODO(),
config.WithRegion(s3cfg.Region),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
)
// only if the credential details where given
if len(s3cfg.AccessKeyID) > 0 && len(s3cfg.SecretKey) > 0 {
cfg.Credentials = credentials.NewStaticCredentialsProvider(
s3cfg.AccessKeyID,
s3cfg.SecretKey,
"", // empty session for now
)),
)
)
}
return &cfg, err
}