Skip to content
2 changes: 1 addition & 1 deletion op-batcher/batcher/batch_submitter.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a slightly easier and more straight forward way of stopping the batcher is to call BatcherService.Stop when a critical error is found in the driver. It feels a bit wrong that we need to pass down a very high-level closer function down to the driver since the BatcherService already has a Stop function (that should be involved when the closeApp function is called that is passed down in the batcher.Main function). So we're still calling it, but more indirectly than necessary.

So instead, we can just pass down BatcherService.Stop to the driver when setting it up at BatcherService.initDriver.

Note that this way, we also don't need to create closeApp functions in the test setups that call BatcherServiceFromCLIConfig. And currently, the tests closer functions just make the tests fail, but still not stop the actual batcher service, if I see correctly.

Or am I missing something and you considered this slightly easier option?

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ func Main(version string) cliapp.LifecycleAction {
opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l)

l.Info("Initializing Batch Submitter")
return BatcherServiceFromCLIConfig(cliCtx.Context, version, cfg, l)
return BatcherServiceFromCLIConfig(cliCtx.Context, version, cfg, l, closeApp)
}
}
33 changes: 19 additions & 14 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type DriverSetup struct {
// batches to L1 for availability.
type BatchSubmitter struct {
DriverSetup
closeApp context.CancelCauseFunc
Comment on lines 101 to +102
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks to me like the closeApp naturally fits into the DriverSetup.


wg *sync.WaitGroup
shutdownCtx, killCtx context.Context
Expand All @@ -121,13 +122,14 @@ type BatchSubmitter struct {
}

// NewBatchSubmitter initializes the BatchSubmitter driver from a preconfigured DriverSetup
func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
func NewBatchSubmitter(setup DriverSetup, closeApp context.CancelCauseFunc) *BatchSubmitter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...then we can also avoid passing it around like here.

state := NewChannelManager(setup.Log, setup.Metr, setup.ChannelConfig, setup.RollupConfig)
if setup.ChannelOutFactory != nil {
state.SetChannelOutFactory(setup.ChannelOutFactory)
}

batcher := &BatchSubmitter{
closeApp: closeApp,
DriverSetup: setup,
channelMgr: state,
}
Expand Down Expand Up @@ -623,20 +625,12 @@ func (l *BatchSubmitter) singleEndpointThrottler(wg *sync.WaitGroup, throttleSig
return
}

var rpcErr rpc.Error
if errors.As(err, &rpcErr) && eth.ErrorCode(rpcErr.ErrorCode()).IsGenericRPCError() {
l.Log.Error("SetMaxDASize RPC method unavailable on endpoint, shutting down. Either enable it or disable throttling.",
"endpoint", endpoint, "err", err)

// We have a strict requirement that all endpoints must have the SetMaxDASize endpoint, and shut down if this RPC method is not available
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Call StopBatchSubmitting in another goroutine to avoid deadlock.
go func() {
_ = l.StopBatchSubmitting(ctx)
}()
return
if isCriticalThrottlingRPCError(err) {
// We have a strict requirement that all endpoints must have the SetMaxDASize endpoint,
// and shut down if this RPC method is not available or returns another application-level error.
l.shutdownOnCriticalError(fmt.Errorf("SetMaxDASize RPC method unavailable at %s, either enable it or disable throttling: %w", endpoint, err))
} else if err != nil {
// Transport-level errors are retried.
l.Log.Warn("SetMaxDASize RPC failed for endpoint, retrying.", "endpoint", endpoint, "err", err)
retryTimer.Reset(retryInterval)
return
Expand Down Expand Up @@ -671,6 +665,17 @@ func (l *BatchSubmitter) singleEndpointThrottler(wg *sync.WaitGroup, throttleSig
}
}

func isCriticalThrottlingRPCError(err error) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests for this would be good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Short & Sweet.

var rpcErr rpc.Error
return errors.As(err, &rpcErr) && eth.ErrorCode(rpcErr.ErrorCode()).IsGenericRPCError()
}

func (l *BatchSubmitter) shutdownOnCriticalError(err error) {
l.Log.Error("Shutting down batcher on critical error", "err", err)
// Call closeApp to trigger process to exit (gracefully)
l.closeApp(err)
}
Comment on lines +677 to +681
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High Severity severity

Unprotected nil closeApp invocation can cause panic

The new shutdownOnCriticalError method unconditionally calls l.closeApp(err), which is passed as nil in many test and example invocations. This will cause a runtime panic when a critical throttling RPC error occurs, leading to an unintended crash.

Add a nil check before calling l.closeApp(err), or default to a safe no-op function if closeApp is not provided.


Don't like this finding? Reply "dismiss" and it won't appear again in future scans.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to allow using a nil closeApp, e.g. in tests, it may indeed make sense to add a nil check.

Comment on lines +677 to +681
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests for this would be good; does it shut down? Is it graceful?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested this manually so far, and yes it does shut down successfully. I'm not totally sure we want to write a full end to end test for this, because we would need to spawn the batcher in a subprocess, attach it to an rpc endpoint returning MethodNotFound, and check on that process terminating. We don't have tests like this for sending an interrupt to the process, which should have the same effect. I'll see if I can add a little bit more unit tests to shore this up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I extended an existing test here a57e224. I'm pretty happy with this level of testing, it shows the "closeApp" is called under the right conditions. Having that closeApp actually cause the process to exit is arguably out of scope, since this is generic behavior from op-service that we rely on for multiple services.


// throttlingLoop acts as a distributor that spawns individual throttling loops for each endpoint
// and fans out the unsafe bytes updates to each endpoint
func (l *BatchSubmitter) throttlingLoop(wg *sync.WaitGroup, unsafeBytesUpdated chan int64) {
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/batcher/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func setup(t *testing.T) (*BatchSubmitter, *mockL2EndpointProvider) {
},
ChannelConfig: defaultTestChannelConfig(),
EndpointProvider: ep,
}), ep
}, nil), ep
}

func TestBatchSubmitter_SafeL1Origin(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ type DriverSetupOption func(setup *DriverSetup)
// BatcherServiceFromCLIConfig creates a new BatcherService from a CLIConfig.
// The service components are fully started, except for the driver,
// which will not be submitting batches (if it was configured to) until the Start part of the lifecycle.
func BatcherServiceFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger, opts ...DriverSetupOption) (*BatcherService, error) {
func BatcherServiceFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger, closeApp context.CancelCauseFunc, opts ...DriverSetupOption) (*BatcherService, error) {
var bs BatcherService
if err := bs.initFromCLIConfig(ctx, version, cfg, log, opts...); err != nil {
if err := bs.initFromCLIConfig(ctx, version, cfg, log, closeApp, opts...); err != nil {
return nil, errors.Join(err, bs.Stop(ctx)) // try to clean up our failed initialization attempt
}
return &bs, nil
}

func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger, opts ...DriverSetupOption) error {
func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger, closeApp context.CancelCauseFunc, opts ...DriverSetupOption) error {
bs.Version = version
bs.Log = log
bs.NotSubmittingOnStart = cfg.Stopped
Expand Down Expand Up @@ -182,7 +182,7 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
if err := bs.initPProf(cfg); err != nil {
return fmt.Errorf("failed to init profiling: %w", err)
}
bs.initDriver(opts...)
bs.initDriver(closeApp, opts...)
if err := bs.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err)
}
Expand Down Expand Up @@ -381,7 +381,7 @@ func (bs *BatcherService) initMetricsServer(cfg *CLIConfig) error {
return nil
}

func (bs *BatcherService) initDriver(opts ...DriverSetupOption) {
func (bs *BatcherService) initDriver(closeApp context.CancelCauseFunc, opts ...DriverSetupOption) {
ds := DriverSetup{
Log: bs.Log,
Metr: bs.Metrics,
Expand All @@ -396,7 +396,7 @@ func (bs *BatcherService) initDriver(opts ...DriverSetupOption) {
for _, opt := range opts {
opt(&ds)
}
bs.driver = NewBatchSubmitter(ds)
bs.driver = NewBatchSubmitter(ds, closeApp)
}

func (bs *BatcherService) initRPCServer(cfg *CLIConfig) error {
Expand Down
2 changes: 1 addition & 1 deletion op-devstack/sysgo/l2_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func WithBatcher(batcherID stack.L2BatcherID, l1ELID stack.L1ELNodeID, l2CLID st

batcher, err := bss.BatcherServiceFromCLIConfig(
p.Ctx(), "0.0.1", batcherCLIConfig,
logger)
logger, nil)
require.NoError(err)
require.NoError(batcher.Start(p.Ctx()))
p.Cleanup(func() {
Expand Down
2 changes: 1 addition & 1 deletion op-e2e/interop/supersystem_l2.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (s *interopE2ESystem) newBatcherForL2(
}
batcher, err := bss.BatcherServiceFromCLIConfig(
context.Background(), "0.0.1", batcherCLIConfig,
logger.New("service", "batcher"))
logger.New("service", "batcher"), nil)
require.NoError(s.t, err)
require.NoError(s.t, batcher.Start(context.Background()))
s.t.Cleanup(func() {
Expand Down
2 changes: 1 addition & 1 deletion op-e2e/system/conductor/sequencer_failover_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func setupBatcher(t *testing.T, sys *e2esys.System, conductors map[string]*condu
CompressionAlgo: derive.Zlib,
}

batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"], nil)
require.NoError(t, err)
err = batcher.Start(context.Background())
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion op-e2e/system/e2esys/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System,
}

// Batch Submitter
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"], nil)
if err != nil {
return nil, fmt.Errorf("failed to setup batch submitter: %w", err)
}
Expand Down