Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send txs out #31

Merged
merged 4 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 34 additions & 16 deletions cmd/collect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ var (
Usage: "Chainbound API key (or api-key@url)",
Category: "Sources Configuration",
},

// Tx receivers
&cli.StringSliceFlag{
Name: "tx-receivers",
EnvVars: []string{"TX_RECEIVERS"},
Usage: "URL(s) to send transactions to as octet-stream over http",
Category: "Tx Receivers Configuration",
},
&cli.StringSliceFlag{
Name: "tx-receivers-allowed-sources",
EnvVars: []string{"TX_RECEIVERS_ALLOWED_SOURCES"},
Usage: "sources of txs to send to receivers",
Category: "Tx Receivers Configuration",
},
}
)

Expand All @@ -88,14 +102,16 @@ func main() {

func runCollector(cCtx *cli.Context) error {
var (
debug = cCtx.Bool("debug")
outDir = cCtx.String("out")
uid = cCtx.String("uid")
checkNodeURI = cCtx.String("check-node")
nodeURIs = cCtx.StringSlice("node")
blxAuth = cCtx.StringSlice("blx")
edenAuth = cCtx.StringSlice("eden")
chainboundAuth = cCtx.StringSlice("chainbound")
debug = cCtx.Bool("debug")
outDir = cCtx.String("out")
uid = cCtx.String("uid")
checkNodeURI = cCtx.String("check-node")
nodeURIs = cCtx.StringSlice("node")
blxAuth = cCtx.StringSlice("blx")
edenAuth = cCtx.StringSlice("eden")
chainboundAuth = cCtx.StringSlice("chainbound")
receivers = cCtx.StringSlice("tx-receivers")
receiversAllowedSources = cCtx.StringSlice("tx-receivers-allowed-sources")
)

// Logger setup
Expand All @@ -119,14 +135,16 @@ func runCollector(cCtx *cli.Context) error {

// Start service components
opts := collector.CollectorOpts{
Log: log,
UID: uid,
OutDir: outDir,
CheckNodeURI: checkNodeURI,
Nodes: nodeURIs,
BloxrouteAuth: blxAuth,
EdenAuth: edenAuth,
ChainboundAuth: chainboundAuth,
Log: log,
UID: uid,
OutDir: outDir,
CheckNodeURI: checkNodeURI,
Nodes: nodeURIs,
BloxrouteAuth: blxAuth,
EdenAuth: edenAuth,
ChainboundAuth: chainboundAuth,
Receivers: receivers,
ReceiversAllowedSources: receiversAllowedSources,
}

collector.Start(&opts)
Expand Down
13 changes: 9 additions & 4 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@ type CollectorOpts struct {
BloxrouteAuth []string
EdenAuth []string
ChainboundAuth []string

Receivers []string
ReceiversAllowedSources []string
}

// Start kicks off all the service components in the background
func Start(opts *CollectorOpts) {
processor := NewTxProcessor(TxProcessorOpts{
Log: opts.Log,
UID: opts.UID,
OutDir: opts.OutDir,
CheckNodeURI: opts.CheckNodeURI,
Log: opts.Log,
UID: opts.UID,
OutDir: opts.OutDir,
CheckNodeURI: opts.CheckNodeURI,
HTTPReceivers: opts.Receivers,
ReceiversAllowedSources: opts.ReceiversAllowedSources,
})
go processor.Start()

Expand Down
43 changes: 43 additions & 0 deletions collector/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package collector

import (
"bytes"
"context"
"io"
"net/http"
)

type TxReceiver interface {
SendTx(ctx context.Context, tx *TxIn) error
}

type HTTPReceiver struct {
url string
}

func NewHTTPReceiver(url string) *HTTPReceiver {
return &HTTPReceiver{
url: url,
}
}

func (r *HTTPReceiver) SendTx(ctx context.Context, tx *TxIn) error {
rawTx, err := tx.Tx.MarshalBinary()
if err != nil {
return err
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.url, bytes.NewReader(rawTx))
metachris marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/octet-stream")

res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To reuse the HTTP connection, you'll need to read the full body before closing: https://blog.cubieserver.de/2022/http-connection-reuse-in-go-clients/#:~:text=Client%20will%20only%20reuse%20connections,MaxConnsPerHost%20).

// ensure that we close the request so the httpClient can reuse the connection
io.Copy(io.Discard, res.Body)
res.Body.Close()

_, err = io.Copy(io.Discard, res.Body)
return err
}
58 changes: 54 additions & 4 deletions collector/tx_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ import (
"go.uber.org/zap"
)

const (
receiverTimeout = 5 * time.Second
)

type TxProcessorOpts struct {
Log *zap.SugaredLogger
OutDir string
UID string
CheckNodeURI string
Log *zap.SugaredLogger
OutDir string
UID string
CheckNodeURI string
HTTPReceivers []string
ReceiversAllowedSources []string
}

type TxProcessor struct {
Expand All @@ -45,6 +51,9 @@ type TxProcessor struct {
checkNodeURI string
ethClient *ethclient.Client

receivers []TxReceiver
receiversAllowedSources []string

lastHealthCheckCall time.Time
}

Expand All @@ -55,6 +64,11 @@ type OutFiles struct {
}

func NewTxProcessor(opts TxProcessorOpts) *TxProcessor {
receivers := make([]TxReceiver, 0, len(opts.HTTPReceivers))
for _, r := range opts.HTTPReceivers {
receivers = append(receivers, NewHTTPReceiver(r))
}

return &TxProcessor{ //nolint:exhaustruct
log: opts.Log, // .With("uid", uid),
txC: make(chan TxIn, 100),
Expand All @@ -67,6 +81,9 @@ func NewTxProcessor(opts TxProcessorOpts) *TxProcessor {
srcMetrics: NewMetricsCounter(),

checkNodeURI: opts.CheckNodeURI,

receivers: receivers,
receiversAllowedSources: opts.ReceiversAllowedSources,
}
}

Expand Down Expand Up @@ -94,10 +111,43 @@ func (p *TxProcessor) Start() {
// start listening for transactions coming in through the channel
p.log.Info("Waiting for transactions...")
for txIn := range p.txC {
// send tx to receivers before processing it
// this will reduce the latency for the receivers but may lead to receivers getting the same tx multiple times
// or getting txs that are incorrect
go p.sendTxToReceivers(txIn)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that place receives duplicates, it should probably be sent at https://github.com/flashbots/mempool-dumpster/blob/main/collector/tx_processor.go#L174

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this place in code is after a bunch of operations including writing to a file and request to a node

also duplicates count over all sources but we send only for allowed sources so this can lead to a big loss of txs

I think its better for receiver to do filtering then.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are mistaken, the transactions are written to a file later, after deduplicating, inside the processTx function.

Your comment about loss of tx because filtering makes sense though. Would be good to add a comment about why this line of code is in this place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here where writing to a file https://github.com/flashbots/mempool-dumpster/blob/main/collector/tx_processor.go#L123 happens that I refer to
and its before deduplicating

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comment 62915f4

p.processTx(txIn)
}
}

func (p *TxProcessor) sendTxToReceivers(txIn TxIn) {
sourceOk := false
for _, allowedSource := range p.receiversAllowedSources {
if txIn.Source == allowedSource {
sourceOk = true
break
}
}
if !sourceOk {
return
}

ctx, cancel := context.WithTimeout(context.Background(), receiverTimeout)
defer cancel()

var wg sync.WaitGroup
for _, r := range p.receivers {
wg.Add(1)
go func(r TxReceiver) {
defer wg.Done()
err := r.SendTx(ctx, &txIn)
if err != nil {
p.log.Errorw("failed to send tx", "error", err)
}
}(r)
}
wg.Wait()
}

func (p *TxProcessor) processTx(txIn TxIn) {
tx := txIn.Tx
txHashLower := strings.ToLower(tx.Hash().Hex())
Expand Down
48 changes: 48 additions & 0 deletions collector/tx_processor_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,57 @@
package collector

import (
"context"
"testing"
"time"

"github.com/flashbots/mempool-dumpster/common"
)

// var testLog = common.GetLogger(true, false)

// func TestBuilderAliases(t *testing.T) {
// tempDir := t.TempDir()
// txp := NewTxProcessor(testLog, tempDir, "test1")
// require.Equal(t, "collector", "collector")
// }

type MockTxReceiver struct {
ReceivedTx *TxIn
}

func (r *MockTxReceiver) SendTx(ctx context.Context, tx *TxIn) error {
r.ReceivedTx = tx
return nil
}

func TestTxProcessor_sendTxToReceivers(t *testing.T) {
receiver := MockTxReceiver{ReceivedTx: nil}

processor := NewTxProcessor(TxProcessorOpts{
Log: common.GetLogger(true, false),
OutDir: "",
UID: "",
CheckNodeURI: "",
HTTPReceivers: nil,
ReceiversAllowedSources: []string{"allowed"},
})
processor.receivers = append(processor.receivers, &receiver)

tx := TxIn{
T: time.Now(),
Tx: nil,
Source: "not-allowed",
}
processor.sendTxToReceivers(tx)

if receiver.ReceivedTx != nil {
t.Errorf("expected nil, got %v", receiver.ReceivedTx)
}

tx.Source = "allowed"
processor.sendTxToReceivers(tx)
if receiver.ReceivedTx == nil {
t.Errorf("expected tx, got nil")
}
}
Loading