Bucket level metric (#542)
dmanc authored and pschork committed May 30, 2024
1 parent d825980 commit b6dc720
Showing 10 changed files with 104 additions and 51 deletions.
17 changes: 11 additions & 6 deletions common/ratelimit.go
@@ -15,17 +15,22 @@ import (
 // ID is the authenticated Account ID. For retrieval requests, the requester ID will be the requester's IP address.
 type RequesterID = string
 
+// RequesterName is the friendly name of the party making the request. In the case
+// of a rollup making a dispersal request, the RequesterName is the name of the rollup.
+type RequesterName = string
+
 type RequestParams struct {
-    RequesterID RequesterID
-    BlobSize    uint
-    Rate        RateParam
-    Info        interface{}
+    RequesterID   RequesterID
+    RequesterName RequesterName
+    BlobSize      uint
+    Rate          RateParam
+    Info          interface{}
 }
 
 type RateLimiter interface {
     // AllowRequest checks whether the request should be allowed. If the request is allowed, the function returns true.
     // If the request is not allowed, the function returns false and the RequestParams of the request that was not allowed.
-    // In order to for the request to be allowed, all of the requests represented by the RequestParams slice must be allowed.
+    // In order for the request to be allowed, all of the requests represented by the RequestParams slice must be allowed.
     // Each RequestParams object represents a single request. Each request is subjected to the same GlobalRateParams, but the
     // individual parameters of the request can differ.
     //
@@ -37,7 +42,7 @@ type RateLimiter interface {
 
 type GlobalRateParams struct {
     // BucketSizes are the time scales at which the rate limit is enforced.
-    // For each time scale, the rate limiter will make sure that the give rate (possibly subject to a relaxation given
+    // For each time scale, the rate limiter will make sure that the given rate (possibly subject to a relaxation given
     // by one of the Multipliers) is observed when the request bandwidth is averaged at this time scale.
     // In terms of implementation, the rate limiter uses a set of "time buckets". A time bucket, i, is filled to a maximum of
    // `BucketSizes[i]` at a rate of 1, and emptied by an amount equal to `(size of request)/RateParam` each time a
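For readers skimming the diff: the GlobalRateParams comment above describes a leaky-bucket scheme in which bucket i refills at a rate of 1 up to BucketSizes[i] and is drained by (size of request)/RateParam per request, and the RateLimiter interface requires every entry in the RequestParams slice to pass. The repository's getBucketLevel helper is not part of this diff, so the following is only a hypothetical sketch of that arithmetic; the function name and parameters are invented for illustration.

package main

import (
	"fmt"
	"time"
)

// sketchBucketLevel is a hypothetical illustration of the scheme described in the
// GlobalRateParams comment: the level refills at a rate of 1 for the elapsed time,
// capped at the bucket size, and is drained by size/rate worth of time per request.
func sketchBucketLevel(prevLevel, bucketSize, elapsed time.Duration, size, rate uint) time.Duration {
	level := prevLevel + elapsed
	if level > bucketSize {
		level = bucketSize
	}
	deduction := time.Duration(float64(size) / float64(rate) * float64(time.Second))
	return level - deduction
}

func main() {
	// Two time scales, both buckets currently full; a 500 KiB request against a
	// 250 KiB/s rate drains 2s from each. The request is allowed only if every
	// bucket stays above zero, mirroring the "all must be allowed" rule.
	bucketSizes := []time.Duration{3 * time.Second, 10 * time.Second}
	levels := []time.Duration{3 * time.Second, 10 * time.Second}
	allowed := true
	for i, bucketSize := range bucketSizes {
		levels[i] = sketchBucketLevel(levels[i], bucketSize, time.Second, 500*1024, 250*1024)
		allowed = allowed && levels[i] > 0
	}
	fmt.Println(levels, allowed) // [1s 8s] true
}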
25 changes: 23 additions & 2 deletions common/ratelimit/limiter.go
@@ -2,10 +2,13 @@ package ratelimit
 
 import (
     "context"
+    "strconv"
     "time"
 
     "github.com/Layr-Labs/eigenda/common"
     "github.com/Layr-Labs/eigensdk-go/logging"
+    "github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
 )
 
 type BucketStore = common.KVStore[common.RateBucketParams]
@@ -15,13 +18,20 @@ type rateLimiter struct {
     bucketStore BucketStore
 
     logger logging.Logger
+
+    // Prometheus metrics
+    bucketLevels *prometheus.GaugeVec
 }
 
-func NewRateLimiter(rateParams common.GlobalRateParams, bucketStore BucketStore, logger logging.Logger) common.RateLimiter {
+func NewRateLimiter(reg prometheus.Registerer, rateParams common.GlobalRateParams, bucketStore BucketStore, logger logging.Logger) common.RateLimiter {
     return &rateLimiter{
         globalRateParams: rateParams,
         bucketStore:      bucketStore,
         logger:           logger.With("component", "RateLimiter"),
+        bucketLevels: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
+            Name: "rate_limiter_bucket_levels",
+            Help: "Current level of each bucket for rate limiting",
+        }, []string{"requester_id", "requester_name", "bucket_index"}),
     }
 }
 
@@ -109,7 +119,18 @@ func (d *rateLimiter) checkAllowed(ctx context.Context, params common.RequestPar
         bucketParams.BucketLevels[i] = getBucketLevel(bucketParams.BucketLevels[i], size, interval, deduction)
         allowed = allowed && bucketParams.BucketLevels[i] > 0
 
-        d.logger.Debug("Bucket level", "key", params.RequesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed)
+        d.logger.Debug("Bucket level updated", "key", params.RequesterID, "name", params.RequesterName, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed)
+
+        // Update metrics only if the requester name is provided. We're making
+        // an assumption that the requester name is only provided for authenticated
+        // requests so it should limit the cardinality of the requester_id label.
+        if params.RequesterName != "" {
+            d.bucketLevels.With(prometheus.Labels{
+                "requester_id":   params.RequesterID,
+                "requester_name": params.RequesterName,
+                "bucket_index":   strconv.Itoa(i),
+            }).Set(float64(bucketParams.BucketLevels[i]))
+        }
     }
 
     return allowed, bucketParams
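A note on the metric added here: rate_limiter_bucket_levels is registered on whichever prometheus.Registerer is handed to NewRateLimiter, and, per the comment above, it is only written for requests that carry a RequesterName, which keeps the requester_id label's cardinality bounded to allowlisted accounts. The snippet below is a self-contained sketch that only mimics the gauge's name and label set using the Prometheus client library; it does not build the repository's rateLimiter, and the label values are illustrative.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Same name and label set as the gauge created in limiter.go.
	levels := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
		Name: "rate_limiter_bucket_levels",
		Help: "Current level of each bucket for rate limiting",
	}, []string{"requester_id", "requester_name", "bucket_index"})

	// Illustrative values; in checkAllowed these come from params.RequesterID,
	// params.RequesterName, the bucket index i, and bucketParams.BucketLevels[i].
	levels.With(prometheus.Labels{
		"requester_id":   "0x1aa8226f6d354380dDE75eE6B634875c4203e522",
		"requester_name": "eigenlabs",
		"bucket_index":   "0",
	}).Set(42)

	// Everything registered on reg is visible to Gather, and in the services it
	// would be exposed by whatever handler serves this registry.
	families, _ := reg.Gather()
	for _, mf := range families {
		fmt.Println(mf.GetName(), len(mf.GetMetric()))
	}
}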
3 changes: 2 additions & 1 deletion common/ratelimit/ratelimit_test.go
@@ -9,6 +9,7 @@ import (
     "github.com/Layr-Labs/eigenda/common/ratelimit"
     "github.com/Layr-Labs/eigenda/common/store"
     "github.com/Layr-Labs/eigensdk-go/logging"
+    "github.com/prometheus/client_golang/prometheus"
     "github.com/stretchr/testify/assert"
 )
 
@@ -25,7 +26,7 @@ func makeTestRatelimiter() (common.RateLimiter, error) {
         return nil, err
     }
 
-    ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logging.NewNoopLogger())
+    ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logging.NewNoopLogger())
 
     return ratelimiter, nil
 
36 changes: 23 additions & 13 deletions disperser/apiserver/server.go
@@ -355,11 +355,14 @@ func (s *DispersalServer) getAccountRate(origin, authenticatedAddress string, qu
             rates.BlobRate = rateInfo.BlobRate
         }
 
+        if len(rateInfo.Name) > 0 {
+            rates.Name = rateInfo.Name
+        }
+
         break
     }
 
     return rates, key, nil
-
 }
 
 // Enum of rateTypes for the limiterInfo struct
@@ -446,6 +449,9 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context
         s.metrics.HandleInternalFailureRpcRequest(apiMethodName)
         return api.NewInternalError(err.Error())
     }
 
+    // Note: There's an implicit assumption that an empty name means the account
+    // is not in the allow list.
+    requesterName = accountRates.Name
 
     // Update the quorum rate
@@ -458,9 +464,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context
         // System Level
         key := fmt.Sprintf("%s:%d-%s", systemAccountKey, param.QuorumID, SystemThroughputType.Plug())
         requestParams = append(requestParams, common.RequestParams{
-            RequesterID: key,
-            BlobSize:    encodedSize,
-            Rate:        globalRates.TotalUnauthThroughput,
+            RequesterID:   key,
+            RequesterName: systemAccountKey,
+            BlobSize:      encodedSize,
+            Rate:          globalRates.TotalUnauthThroughput,
             Info: limiterInfo{
                 RateType: SystemThroughputType,
                 QuorumID: param.QuorumID,
@@ -469,9 +476,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context
 
         key = fmt.Sprintf("%s:%d-%s", systemAccountKey, param.QuorumID, SystemBlobRateType.Plug())
         requestParams = append(requestParams, common.RequestParams{
-            RequesterID: key,
-            BlobSize:    blobRateMultiplier,
-            Rate:        globalRates.TotalUnauthBlobRate,
+            RequesterID:   key,
+            RequesterName: systemAccountKey,
+            BlobSize:      blobRateMultiplier,
+            Rate:          globalRates.TotalUnauthBlobRate,
             Info: limiterInfo{
                 RateType: SystemBlobRateType,
                 QuorumID: param.QuorumID,
@@ -481,9 +489,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context
         // Account Level
         key = fmt.Sprintf("%s:%d-%s", accountKey, param.QuorumID, AccountThroughputType.Plug())
         requestParams = append(requestParams, common.RequestParams{
-            RequesterID: key,
-            BlobSize:    encodedSize,
-            Rate:        accountRates.Throughput,
+            RequesterID:   key,
+            RequesterName: requesterName,
+            BlobSize:      encodedSize,
+            Rate:          accountRates.Throughput,
             Info: limiterInfo{
                 RateType: AccountThroughputType,
                 QuorumID: param.QuorumID,
@@ -492,9 +501,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context
 
         key = fmt.Sprintf("%s:%d-%s", accountKey, param.QuorumID, AccountBlobRateType.Plug())
         requestParams = append(requestParams, common.RequestParams{
-            RequesterID: key,
-            BlobSize:    blobRateMultiplier,
-            Rate:        accountRates.BlobRate,
+            RequesterID:   key,
+            RequesterName: requesterName,
+            BlobSize:      blobRateMultiplier,
+            Rate:          accountRates.BlobRate,
             Info: limiterInfo{
                 RateType: AccountBlobRateType,
                 QuorumID: param.QuorumID,
9 changes: 7 additions & 2 deletions disperser/apiserver/server_test.go
@@ -20,6 +20,7 @@ import (
     gethcommon "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/crypto"
     "github.com/google/uuid"
+    "github.com/prometheus/client_golang/prometheus"
     "github.com/urfave/cli"
 
     pb "github.com/Layr-Labs/eigenda/api/grpc/disperser"
@@ -641,7 +642,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer {
     if err != nil {
         panic("failed to create bucket store")
     }
-    ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logger)
+    ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logger)
 
     rateConfig := apiserver.RateConfig{
         QuorumRateInfos: map[core.QuorumID]apiserver.QuorumRateInfo{
@@ -662,20 +663,24 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer {
         Allowlist: apiserver.Allowlist{
             "1.2.3.4": map[uint8]apiserver.PerUserRateInfo{
                 0: {
+                    Name:       "eigenlabs",
                     Throughput: 100 * 1024,
                     BlobRate:   5 * 1e6,
                 },
                 1: {
+                    Name:       "eigenlabs",
                     Throughput: 1024 * 1024,
                     BlobRate:   5 * 1e6,
                 },
             },
             "0x1aa8226f6d354380dDE75eE6B634875c4203e522": map[uint8]apiserver.PerUserRateInfo{
                 0: {
+                    Name:       "eigenlabs",
                     Throughput: 100 * 1024,
                     BlobRate:   5 * 1e6,
                 },
                 1: {
+                    Name:       "eigenlabs",
                    Throughput: 1024 * 1024,
                     BlobRate:   5 * 1e6,
                 },
@@ -693,7 +698,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer {
     return apiserver.NewDispersalServer(disperser.ServerConfig{
         GrpcPort:    "51001",
         GrpcTimeout: 1 * time.Second,
-    }, queue, transactor, logger, disperser.NewMetrics("9001", logger), ratelimiter, rateConfig)
+    }, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), ratelimiter, rateConfig)
 }
 
 func disperseBlob(t *testing.T, server *apiserver.DispersalServer, data []byte) (pb.BlobStatus, uint, []byte) {
18 changes: 14 additions & 4 deletions disperser/cmd/apiserver/main.go
@@ -10,6 +10,7 @@ import (
     "github.com/Layr-Labs/eigenda/common"
     "github.com/Layr-Labs/eigenda/disperser/apiserver"
     "github.com/Layr-Labs/eigenda/disperser/common/blobstore"
+    "github.com/prometheus/client_golang/prometheus"
 
     "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
     "github.com/Layr-Labs/eigenda/common/aws/s3"
@@ -91,6 +92,8 @@ func RunDisperserServer(ctx *cli.Context) error {
     blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
     blobStore := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger)
 
+    reg := prometheus.NewRegistry()
+
     var ratelimiter common.RateLimiter
     if config.EnableRatelimiter {
         globalParams := config.RatelimiterConfig.GlobalRateParams
@@ -108,12 +111,19 @@ func RunDisperserServer(ctx *cli.Context) error {
                 return err
             }
         }
-        ratelimiter = ratelimit.NewRateLimiter(globalParams, bucketStore, logger)
+        ratelimiter = ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logger)
     }
 
     // TODO: create a separate metrics for batcher
-    metrics := disperser.NewMetrics(config.MetricsConfig.HTTPPort, logger)
-    server := apiserver.NewDispersalServer(config.ServerConfig, blobStore, transactor, logger, metrics, ratelimiter, config.RateConfig)
+    metrics := disperser.NewMetrics(reg, config.MetricsConfig.HTTPPort, logger)
+    server := apiserver.NewDispersalServer(
+        config.ServerConfig,
+        blobStore,
+        transactor,
+        logger,
+        metrics,
+        ratelimiter,
+        config.RateConfig,
+    )
 
     // Enable Metrics Block
     if config.MetricsConfig.EnableMetrics {
3 changes: 1 addition & 2 deletions disperser/metrics.go
@@ -37,9 +37,8 @@ const (
     AccountRateLimitedFailure string = "ratelimited-account" // The request rate limited at account level
 )
 
-func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
+func NewMetrics(reg *prometheus.Registry, httpPort string, logger logging.Logger) *Metrics {
     namespace := "eigenda_disperser"
-    reg := prometheus.NewRegistry()
     reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
     reg.MustRegister(collectors.NewGoCollector())
 
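The deleted line above is the crux of the wider refactor: NewMetrics no longer creates a private registry, so the disperser metrics, the process and Go collectors, and the rate limiter's new gauge can all live on one registry supplied by the caller and be scraped from a single endpoint. The repository's actual HTTP wiring sits behind MetricsConfig/EnableMetrics and is not part of this diff; the sketch below, which assumes promhttp is used to expose the registry, only illustrates the shared-registry pattern.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// One registry shared by every component. In the real code, constructors such
	// as disperser.NewMetrics(reg, ...) and ratelimit.NewRateLimiter(reg, ...)
	// register their collectors on it.
	reg := prometheus.NewRegistry()
	reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
	reg.MustRegister(collectors.NewGoCollector())

	// A single handler then exposes everything that was registered. The port is
	// illustrative (the tests in this PR use "9001").
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":9001", nil))
}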
33 changes: 18 additions & 15 deletions node/cmd/main.go
@@ -8,12 +8,13 @@ import (
     "time"
 
     "github.com/Layr-Labs/eigenda/common/pubip"
+    "github.com/Layr-Labs/eigenda/common/ratelimit"
+    "github.com/Layr-Labs/eigenda/common/store"
+    "github.com/prometheus/client_golang/prometheus"
 
     "github.com/urfave/cli"
 
     "github.com/Layr-Labs/eigenda/common"
-    "github.com/Layr-Labs/eigenda/common/ratelimit"
-    "github.com/Layr-Labs/eigenda/common/store"
     "github.com/Layr-Labs/eigenda/node"
     "github.com/Layr-Labs/eigenda/node/flags"
     "github.com/Layr-Labs/eigenda/node/grpc"
@@ -56,18 +57,8 @@ func NodeMain(ctx *cli.Context) error {
 
     pubIPProvider := pubip.ProviderOrDefault(config.PubIPProvider)
 
-    // Create the node.
-    node, err := node.NewNode(config, pubIPProvider, logger)
-    if err != nil {
-        return err
-    }
-
-    err = node.Start(context.Background())
-    if err != nil {
-        node.Logger.Error("could not start node", "error", err)
-        return err
-    }
-
+    // Rate limiter
+    reg := prometheus.NewRegistry()
     globalParams := common.GlobalRateParams{
         BucketSizes: []time.Duration{bucketDuration},
         Multipliers: []float32{bucketMultiplier},
@@ -79,7 +70,19 @@ func NodeMain(ctx *cli.Context) error {
         return err
     }
 
-    ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logger)
+    ratelimiter := ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logger)
+
+    // Create the node.
+    node, err := node.NewNode(reg, config, pubIPProvider, logger)
+    if err != nil {
+        return err
+    }
+
+    err = node.Start(context.Background())
+    if err != nil {
+        node.Logger.Error("could not start node", "error", err)
+        return err
+    }
 
     // Creates the GRPC server.
     server := grpc.NewServer(config, node, logger, ratelimiter)
9 changes: 4 additions & 5 deletions node/node.go
@@ -67,16 +67,15 @@ type Node struct {
 }
 
 // NewNode creates a new Node with the provided config.
-func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger) (*Node, error) {
+func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provider, logger logging.Logger) (*Node, error) {
     // Setup metrics
     // sdkClients, err := buildSdkClients(config, logger)
     // if err != nil {
     // 	return nil, err
     // }
 
-    promReg := prometheus.NewRegistry()
-    eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, promReg, logger.With("component", "EigenMetrics"))
-    rpcCallsCollector := rpccalls.NewCollector(AppName, promReg)
+    eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, reg, logger.With("component", "EigenMetrics"))
+    rpcCallsCollector := rpccalls.NewCollector(AppName, reg)
 
     // Generate BLS keys
     keyPair, err := core.MakeKeyPairFromString(config.PrivateBls)
@@ -114,7 +113,7 @@ func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provider, logger logging.Logger
     // Setup Node Api
     nodeApi := nodeapi.NewNodeApi(AppName, SemVer, ":"+config.NodeApiPort, logger.With("component", "NodeApi"))
 
-    metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst)
+    metrics := NewMetrics(eigenMetrics, reg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst)
 
     // Make validator
     v, err := verifier.NewVerifier(&config.EncoderConfig, false)
2 changes: 1 addition & 1 deletion test/integration_test.go
@@ -168,7 +168,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
     }
     finalizer := batchermock.NewFinalizer()
 
-    disperserMetrics := disperser.NewMetrics("9100", logger)
+    disperserMetrics := disperser.NewMetrics(prometheus.NewRegistry(), "9100", logger)
     txnManager := batchermock.NewTxnManager()
 
     batcher, err := batcher.NewBatcher(batcherConfig, timeoutConfig, store, dispatcher, cst, asn, encoderClient, agg, &commonmock.MockEthClient{}, finalizer, transactor, txnManager, logger, batcherMetrics, handleBatchLivenessChan)
