diff --git a/.vscode/settings.json b/.vscode/settings.json index 736563a9..8aff5c4a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,6 +10,7 @@ "go.testFlags": [ "-test.parallel", "4", + // Comment the following 2 lines to run unit tests. "-deploy-config", "../.devnet/devnetL1.json" ] diff --git a/common/store.go b/common/store.go index 62e4660f..2d508538 100644 --- a/common/store.go +++ b/common/store.go @@ -19,8 +19,7 @@ const ( ) var ( - ErrProxyOversizedBlob = fmt.Errorf("encoded blob is larger than max blob size") - ErrEigenDAOversizedBlob = fmt.Errorf("blob size cannot exceed") + ErrProxyOversizedBlob = fmt.Errorf("encoded blob is larger than max blob size") ) func (b BackendType) String() string { diff --git a/flags/eigendaflags/cli.go b/flags/eigendaflags/cli.go index 9a13c772..f186834c 100644 --- a/flags/eigendaflags/cli.go +++ b/flags/eigendaflags/cli.go @@ -15,10 +15,11 @@ import ( var ( DisperserRPCFlagName = withFlagPrefix("disperser-rpc") + ResponseTimeoutFlagName = withFlagPrefix("response-timeout") + ConfirmationTimeoutFlagName = withFlagPrefix("confirmation-timeout") StatusQueryRetryIntervalFlagName = withFlagPrefix("status-query-retry-interval") StatusQueryTimeoutFlagName = withFlagPrefix("status-query-timeout") DisableTLSFlagName = withFlagPrefix("disable-tls") - ResponseTimeoutFlagName = withFlagPrefix("response-timeout") CustomQuorumIDsFlagName = withFlagPrefix("custom-quorum-ids") SignerPrivateKeyHexFlagName = withFlagPrefix("signer-private-key-hex") PutBlobEncodingVersionFlagName = withFlagPrefix("put-blob-encoding-version") @@ -27,6 +28,8 @@ var ( ConfirmationDepthFlagName = withFlagPrefix("confirmation-depth") EthRPCURLFlagName = withFlagPrefix("eth-rpc") SvcManagerAddrFlagName = withFlagPrefix("svc-manager-addr") + // Flags that are proxy specific, and not used by the eigenda-client + PutRetriesFlagName = withFlagPrefix("put-retries") ) func withFlagPrefix(s string) string { @@ -46,6 +49,26 @@ func CLIFlags(envPrefix, 
category string) []cli.Flag { EnvVars: []string{withEnvPrefix(envPrefix, "DISPERSER_RPC")}, Category: category, }, + &cli.DurationFlag{ + Name: ResponseTimeoutFlagName, + Usage: "Flag used to configure the underlying disperser-client. Total time to wait for the disperseBlob call to return or disperseAuthenticatedBlob stream to finish and close.", + Value: 60 * time.Second, + EnvVars: []string{withEnvPrefix(envPrefix, "RESPONSE_TIMEOUT")}, + Category: category, + }, + &cli.DurationFlag{ + Name: ConfirmationTimeoutFlagName, + Usage: `The total amount of time that the client will spend waiting for EigenDA + to "confirm" (include onchain) a blob after it has been dispersed. Note that + we stick to "confirm" here but this really means InclusionTimeout, + not confirmation in the sense of confirmation depth. + + If ConfirmationTimeout time passes and the blob is not yet confirmed, + the client will return an api.ErrorFailover to let the caller failover to EthDA.`, + Value: 15 * time.Minute, + EnvVars: []string{withEnvPrefix(envPrefix, "CONFIRMATION_TIMEOUT")}, + Category: category, + }, &cli.DurationFlag{ Name: StatusQueryTimeoutFlagName, Usage: "Duration to wait for a blob to finalize after being sent for dispersal. Default is 30 minutes.", @@ -67,13 +90,6 @@ func CLIFlags(envPrefix, category string) []cli.Flag { EnvVars: []string{withEnvPrefix(envPrefix, "GRPC_DISABLE_TLS")}, Category: category, }, - &cli.DurationFlag{ - Name: ResponseTimeoutFlagName, - Usage: "Total time to wait for a response from the EigenDA disperser. Default is 60 seconds.", - Value: 60 * time.Second, - EnvVars: []string{withEnvPrefix(envPrefix, "RESPONSE_TIMEOUT")}, - Category: category, - }, &cli.UintSliceFlag{ Name: CustomQuorumIDsFlagName, Usage: "Custom quorum IDs for writing blobs. 
Should not include default quorums 0 or 1.", @@ -137,6 +153,15 @@ func CLIFlags(envPrefix, category string) []cli.Flag { Category: category, Required: true, }, + // Flags that are proxy specific, and not used by the eigenda-client + // TODO: should we move this to a more specific category, like EIGENDA_STORE? + &cli.UintFlag{ + Name: PutRetriesFlagName, + Usage: "Number of times to retry blob dispersals.", + Value: 3, + EnvVars: []string{withEnvPrefix(envPrefix, "PUT_RETRIES")}, + Category: category, + }, } } @@ -144,10 +169,11 @@ func ReadConfig(ctx *cli.Context) clients.EigenDAClientConfig { waitForFinalization, confirmationDepth := parseConfirmationFlag(ctx.String(ConfirmationDepthFlagName)) return clients.EigenDAClientConfig{ RPC: ctx.String(DisperserRPCFlagName), + ResponseTimeout: ctx.Duration(ResponseTimeoutFlagName), + ConfirmationTimeout: ctx.Duration(ConfirmationTimeoutFlagName), StatusQueryRetryInterval: ctx.Duration(StatusQueryRetryIntervalFlagName), StatusQueryTimeout: ctx.Duration(StatusQueryTimeoutFlagName), DisableTLS: ctx.Bool(DisableTLSFlagName), - ResponseTimeout: ctx.Duration(ResponseTimeoutFlagName), CustomQuorumIDs: ctx.UintSlice(CustomQuorumIDsFlagName), SignerPrivateKeyHex: ctx.String(SignerPrivateKeyHexFlagName), PutBlobEncodingVersion: codecs.BlobEncodingVersion(ctx.Uint(PutBlobEncodingVersionFlagName)), diff --git a/go.mod b/go.mod index 38e89c39..5774c02c 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,8 @@ go 1.22 toolchain go1.22.0 require ( - github.com/Layr-Labs/eigenda v0.8.5-0.20241031144746-e2ead56a306d + github.com/Layr-Labs/eigenda v0.8.5-rc.0.0.20241101212705-fa8776ae648c + github.com/avast/retry-go/v4 v4.6.0 github.com/consensys/gnark-crypto v0.12.1 github.com/ethereum-optimism/optimism v1.9.4-0.20240927020138-a9c7f349d10b github.com/ethereum/go-ethereum v1.14.11 @@ -20,6 +21,7 @@ require ( github.com/testcontainers/testcontainers-go/modules/redis v0.33.0 github.com/urfave/cli/v2 v2.27.4 golang.org/x/exp 
v0.0.0-20240808152545-0cdaa3abc0fa + google.golang.org/grpc v1.64.1 ) require ( @@ -283,7 +285,6 @@ require ( golang.org/x/time v0.6.0 // indirect golang.org/x/tools v0.24.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect - google.golang.org/grpc v1.64.1 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 8e602d79..6f4c239a 100644 --- a/go.sum +++ b/go.sum @@ -22,8 +22,8 @@ github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2 github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e h1:ZIWapoIRN1VqT8GR8jAwb1Ie9GyehWjVcGh32Y2MznE= github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= -github.com/Layr-Labs/eigenda v0.8.5-0.20241031144746-e2ead56a306d h1:2JtVArkLjW61kilkvvLyFHXBMp0ClF8PYCAxWqRnoDQ= -github.com/Layr-Labs/eigenda v0.8.5-0.20241031144746-e2ead56a306d/go.mod h1:sqUNf9Ak+EfAX82jDxrb4QbT/g3DViWD3b7YIk36skk= +github.com/Layr-Labs/eigenda v0.8.5-rc.0.0.20241101212705-fa8776ae648c h1:TuvZlhWSrwpG6EPl+xjOo5UCp2QcVGl+EOY+BalqOXg= +github.com/Layr-Labs/eigenda v0.8.5-rc.0.0.20241101212705-fa8776ae648c/go.mod h1:sqUNf9Ak+EfAX82jDxrb4QbT/g3DViWD3b7YIk36skk= github.com/Layr-Labs/eigensdk-go v0.1.7-0.20240507215523-7e4891d5099a h1:L/UsJFw9M31FD/WgXTPFB0oxbq9Cu4Urea1xWPMQS7Y= github.com/Layr-Labs/eigensdk-go v0.1.7-0.20240507215523-7e4891d5099a/go.mod h1:OF9lmS/57MKxS0xpSpX0qHZl0SKkDRpvJIvsGvMN1y8= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= @@ -45,6 +45,8 @@ github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer5 github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= 
github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA= github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= +github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA= +github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE= github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU= diff --git a/server/config.go b/server/config.go index e695755d..75a9866c 100644 --- a/server/config.go +++ b/server/config.go @@ -19,6 +19,7 @@ type Config struct { MemstoreConfig memstore.Config StorageConfig store.Config VerifierConfig verify.Config + PutRetries uint MemstoreEnabled bool } @@ -28,11 +29,11 @@ func ReadConfig(ctx *cli.Context) Config { edaClientConfig := eigendaflags.ReadConfig(ctx) return Config{ EdaClientConfig: edaClientConfig, - MemstoreConfig: memstore.ReadConfig(ctx), - StorageConfig: store.ReadConfig(ctx), VerifierConfig: verify.ReadConfig(ctx, edaClientConfig), - + PutRetries: ctx.Uint(eigendaflags.PutRetriesFlagName), MemstoreEnabled: ctx.Bool(memstore.EnabledFlagName), + MemstoreConfig: memstore.ReadConfig(ctx), + StorageConfig: store.ReadConfig(ctx), } } diff --git a/server/errors.go b/server/errors.go index 77ff759b..c53fdf7d 100644 --- a/server/errors.go +++ b/server/errors.go @@ -1,9 +1,14 @@ package server import ( + "errors" "fmt" "github.com/Layr-Labs/eigenda-proxy/commitments" + "github.com/Layr-Labs/eigenda-proxy/common" + "github.com/Layr-Labs/eigenda/api" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // MetaError includes both an error and commitment metadata @@ -22,3 +27,27 @@ func (me MetaError) Error() string { func (me MetaError) Unwrap() error { return me.Err } + +func 
is400(err error) bool { + // proxy requests are super simple (clients basically only pass bytes), so the only 400 possible + // is passing a blob that's too big. + // + // Any 400s returned by the disperser are due to formatting bugs in proxy code, for eg. badly + // IFFT'ing or encoding the blob, so we shouldn't return a 400 to the client. + // See https://github.com/Layr-Labs/eigenda/blob/bee55ed9207f16153c3fd8ebf73c219e68685def/api/errors.go#L22 + // for the 400s returned by the disperser server (currently only INVALID_ARGUMENT). + return errors.Is(err, common.ErrProxyOversizedBlob) +} + +func is429(err error) bool { + // grpc RESOURCE_EXHAUSTED is returned by the disperser server when the client has sent too many requests + // in a short period of time. This is a client-side issue, so we should return the 429 to the client. + st, isGRPCError := status.FromError(err) + return isGRPCError && st.Code() == codes.ResourceExhausted +} + +// 503 is returned to tell the caller (batcher) to failover to ethda b/c eigenda is temporarily down +func is503(err error) bool { + // TODO: would be cleaner to define a sentinel error in eigenda-core and use that instead + return errors.Is(err, &api.ErrorFailover{}) +} diff --git a/server/handlers.go b/server/handlers.go index 60469a99..8cb4e27c 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -9,7 +9,6 @@ import ( "net/http" "github.com/Layr-Labs/eigenda-proxy/commitments" - "github.com/Layr-Labs/eigenda-proxy/common" "github.com/gorilla/mux" ) @@ -181,11 +180,15 @@ func (svr *Server) handlePostShared(w http.ResponseWriter, r *http.Request, comm Err: fmt.Errorf("put request failed with commitment %v (commitment mode %v): %w", comm, meta.Mode, err), Meta: meta, } - if errors.Is(err, common.ErrEigenDAOversizedBlob) || errors.Is(err, common.ErrProxyOversizedBlob) { - // we add here any error that should be returned as a 400 instead of a 500. 
- // currently only includes oversized blob requests + switch { + case is400(err): http.Error(w, err.Error(), http.StatusBadRequest) - } else { + case is429(err): + http.Error(w, err.Error(), http.StatusTooManyRequests) + case is503(err): + // this tells the caller (batcher) to failover to ethda b/c eigenda is temporarily down + http.Error(w, err.Error(), http.StatusServiceUnavailable) + default: http.Error(w, err.Error(), http.StatusInternalServerError) } return err diff --git a/server/handlers_test.go b/server/handlers_test.go new file mode 100644 index 00000000..97958826 --- /dev/null +++ b/server/handlers_test.go @@ -0,0 +1,268 @@ +package server + +// The tests in this file test not only the handlers but also the middlewares, +// because server.registerRoutes(r) registers the handlers wrapped with middlewares. + +import ( + "bytes" + "fmt" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + + "github.com/Layr-Labs/eigenda-proxy/common" + "github.com/Layr-Labs/eigenda-proxy/metrics" + "github.com/Layr-Labs/eigenda-proxy/mocks" + "github.com/Layr-Labs/eigenda/api" + "github.com/ethereum/go-ethereum/log" + "github.com/golang/mock/gomock" + "github.com/gorilla/mux" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const ( + simpleCommitmentPrefix = "\x00" + + // [alt-da, da layer, cert version] + opGenericPrefixStr = "\x01\x00\x00" + + testCommitStr = "9a7d4f1c3e5b8a09d1c0fa4b3f8e1d7c6b29f1e6d8c4a7b3c2d4e5f6a7b8c9d0" +) + +func TestHandlerGet(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockStorageMgr := mocks.NewMockIManager(ctrl) + + tests := []struct { + name string + url string + mockBehavior func() + expectedCode int + expectedBody string + }{ + { + name: "Failure - OP Keccak256 Internal Server Error", + url: fmt.Sprintf("/get/0x00%s", testCommitStr), + mockBehavior: func() { + mockStorageMgr.EXPECT().Get(gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil, fmt.Errorf("internal error")) + }, + expectedCode: http.StatusInternalServerError, + expectedBody: "", + }, + { + name: "Success - OP Keccak256", + url: fmt.Sprintf("/get/0x00%s", testCommitStr), + mockBehavior: func() { + mockStorageMgr.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return([]byte(testCommitStr), nil) + }, + expectedCode: http.StatusOK, + expectedBody: testCommitStr, + }, + { + name: "Failure - OP Alt-DA Internal Server Error", + url: fmt.Sprintf("/get/0x010000%s", testCommitStr), + mockBehavior: func() { + mockStorageMgr.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("internal error")) + }, + expectedCode: http.StatusInternalServerError, + expectedBody: "", + }, + { + name: "Success - OP Alt-DA", + url: fmt.Sprintf("/get/0x010000%s", testCommitStr), + mockBehavior: func() { + mockStorageMgr.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return([]byte(testCommitStr), nil) + }, + expectedCode: http.StatusOK, + expectedBody: testCommitStr, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.mockBehavior() + + req := httptest.NewRequest(http.MethodGet, tt.url, nil) + rec := httptest.NewRecorder() + + // To add the vars to the context, + // we need to create a router through which we can pass the request. + r := mux.NewRouter() + // enable this logger to help debug tests + // logger := log.NewLogger(log.NewTerminalHandler(os.Stderr, true)).With("test_name", t.Name()) + noopLogger := log.NewLogger(log.DiscardHandler()) + server := NewServer("localhost", 0, mockStorageMgr, noopLogger, metrics.NoopMetrics) + server.registerRoutes(r) + r.ServeHTTP(rec, req) + + require.Equal(t, tt.expectedCode, rec.Code) + // We only test for bodies for 200s because error messages contain a lot of information + // that isn't very important to test (plus its annoying to always change if error msg changes slightly). 
+ if tt.expectedCode == http.StatusOK { + require.Equal(t, tt.expectedBody, rec.Body.String()) + } + + }) + } +} + +func TestHandlerPutSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockStorageMgr := mocks.NewMockIManager(ctrl) + + tests := []struct { + name string + url string + body []byte + mockBehavior func() + expectedCode int + expectedBody string + }{ + { + name: "Success OP Mode Alt-DA", + url: "/put", + body: []byte("some data that will successfully be written to EigenDA"), + mockBehavior: func() { + mockStorageMgr.EXPECT().Put(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]byte(testCommitStr), nil) + }, + expectedCode: http.StatusOK, + expectedBody: opGenericPrefixStr + testCommitStr, + }, + { + name: "Success OP Mode Keccak256", + url: fmt.Sprintf("/put/0x00%s", testCommitStr), + body: []byte("some data that will successfully be written to EigenDA"), + mockBehavior: func() { + mockStorageMgr.EXPECT().Put(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]byte(testCommitStr), nil) + }, + expectedCode: http.StatusOK, + expectedBody: "", + }, + { + name: "Success Simple Commitment Mode", + url: "/put?commitment_mode=simple", + body: []byte("some data that will successfully be written to EigenDA"), + mockBehavior: func() { + mockStorageMgr.EXPECT().Put(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]byte(testCommitStr), nil) + }, + expectedCode: http.StatusOK, + expectedBody: simpleCommitmentPrefix + testCommitStr, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.mockBehavior() + + req := httptest.NewRequest(http.MethodPost, tt.url, bytes.NewReader(tt.body)) + rec := httptest.NewRecorder() + + // To add the vars to the context, + // we need to create a router through which we can pass the request. 
+ r := mux.NewRouter() + // enable this logger to help debug tests + // logger := log.NewLogger(log.NewTerminalHandler(os.Stderr, true)).With("test_name", t.Name()) + noopLogger := log.NewLogger(log.DiscardHandler()) + server := NewServer("localhost", 0, mockStorageMgr, noopLogger, metrics.NoopMetrics) + server.registerRoutes(r) + r.ServeHTTP(rec, req) + + require.Equal(t, tt.expectedCode, rec.Code) + // We only test for bodies for 200s because error messages contain a lot of information + // that isn't very important to test (plus its annoying to always change if error msg changes slightly). + if tt.expectedCode == http.StatusOK { + require.Equal(t, tt.expectedBody, rec.Body.String()) + } + }) + } +} + +func TestHandlerPutErrors(t *testing.T) { + // Each test is run against all 3 different modes. + modes := []struct { + name string + url string + }{ + { + name: "OP Mode Alt-DA", + url: "/put", + }, + { + name: "OP Mode Keccak256", + url: fmt.Sprintf("/put/0x00%s", testCommitStr), + }, + { + name: "Simple Commitment Mode", + url: "/put?commitment_mode=simple", + }, + } + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockStorageMgr := mocks.NewMockIManager(ctrl) + + tests := []struct { + name string + mockStorageMgrPutReturnedErr error + expectedHTTPCode int + }{ + { + // we only test OK status here. 
Returned commitment is checked in TestHandlerPutSuccess + name: "Success - 200", + mockStorageMgrPutReturnedErr: nil, + expectedHTTPCode: http.StatusOK, + }, + { + name: "Failure - InternalServerError 500", + mockStorageMgrPutReturnedErr: fmt.Errorf("internal error"), + expectedHTTPCode: http.StatusInternalServerError, + }, + { + // if /put results in ErrorFailover (returned by eigenda-client), we should return 503 + name: "Failure - Failover 503", + mockStorageMgrPutReturnedErr: &api.ErrorFailover{}, + expectedHTTPCode: http.StatusServiceUnavailable, + }, + { + name: "Failure - TooManyRequests 429", + mockStorageMgrPutReturnedErr: status.Errorf(codes.ResourceExhausted, "too many requests"), + expectedHTTPCode: http.StatusTooManyRequests, + }, + { + // only 400s are due to oversized blobs right now + name: "Failure - BadRequest 400", + mockStorageMgrPutReturnedErr: common.ErrProxyOversizedBlob, + expectedHTTPCode: http.StatusBadRequest, + }, + } + + for _, tt := range tests { + for _, mode := range modes { + t.Run(tt.name+" / "+mode.name, func(t *testing.T) { + mockStorageMgr.EXPECT().Put(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, tt.mockStorageMgrPutReturnedErr) + + req := httptest.NewRequest(http.MethodPost, mode.url, strings.NewReader("optional body to be sent to eigenda")) + rec := httptest.NewRecorder() + + // To add the vars to the context, + // we need to create a router through which we can pass the request. 
+ r := mux.NewRouter() + // enable this logger to help debug tests + logger := log.NewLogger(log.NewTerminalHandler(os.Stdout, true)).With("test_name", t.Name()) + // noopLogger := log.NewLogger(log.DiscardHandler()) + server := NewServer("localhost", 0, mockStorageMgr, logger, metrics.NoopMetrics) + server.registerRoutes(r) + r.ServeHTTP(rec, req) + + require.Equal(t, tt.expectedHTTPCode, rec.Code) + }) + } + } + +} diff --git a/server/load_store.go b/server/load_store.go index e79145bf..cb75a6ae 100644 --- a/server/load_store.go +++ b/server/load_store.go @@ -112,6 +112,7 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr MaxBlobSizeBytes: cfg.EigenDAConfig.MemstoreConfig.MaxBlobSizeBytes, EthConfirmationDepth: cfg.EigenDAConfig.VerifierConfig.EthConfirmationDepth, StatusQueryTimeout: cfg.EigenDAConfig.EdaClientConfig.StatusQueryTimeout, + PutRetries: cfg.EigenDAConfig.PutRetries, }, ) } diff --git a/server/server_test.go b/server/server_test.go deleted file mode 100644 index 3fc2c93c..00000000 --- a/server/server_test.go +++ /dev/null @@ -1,188 +0,0 @@ -package server - -import ( - "bytes" - "fmt" - "net/http" - "net/http/httptest" - "testing" - - "github.com/Layr-Labs/eigenda-proxy/metrics" - "github.com/Layr-Labs/eigenda-proxy/mocks" - "github.com/ethereum/go-ethereum/log" - "github.com/golang/mock/gomock" - "github.com/gorilla/mux" - "github.com/stretchr/testify/require" -) - -const ( - simpleCommitmentPrefix = "\x00" - - // [alt-da, da layer, cert version] - opGenericPrefixStr = "\x01\x00\x00" - - testCommitStr = "9a7d4f1c3e5b8a09d1c0fa4b3f8e1d7c6b29f1e6d8c4a7b3c2d4e5f6a7b8c9d0" -) - -func TestHandleOPCommitments(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockRouter := mocks.NewMockIManager(ctrl) - - m := metrics.NewMetrics("default") - server := NewServer("localhost", 8080, mockRouter, log.New(), m) - - tests := []struct { - name string - url string - mockBehavior func() - expectedCode int - 
expectedBody string - }{ - { - name: "Failure - OP Keccak256 Internal Server Error", - url: fmt.Sprintf("/get/0x00%s", testCommitStr), - mockBehavior: func() { - mockRouter.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("internal error")) - }, - expectedCode: http.StatusInternalServerError, - expectedBody: "", - }, - { - name: "Success - OP Keccak256", - url: fmt.Sprintf("/get/0x00%s", testCommitStr), - mockBehavior: func() { - mockRouter.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return([]byte(testCommitStr), nil) - }, - expectedCode: http.StatusOK, - expectedBody: testCommitStr, - }, - { - name: "Failure - OP Alt-DA Internal Server Error", - url: fmt.Sprintf("/get/0x010000%s", testCommitStr), - mockBehavior: func() { - mockRouter.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("internal error")) - }, - expectedCode: http.StatusInternalServerError, - expectedBody: "", - }, - { - name: "Success - OP Alt-DA", - url: fmt.Sprintf("/get/0x010000%s", testCommitStr), - mockBehavior: func() { - mockRouter.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return([]byte(testCommitStr), nil) - }, - expectedCode: http.StatusOK, - expectedBody: testCommitStr, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.mockBehavior() - - req := httptest.NewRequest(http.MethodGet, tt.url, nil) - rec := httptest.NewRecorder() - - // To add the vars to the context, - // we need to create a router through which we can pass the request. 
- r := mux.NewRouter() - server.registerRoutes(r) - r.ServeHTTP(rec, req) - - require.Equal(t, tt.expectedCode, rec.Code) - // We don't test for bodies because it's a specific error message - // that contains a lot of information - // require.Equal(t, tt.expectedBody, rec.Body.String()) - - }) - } -} - -func TestHandlerPut(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockRouter := mocks.NewMockIManager(ctrl) - server := NewServer("localhost", 8080, mockRouter, log.New(), metrics.NoopMetrics) - - tests := []struct { - name string - url string - body []byte - mockBehavior func() - expectedCode int - expectedBody string - expectError bool - }{ - { - name: "Failure OP Mode Alt-DA - InternalServerError", - url: "/put", - body: []byte("some data that will trigger an internal error"), - mockBehavior: func() { - mockRouter.EXPECT().Put(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("internal error")) - }, - expectedCode: http.StatusInternalServerError, - expectedBody: "", - expectError: true, - }, - { - name: "Success OP Mode Alt-DA", - url: "/put", - body: []byte("some data that will successfully be written to EigenDA"), - mockBehavior: func() { - mockRouter.EXPECT().Put(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]byte(testCommitStr), nil) - }, - expectedCode: http.StatusOK, - expectedBody: opGenericPrefixStr + testCommitStr, - expectError: false, - }, - { - name: "Success OP Mode Keccak256", - url: fmt.Sprintf("/put/0x00%s", testCommitStr), - body: []byte("some data that will successfully be written to EigenDA"), - mockBehavior: func() { - mockRouter.EXPECT().Put(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]byte(testCommitStr), nil) - }, - expectedCode: http.StatusOK, - expectedBody: "", - expectError: false, - }, - { - name: "Success Simple Commitment Mode", - url: "/put?commitment_mode=simple", - body: []byte("some data that will successfully be written to EigenDA"), 
- mockBehavior: func() { - mockRouter.EXPECT().Put(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]byte(testCommitStr), nil) - }, - expectedCode: http.StatusOK, - expectedBody: simpleCommitmentPrefix + testCommitStr, - expectError: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.mockBehavior() - - req := httptest.NewRequest(http.MethodPost, tt.url, bytes.NewReader(tt.body)) - rec := httptest.NewRecorder() - - // To add the vars to the context, - // we need to create a router through which we can pass the request. - r := mux.NewRouter() - server.registerRoutes(r) - r.ServeHTTP(rec, req) - - require.Equal(t, tt.expectedCode, rec.Code) - if !tt.expectError && tt.expectedBody != "" { - require.Equal(t, []byte(tt.expectedBody), rec.Body.Bytes()) - } - - if !tt.expectError && tt.expectedBody == "" { - require.Equal(t, []byte(nil), rec.Body.Bytes()) - } - }) - } -} diff --git a/store/generated_key/eigenda/eigenda.go b/store/generated_key/eigenda/eigenda.go index 70b972d4..b67ce1d2 100644 --- a/store/generated_key/eigenda/eigenda.go +++ b/store/generated_key/eigenda/eigenda.go @@ -8,8 +8,13 @@ import ( "github.com/Layr-Labs/eigenda-proxy/common" "github.com/Layr-Labs/eigenda-proxy/verify" "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/api/grpc/disperser" + + "github.com/avast/retry-go/v4" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type StoreConfig struct { @@ -20,6 +25,9 @@ type StoreConfig struct { // total duration time that client waits for blob to confirm StatusQueryTimeout time.Duration + + // number of times to retry eigenda blob dispersals + PutRetries uint } // Store does storage interactions and verifications for blobs with DA. 
@@ -70,7 +78,40 @@ func (e Store) Put(ctx context.Context, value []byte) ([]byte, error) { return nil, fmt.Errorf("%w: blob length %d, max blob size %d", common.ErrProxyOversizedBlob, len(value), e.cfg.MaxBlobSizeBytes) } - blobInfo, err := e.client.PutBlob(ctx, value) + // We attempt to disperse the blob to EigenDA up to e.cfg.PutRetries times (3 by default), unless we get a 400 error on any attempt. + blobInfo, err := retry.DoWithData( + func() (*disperser.BlobInfo, error) { + return e.client.PutBlob(ctx, value) + }, + retry.RetryIf(func(err error) bool { + st, isGRPCError := status.FromError(err) + if !isGRPCError { + // not a gRPC status error (e.g. an api.ErrorFailover was returned), so we should retry + return true + } + //nolint:exhaustive // we only care about a few grpc error codes + switch st.Code() { + case codes.InvalidArgument: + // we don't retry 400 errors because there is no point, + // we are passing invalid data + return false + case codes.ResourceExhausted: + // we retry on 429s because it *can* mean we are being rate limited + // we sleep 1 second... very arbitrarily, because we don't have more info. + // grpc error itself should return a backoff time, + // see https://github.com/Layr-Labs/eigenda/issues/845 for more details + time.Sleep(1 * time.Second) + return true + default: + return true + } + }), + // only return the last error. If it is an api.ErrorFailover, then the handler will convert + // it to an http 503 to signify to the client (batcher) to failover to ethda + // b/c eigenda is temporarily down. + retry.LastErrorOnly(true), + retry.Attempts(e.cfg.PutRetries), + ) if err != nil { // TODO: we will want to filter for errors here and return a 503 when needed // ie when dispersal itself failed, or that we timed out waiting for batch to land onchain