diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 39ebf2f25b24..7e62a1ac825b 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -297,6 +297,7 @@ const ( func (l *BatchSubmitter) loop() { defer l.wg.Done() + txDataCh := make(chan txData) receiptsCh := make(chan txmgr.TxReceipt[txRef]) queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions) @@ -329,12 +330,41 @@ func (l *BatchSubmitter) loop() { } } }() + go func() { + for { + select { + case txdata := <-txDataCh: + var candidate *txmgr.TxCandidate + var err error + if txdata.asBlob { + if candidate, err = l.blobTxCandidate(txdata); err != nil { + // We could potentially fall through and try a calldata tx instead, but this would + // likely result in the chain spending more in gas fees than it is tuned for, so best + // to just fail. We do not expect this error to trigger unless there is a serious bug + // or configuration issue. + l.state.TxFailed(txdata.ID()) + l.Log.Error("could not create blob tx candidate: %w", err) + } + } else { + // sanity check + if nf := len(txdata.frames); nf != 1 { + l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf) + } + candidate = l.calldataTxCandidate(txdata.CallData()) + } + l.queueTx(txdata, false, candidate, queue, receiptsCh) + case <-l.shutdownCtx.Done(): + l.Log.Info("Shutting down tx sending loop") + return + } + } + }() ticker := time.NewTicker(l.Config.PollInterval) defer ticker.Stop() publishAndWait := func() { - l.publishStateToL1(queue, receiptsCh) + l.prepareStateToSubmitToL1(txDataCh) if !l.Txmgr.IsClosed() { queue.Wait() } else { @@ -369,7 +399,7 @@ func (l *BatchSubmitter) loop() { l.clearState(l.shutdownCtx) continue } - l.publishStateToL1(queue, receiptsCh) + l.prepareStateToSubmitToL1(txDataCh) case <-l.shutdownCtx.Done(): if l.Txmgr.IsClosed() { l.Log.Info("Txmgr is closed, remaining channel data won't be sent") @@ -424,22 +454,24 @@ func (l *BatchSubmitter) waitNodeSync() error { return dial.WaitRollupSync(l.shutdownCtx, l.Log, rollupClient, l1TargetBlock, time.Second*12) } -// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is +// prepareStateToSubmitToL1 queues up all pending TxData to be published to the L1, returning when there is // no more data to queue for publishing or if there was an error queing the data. -func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) { +func (l *BatchSubmitter) prepareStateToSubmitToL1(txDataCh chan<- txData) { for { // if the txmgr is closed, we stop the transaction sending if l.Txmgr.IsClosed() { l.Log.Info("Txmgr is closed, aborting state publishing") return } - err := l.publishTxToL1(l.killCtx, queue, receiptsCh) + txdata, err := l.prepareTxData(l.killCtx) if err != nil { if err != io.EOF { l.Log.Error("Error publishing tx to l1", "err", err) } return } + // send txdata to be published by the tx sending loop + txDataCh <- txdata } } @@ -482,13 +514,13 @@ func (l *BatchSubmitter) clearState(ctx context.Context) { } } -// publishTxToL1 submits a single state tx to the L1 -func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error { +// prepareTxData prepares a single state tx to be sent to the L1 +func (l *BatchSubmitter) prepareTxData(ctx context.Context) (txData, error) { // send all available transactions l1tip, err := l.l1Tip(ctx) if err != nil { l.Log.Error("Failed to query L1 tip", "err", err) - return err + return txData{}, err } l.recordL1Tip(l1tip) @@ -497,16 +529,17 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t if err == io.EOF { l.Log.Trace("No transaction data available") - return err + return txData{}, err } else if err != nil { l.Log.Error("Unable to get tx data", "err", err) - return err + return txData{}, err } - if err = l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil { - return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err) + if err = l.publishToAltDAUpdateTxData(ctx, txdata); err != nil { + return txData{}, fmt.Errorf("BatchSubmitter.prepareAndQueueTxData failed: %w", err) } - return nil + + return txdata, nil } func (l *BatchSubmitter) safeL1Origin(ctx context.Context) (eth.BlockID, error) { @@ -548,44 +581,28 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh l.queueTx(txData{}, true, candidate, queue, receiptsCh) } -// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`. +// publishToAltDAUpdateTxData creates & queues for sending a transaction to the batch inbox address with the given `txData`. // The method will block if the queue's MaxPendingTransactions is exceeded. -func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error { - var err error - // Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit. - - var candidate *txmgr.TxCandidate - if txdata.asBlob { - if candidate, err = l.blobTxCandidate(txdata); err != nil { - // We could potentially fall through and try a calldata tx instead, but this would - // likely result in the chain spending more in gas fees than it is tuned for, so best - // to just fail. We do not expect this error to trigger unless there is a serious bug - // or configuration issue. - return fmt.Errorf("could not create blob tx candidate: %w", err) - } - } else { +func (l *BatchSubmitter) publishToAltDAUpdateTxData(ctx context.Context, txdata txData) error { + // if AltDA is enabled we post the txdata to the DA Provider and replace it with the commitment. + if l.Config.UseAltDA { // sanity check if nf := len(txdata.frames); nf != 1 { l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf) } - data := txdata.CallData() - // if AltDA is enabled we post the txdata to the DA Provider and replace it with the commitment. - if l.Config.UseAltDA { - comm, err := l.AltDA.SetInput(ctx, data) - if err != nil { - l.Log.Error("Failed to post input to Alt DA", "error", err) - // requeue frame if we fail to post to the DA Provider so it can be retried - l.recordFailedTx(txdata.ID(), err) - return nil - } - l.Log.Info("Set AltDA input", "commitment", comm, "tx", txdata.ID()) - // signal AltDA commitment tx with TxDataVersion1 - data = comm.TxData() + comm, err := l.AltDA.SetInput(ctx, txdata.CallData()) + if err != nil { + l.Log.Error("Failed to post input to Alt DA", "error", err) + // requeue frame if we fail to post to the DA Provider so it can be retried + l.recordFailedTx(txdata.ID(), err) + return err } - candidate = l.calldataTxCandidate(data) + l.Log.Info("Set AltDA input", "commitment", comm, "tx", txdata.ID()) + // TODO: this is kind of weird... we're overwriting the frame's data with the commitment + // but keeping the same ID just so we can delete/requeue the frames on success/failure + txdata.frames[0].data = comm.TxData() } - l.queueTx(txdata, false, candidate, queue, receiptsCh) return nil }