Skip to content

Commit

Permalink
send txs out
Browse files Browse the repository at this point in the history
  • Loading branch information
dvush committed Dec 1, 2023
1 parent c8dae45 commit 3741073
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 24 deletions.
50 changes: 34 additions & 16 deletions cmd/collect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ var (
Usage: "Chainbound API key (or api-key@url)",
Category: "Sources Configuration",
},

// Tx receivers
&cli.StringSliceFlag{
Name: "tx-receivers",
EnvVars: []string{"TX_RECEIVERS"},
Usage: "URL(s) to send transactions to as octet-stream over http",
Category: "Tx Receivers Configuration",
},
&cli.StringSliceFlag{
Name: "tx-receivers-allowed-sources",
EnvVars: []string{"TX_RECEIVERS_ALLOWED_SOURCES"},
Usage: "sources of txs to send to receivers",
Category: "Tx Receivers Configuration",
},
}
)

Expand All @@ -88,14 +102,16 @@ func main() {

func runCollector(cCtx *cli.Context) error {
var (
debug = cCtx.Bool("debug")
outDir = cCtx.String("out")
uid = cCtx.String("uid")
checkNodeURI = cCtx.String("check-node")
nodeURIs = cCtx.StringSlice("node")
blxAuth = cCtx.StringSlice("blx")
edenAuth = cCtx.StringSlice("eden")
chainboundAuth = cCtx.StringSlice("chainbound")
debug = cCtx.Bool("debug")
outDir = cCtx.String("out")
uid = cCtx.String("uid")
checkNodeURI = cCtx.String("check-node")
nodeURIs = cCtx.StringSlice("node")
blxAuth = cCtx.StringSlice("blx")
edenAuth = cCtx.StringSlice("eden")
chainboundAuth = cCtx.StringSlice("chainbound")
receivers = cCtx.StringSlice("tx-receivers")
receiversAllowedSources = cCtx.StringSlice("tx-receivers-allowed-sources")
)

// Logger setup
Expand All @@ -119,14 +135,16 @@ func runCollector(cCtx *cli.Context) error {

// Start service components
opts := collector.CollectorOpts{
Log: log,
UID: uid,
OutDir: outDir,
CheckNodeURI: checkNodeURI,
Nodes: nodeURIs,
BloxrouteAuth: blxAuth,
EdenAuth: edenAuth,
ChainboundAuth: chainboundAuth,
Log: log,
UID: uid,
OutDir: outDir,
CheckNodeURI: checkNodeURI,
Nodes: nodeURIs,
BloxrouteAuth: blxAuth,
EdenAuth: edenAuth,
ChainboundAuth: chainboundAuth,
Receivers: receivers,
ReceiversAllowedSources: receiversAllowedSources,
}

collector.Start(&opts)
Expand Down
13 changes: 9 additions & 4 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@ type CollectorOpts struct {
BloxrouteAuth []string
EdenAuth []string
ChainboundAuth []string

Receivers []string
ReceiversAllowedSources []string
}

// Start kicks off all the service components in the background
func Start(opts *CollectorOpts) {
processor := NewTxProcessor(TxProcessorOpts{
Log: opts.Log,
UID: opts.UID,
OutDir: opts.OutDir,
CheckNodeURI: opts.CheckNodeURI,
Log: opts.Log,
UID: opts.UID,
OutDir: opts.OutDir,
CheckNodeURI: opts.CheckNodeURI,
HTTPReceivers: opts.Receivers,
ReceiversAllowedSources: opts.ReceiversAllowedSources,
})
go processor.Start()

Expand Down
41 changes: 41 additions & 0 deletions collector/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package collector

import (
"bytes"
"context"
"net/http"
)

type TxReceiver interface {
SendTx(ctx context.Context, tx *TxIn) error
}

type HTTPReceiver struct {
url string
}

func NewHTTPReceiver(url string) *HTTPReceiver {
return &HTTPReceiver{
url: url,
}
}

func (r *HTTPReceiver) SendTx(ctx context.Context, tx *TxIn) error {
rawTx, err := tx.Tx.MarshalBinary()
if err != nil {
return err
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.url, bytes.NewReader(rawTx))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/octet-stream")

res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
return err
}
55 changes: 51 additions & 4 deletions collector/tx_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ import (
"go.uber.org/zap"
)

const (
receiverTimeout = 5 * time.Second
)

type TxProcessorOpts struct {
Log *zap.SugaredLogger
OutDir string
UID string
CheckNodeURI string
Log *zap.SugaredLogger
OutDir string
UID string
CheckNodeURI string
HTTPReceivers []string
ReceiversAllowedSources []string
}

type TxProcessor struct {
Expand All @@ -45,6 +51,9 @@ type TxProcessor struct {
checkNodeURI string
ethClient *ethclient.Client

receivers []TxReceiver
receiversAllowedSources []string

lastHealthCheckCall time.Time
}

Expand All @@ -55,6 +64,11 @@ type OutFiles struct {
}

func NewTxProcessor(opts TxProcessorOpts) *TxProcessor {
receivers := make([]TxReceiver, 0, len(opts.HTTPReceivers))
for _, r := range opts.HTTPReceivers {
receivers = append(receivers, NewHTTPReceiver(r))
}

return &TxProcessor{ //nolint:exhaustruct
log: opts.Log, // .With("uid", uid),
txC: make(chan TxIn, 100),
Expand All @@ -67,6 +81,9 @@ func NewTxProcessor(opts TxProcessorOpts) *TxProcessor {
srcMetrics: NewMetricsCounter(),

checkNodeURI: opts.CheckNodeURI,

receivers: receivers,
receiversAllowedSources: opts.ReceiversAllowedSources,
}
}

Expand Down Expand Up @@ -94,10 +111,40 @@ func (p *TxProcessor) Start() {
// start listening for transactions coming in through the channel
p.log.Info("Waiting for transactions...")
for txIn := range p.txC {
go p.sendTxToReceivers(txIn)
p.processTx(txIn)
}
}

func (p *TxProcessor) sendTxToReceivers(txIn TxIn) {
sourceOk := false
for _, allowedSource := range p.receiversAllowedSources {
if txIn.Source == allowedSource {
sourceOk = true
break
}
}
if !sourceOk {
return
}

ctx, cancel := context.WithTimeout(context.Background(), receiverTimeout)
defer cancel()

var wg sync.WaitGroup
for _, r := range p.receivers {
wg.Add(1)
go func(r TxReceiver) {
defer wg.Done()
err := r.SendTx(ctx, &txIn)
if err != nil {
p.log.Errorw("failed to send tx", "error", err)
}
}(r)
}
wg.Wait()
}

func (p *TxProcessor) processTx(txIn TxIn) {
tx := txIn.Tx
txHashLower := strings.ToLower(tx.Hash().Hex())
Expand Down
48 changes: 48 additions & 0 deletions collector/tx_processor_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,57 @@
package collector

import (
"context"
"testing"
"time"

"github.com/flashbots/mempool-dumpster/common"
)

// var testLog = common.GetLogger(true, false)

// func TestBuilderAliases(t *testing.T) {
// tempDir := t.TempDir()
// txp := NewTxProcessor(testLog, tempDir, "test1")
// require.Equal(t, "collector", "collector")
// }

type MockTxReceiver struct {
ReceivedTx *TxIn
}

func (r *MockTxReceiver) SendTx(ctx context.Context, tx *TxIn) error {
r.ReceivedTx = tx
return nil
}

func TestTxProcessor_sendTxToReceivers(t *testing.T) {
receiver := MockTxReceiver{ReceivedTx: nil}

processor := NewTxProcessor(TxProcessorOpts{
Log: common.GetLogger(true, false),
OutDir: "",
UID: "",
CheckNodeURI: "",
HTTPReceivers: nil,
ReceiversAllowedSources: []string{"allowed"},
})
processor.receivers = append(processor.receivers, &receiver)

tx := TxIn{
T: time.Now(),
Tx: nil,
Source: "not-allowed",
}
processor.sendTxToReceivers(tx)

if receiver.ReceivedTx != nil {
t.Errorf("expected nil, got %v", receiver.ReceivedTx)
}

tx.Source = "allowed"
processor.sendTxToReceivers(tx)
if receiver.ReceivedTx == nil {
t.Errorf("expected tx, got nil")
}
}

0 comments on commit 3741073

Please sign in to comment.