
Commit

feat(wip): implement an altda -> txsend nonblocking pipeline to submit altda blobs concurrently
samlaf committed Aug 23, 2024
1 parent e51e887 commit 61b69b3
Showing 1 changed file with 60 additions and 43 deletions.
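The commit replaces the batcher's synchronous publish path with a producer/consumer split: the main loop prepares txData (posting to AltDA where enabled) and hands it over a channel, while a dedicated goroutine builds the tx candidates and queues them for sending. Below is a minimal, self-contained sketch of that nonblocking channel pattern; the types and names (payload, dataCh) are illustrative only and are not the op-batcher API.

package main

import (
	"context"
	"fmt"
	"time"
)

type payload struct{ id int }

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	dataCh := make(chan payload)
	done := make(chan struct{})

	// Consumer: plays the role of the tx sending loop.
	go func() {
		defer close(done)
		for {
			select {
			case p := <-dataCh:
				fmt.Println("submitting payload", p.id) // stand-in for building a candidate and queueing it
			case <-ctx.Done():
				fmt.Println("shutting down sending loop")
				return
			}
		}
	}()

	// Producer: plays the role of prepareStateToSubmitToL1, handing prepared data to the consumer.
	for i := 0; i < 3; i++ {
		select {
		case dataCh <- payload{id: i}:
		case <-ctx.Done():
			return
		}
	}
	<-done
}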
103 changes: 60 additions & 43 deletions op-batcher/batcher/driver.go
@@ -297,6 +297,7 @@ const (
func (l *BatchSubmitter) loop() {
defer l.wg.Done()

txDataCh := make(chan txData)
receiptsCh := make(chan txmgr.TxReceipt[txRef])
queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)

@@ -329,12 +330,41 @@ func (l *BatchSubmitter) loop() {
}
}
}()
go func() {
for {
select {
case txdata := <-txDataCh:
var candidate *txmgr.TxCandidate
var err error
if txdata.asBlob {
if candidate, err = l.blobTxCandidate(txdata); err != nil {
// We could potentially fall through and try a calldata tx instead, but this would
// likely result in the chain spending more in gas fees than it is tuned for, so best
// to just fail. We do not expect this error to trigger unless there is a serious bug
// or configuration issue.
l.state.TxFailed(txdata.ID())
l.Log.Error("could not create blob tx candidate", "err", err)
continue
}
} else {
// sanity check
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
candidate = l.calldataTxCandidate(txdata.CallData())
}
l.queueTx(txdata, false, candidate, queue, receiptsCh)
case <-l.shutdownCtx.Done():
l.Log.Info("Shutting down tx sending loop")
return
}
}
}()

ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()

publishAndWait := func() {
l.publishStateToL1(queue, receiptsCh)
l.prepareStateToSubmitToL1(txDataCh)
if !l.Txmgr.IsClosed() {
queue.Wait()
} else {
@@ -369,7 +399,7 @@ func (l *BatchSubmitter) loop() {
l.clearState(l.shutdownCtx)
continue
}
l.publishStateToL1(queue, receiptsCh)
l.prepareStateToSubmitToL1(txDataCh)
case <-l.shutdownCtx.Done():
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
@@ -424,22 +454,24 @@ func (l *BatchSubmitter) waitNodeSync() error {
return dial.WaitRollupSync(l.shutdownCtx, l.Log, rollupClient, l1TargetBlock, time.Second*12)
}

// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// prepareStateToSubmitToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queueing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
func (l *BatchSubmitter) prepareStateToSubmitToL1(txDataCh chan<- txData) {
for {
// if the txmgr is closed, we stop the transaction sending
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, aborting state publishing")
return
}
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
txdata, err := l.prepareTxData(l.killCtx)
if err != nil {
if err != io.EOF {
l.Log.Error("Error publishing tx to l1", "err", err)
}
return
}
// send txdata to be published by the tx sending loop
txDataCh <- txdata
}
}

@@ -482,13 +514,13 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
}
}

// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
// prepareTxData prepares a single state tx to be sent to the L1
func (l *BatchSubmitter) prepareTxData(ctx context.Context) (txData, error) {
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.Log.Error("Failed to query L1 tip", "err", err)
return err
return txData{}, err
}
l.recordL1Tip(l1tip)

@@ -497,16 +529,17 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t

if err == io.EOF {
l.Log.Trace("No transaction data available")
return err
return txData{}, err
} else if err != nil {
l.Log.Error("Unable to get tx data", "err", err)
return err
return txData{}, err
}

if err = l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil {
return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err)
if err = l.publishToAltDAUpdateTxData(ctx, txdata); err != nil {
return txData{}, fmt.Errorf("BatchSubmitter.prepareAndQueueTxData failed: %w", err)
}
return nil

return txdata, nil
}

func (l *BatchSubmitter) safeL1Origin(ctx context.Context) (eth.BlockID, error) {
@@ -548,44 +581,28 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh
l.queueTx(txData{}, true, candidate, queue, receiptsCh)
}

// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// publishToAltDAUpdateTxData posts the given txData to the AltDA provider when AltDA is enabled and
// replaces the frame data with the returned commitment; otherwise it leaves the txData untouched.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
var err error
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.

var candidate *txmgr.TxCandidate
if txdata.asBlob {
if candidate, err = l.blobTxCandidate(txdata); err != nil {
// We could potentially fall through and try a calldata tx instead, but this would
// likely result in the chain spending more in gas fees than it is tuned for, so best
// to just fail. We do not expect this error to trigger unless there is a serious bug
// or configuration issue.
return fmt.Errorf("could not create blob tx candidate: %w", err)
}
} else {
func (l *BatchSubmitter) publishToAltDAUpdateTxData(ctx context.Context, txdata txData) error {
// if AltDA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UseAltDA {
// sanity check
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
data := txdata.CallData()
// if AltDA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UseAltDA {
comm, err := l.AltDA.SetInput(ctx, data)
if err != nil {
l.Log.Error("Failed to post input to Alt DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
l.recordFailedTx(txdata.ID(), err)
return nil
}
l.Log.Info("Set AltDA input", "commitment", comm, "tx", txdata.ID())
// signal AltDA commitment tx with TxDataVersion1
data = comm.TxData()
comm, err := l.AltDA.SetInput(ctx, txdata.CallData())
if err != nil {
l.Log.Error("Failed to post input to Alt DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
l.recordFailedTx(txdata.ID(), err)
return err
}
candidate = l.calldataTxCandidate(data)
l.Log.Info("Set AltDA input", "commitment", comm, "tx", txdata.ID())
// TODO: this is kind of weird... we're overwriting the frame's data with the commitment
// but keeping the same ID just so we can delete/requeue the frames on success/failure
txdata.frames[0].data = comm.TxData()
}

l.queueTx(txdata, false, candidate, queue, receiptsCh)
return nil
}


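For reference, the AltDA step that prepareTxData now performs can be illustrated in isolation. The sketch below is hypothetical (daClient, fakeDA, and replaceWithCommitment are made-up names, not the real op-batcher or AltDA interfaces): it posts the raw frame data to a DA provider and, on success, carries the returned commitment forward in place of the data, mirroring the shape of publishToAltDAUpdateTxData.

package main

import (
	"context"
	"errors"
	"fmt"
)

// daClient is an illustrative stand-in for a DA provider client.
type daClient interface {
	SetInput(ctx context.Context, data []byte) ([]byte, error) // returns a commitment
}

type fakeDA struct{}

func (fakeDA) SetInput(_ context.Context, data []byte) ([]byte, error) {
	if len(data) == 0 {
		return nil, errors.New("empty input")
	}
	return append([]byte{0x01}, data[:4]...), nil // pretend commitment
}

// replaceWithCommitment returns the bytes that should end up in the calldata tx:
// either the original data (AltDA disabled) or the DA commitment.
func replaceWithCommitment(ctx context.Context, da daClient, data []byte, useAltDA bool) ([]byte, error) {
	if !useAltDA {
		return data, nil
	}
	comm, err := da.SetInput(ctx, data)
	if err != nil {
		return nil, fmt.Errorf("post input to Alt DA: %w", err) // caller would requeue the frame
	}
	return comm, nil
}

func main() {
	out, err := replaceWithCommitment(context.Background(), fakeDA{}, []byte("frame-data"), true)
	fmt.Println(out, err)
}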