Skip to content

Commit

Permalink
Fix #1457; Fix #1484;
Browse files Browse the repository at this point in the history
Fix a high memory consumption that also is part of the issue #1457.
Under high load of requests (1000/rps or more) the RAM got crazy and scale up to 40GB or close to that.
Now after the fix of #1457 with the worker pool, the node remains under 14gb of ram in my local tests.
  • Loading branch information
jorgecuesta authored and oten91 committed Nov 28, 2022
1 parent c4c259f commit fa8fca5
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 21 deletions.
2 changes: 2 additions & 0 deletions app/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func NewInMemoryTendermintNodeAminoWithValidators(t *testing.T, genesisState []b
panic(err)
}
pocketTypes.CleanPocketNodes()
pocketTypes.StopEvidenceWorker()
PCA = nil
inMemKB = nil
err := inMemDB.Close()
Expand Down Expand Up @@ -168,6 +169,7 @@ func NewInMemoryTendermintNodeProtoWithValidators(t *testing.T, genesisState []b
}

pocketTypes.CleanPocketNodes()
pocketTypes.StopEvidenceWorker()

PCA = nil
inMemKB = nil
Expand Down
1 change: 1 addition & 0 deletions app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ func InitPocketCoreConfig(chains *types.HostedBlockchains, logger log.Logger) {
}

func ShutdownPocketCore() {
types.StopEvidenceWorker()
types.FlushSessionCache()
types.StopServiceMetrics()
}
Expand Down
44 changes: 28 additions & 16 deletions app/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,7 @@ func TestQueryRelay(t *testing.T) {
_, stopCli, evtChan := subscribeTo(t, tmTypes.EventNewBlock)
select {
case <-evtChan:
assert.Equal(t, uint64(1), types.GlobalEvidenceWorker.SuccessfulTasks())
inv, err := types.GetEvidence(types.SessionHeader{
ApplicationPubKey: aat.ApplicationPublicKey,
Chain: relay.Proof.Blockchain,
Expand Down Expand Up @@ -858,16 +859,18 @@ func TestQueryRelayMultipleNodes(t *testing.T) {
}
_, stopCli, evtChan := subscribeTo(t, tmTypes.EventNewBlock)
<-evtChan // Wait for block
chain := sdk.PlaceholderHash
sessionBlockHeight := int64(1)
// setup relay
for _, v := range validators {
relay := types.Relay{
Payload: payload,
Meta: types.RelayMeta{BlockHeight: 5}, // todo race condition here
Proof: types.RelayProof{
Entropy: 32598345349034509,
SessionBlockHeight: 1,
SessionBlockHeight: sessionBlockHeight,
ServicerPubKey: v.PublicKey().RawString(),
Blockchain: sdk.PlaceholderHash,
Blockchain: chain,
Token: aat,
Signature: "",
},
Expand All @@ -886,23 +889,32 @@ func TestQueryRelayMultipleNodes(t *testing.T) {
BodyString(expectedRequest).
Reply(200).
BodyString(expectedResponse)
}

validatorAddress := sdk.GetAddress(v.PublicKey())
node, nodeErr := types.GetPocketNodeByAddress(&validatorAddress)
select {
case <-evtChan:
// verify that each store task was successful
assert.Equal(t, uint64(len(validators)), types.GlobalEvidenceWorker.SuccessfulTasks())
// check the evidence store of each node.
for _, v := range validators {
validatorAddress := sdk.GetAddress(v.PublicKey())
_node, nodeErr := types.GetPocketNodeByAddress(&validatorAddress)

assert.Nil(t, nodeErr)
inv, err := types.GetEvidence(types.SessionHeader{
ApplicationPubKey: aat.ApplicationPublicKey,
Chain: chain,
SessionBlockHeight: sessionBlockHeight,
}, types.RelayEvidence, sdk.NewInt(10000), _node.EvidenceStore)
assert.Nil(t, err)
assert.NotNil(t, inv)
assert.Equal(t, int64(1), inv.NumOfProofs)
}

assert.Nil(t, nodeErr)
inv, err := types.GetEvidence(types.SessionHeader{
ApplicationPubKey: aat.ApplicationPublicKey,
Chain: relay.Proof.Blockchain,
SessionBlockHeight: relay.Proof.SessionBlockHeight,
}, types.RelayEvidence, sdk.NewInt(10000), node.EvidenceStore)
assert.Nil(t, err)
assert.NotNil(t, inv)
assert.Equal(t, inv.NumOfProofs, int64(1))
cleanup()
stopCli()
gock.Off()
}
cleanup()
stopCli()
gock.Off()
return
})
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/pokt-network/pocket-core
go 1.18

require (
github.com/alitto/pond v1.8.1
github.com/go-kit/kit v0.12.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alitto/pond v1.8.1 h1:GzrU4ZERX0JDNMmAY2k5y1Wqgmul77nt3bsDgvwVgO4=
github.com/alitto/pond v1.8.1/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down
9 changes: 9 additions & 0 deletions types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type PocketConfig struct {
ValidatorCacheSize int64 `json:"validator_cache_size"`
ApplicationCacheSize int64 `json:"application_cache_size"`
RPCTimeout int64 `json:"rpc_timeout"`
RPCMaxIdleConns int `json:"rpc_max_idle_conns"`
RPCMaxConnsPerHost int `json:"rpc_max_conns_per_host"`
RPCMaxIdleConnsPerHost int `json:"rpc_max_idle_conns_per_host"`
PrometheusAddr string `json:"pocket_prometheus_port"`
PrometheusMaxOpenfiles int `json:"prometheus_max_open_files"`
MaxClaimAgeForProofRetry int `json:"max_claim_age_for_proof_retry"`
Expand Down Expand Up @@ -100,6 +103,9 @@ const (
DefaultPocketPrometheusListenAddr = "8083"
DefaultPrometheusMaxOpenFile = 3
DefaultRPCTimeout = 30000
DefaultRPCMaxIdleConns = 1000
DefaultRPCMaxConnsPerHost = 1000
DefaultRPCMaxIdleConnsPerHost = 1000
DefaultMaxClaimProofRetryAge = 32
DefaultProofPrevalidation = false
DefaultCtxCacheSize = 20
Expand Down Expand Up @@ -133,6 +139,9 @@ func DefaultConfig(dataDir string) Config {
ValidatorCacheSize: DefaultValidatorCacheSize,
ApplicationCacheSize: DefaultApplicationCacheSize,
RPCTimeout: DefaultRPCTimeout,
RPCMaxIdleConns: DefaultRPCMaxIdleConns,
RPCMaxConnsPerHost: DefaultRPCMaxConnsPerHost,
RPCMaxIdleConnsPerHost: DefaultRPCMaxIdleConnsPerHost,
PrometheusAddr: DefaultPocketPrometheusListenAddr,
PrometheusMaxOpenfiles: DefaultPrometheusMaxOpenFile,
MaxClaimAgeForProofRetry: DefaultMaxClaimProofRetryAge,
Expand Down
8 changes: 6 additions & 2 deletions x/pocketcore/keeper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ func (k Keeper) HandleRelay(ctx sdk.Ctx, relay pc.Relay) (*pc.RelayResponse, sdk
}
return nil, err
}
// store the proof before execution, because the proof corresponds to the previous relay
relay.Proof.Store(maxPossibleRelays, node.EvidenceStore)
// move this to a worker that will insert this proof in a series style to avoid memory consumption and relay proof race conditions
// https://github.com/pokt-network/pocket-core/issues/1457
pc.GlobalEvidenceWorker.Submit(func() {
// store the proof before execution, because the proof corresponds to the previous relay
relay.Proof.Store(maxPossibleRelays, node.EvidenceStore)
})
// attempt to execute
respPayload, err := relay.Execute(hostedBlockchains, &nodeAddress)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion x/pocketcore/keeper/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ func TestKeeper_HandleRelay(t *testing.T) {
t.Fatalf(er.Error())
}
validRelay.Proof.Signature = hex.EncodeToString(clientSig)
httpClient := types.GetChainsClient()
defer gock.Off() // Flush pending mocks after test execution

defer gock.RestoreClient(httpClient)
gock.InterceptClient(httpClient)
gock.New("https://www.google.com:443").
Post("/").
Reply(200).
Expand Down
23 changes: 23 additions & 0 deletions x/pocketcore/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"encoding/hex"
"fmt"
"github.com/alitto/pond"
"github.com/pokt-network/pocket-core/crypto"
"github.com/pokt-network/pocket-core/types"
"github.com/tendermint/tendermint/config"
Expand All @@ -20,13 +21,16 @@ var (
globalRPCTimeout time.Duration
GlobalPocketConfig types.PocketConfig
GlobalTenderMintConfig config.Config
GlobalEvidenceWorker *pond.WorkerPool
)

func InitConfig(chains *HostedBlockchains, logger log.Logger, c types.Config) {
ConfigOnce.Do(func() {
InitGlobalServiceMetric(chains, logger, c.PocketConfig.PrometheusAddr, c.PocketConfig.PrometheusMaxOpenfiles)
})
InitHttpClient(c.PocketConfig.RPCMaxIdleConns, c.PocketConfig.RPCMaxConnsPerHost, c.PocketConfig.RPCMaxIdleConnsPerHost)
InitPocketNodeCaches(c, logger)
InitEvidenceWorker(c, logger)
GlobalPocketConfig = c.PocketConfig
GlobalTenderMintConfig = c.TendermintConfig
if GlobalPocketConfig.LeanPocket {
Expand All @@ -37,6 +41,18 @@ func InitConfig(chains *HostedBlockchains, logger log.Logger, c types.Config) {
SetRPCTimeout(c.PocketConfig.RPCTimeout)
}

func InitEvidenceWorker(_ types.Config, logger log.Logger) {
panicHandler := func(p interface{}) {
logger.Error(fmt.Sprintf("evidence storage task panicked: %v", p))
}
GlobalEvidenceWorker = pond.New(
1, 0,
pond.IdleTimeout(100),
pond.PanicHandler(panicHandler),
pond.Strategy(pond.Balanced()),
)
}

func ConvertEvidenceToProto(config types.Config) error {
// we have to add a random pocket node so that way lean pokt can still support getting the global evidence cache
node := AddPocketNode(crypto.GenerateEd25519PrivKey().GenPrivateKey(), log.NewNopLogger())
Expand Down Expand Up @@ -67,6 +83,13 @@ func ConvertEvidenceToProto(config types.Config) error {
return nil
}

func StopEvidenceWorker() {
if !GlobalEvidenceWorker.Stopped() {
GlobalEvidenceWorker.StopAndWait()
}
GlobalEvidenceWorker = nil
}

func FlushSessionCache() {
for _, k := range GlobalPocketNodes {
if k.SessionStore != nil {
Expand Down
36 changes: 35 additions & 1 deletion x/pocketcore/types/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,47 @@ import (

const DEFAULTHTTPMETHOD = "POST"

var (
chainHttpClient *http.Client
)

// "Relay" - A read / write API request from a hosted (non native) external blockchain
type Relay struct {
Payload Payload `json:"payload"` // the data payload of the request
Meta RelayMeta `json:"meta"` // metadata for the relay request
Proof RelayProof `json:"proof"` // the authentication scheme needed for work
}

func GetChainsClient() *http.Client {
if chainHttpClient == nil {
InitHttpClient(1000, 1000, 1000)
}

return chainHttpClient
}

func InitHttpClient(maxIdleConns, maxConnsPerHost, maxIdleConnsPerHost int) {
var chainsTransport *http.Transport

t, ok := http.DefaultTransport.(*http.Transport)
if ok {
chainsTransport = t.Clone()
// this params may could be handled by config.json, but how much people know about this?
// tbd: figure out the right values to this, rn the priority is stop recreating new http client and connections.
chainsTransport.MaxIdleConns = maxIdleConns
chainsTransport.MaxConnsPerHost = maxConnsPerHost
chainsTransport.MaxIdleConnsPerHost = maxIdleConnsPerHost
} // if not ok, probably is because is *gock.Transport - test only

chainHttpClient = &http.Client{
Timeout: globalRPCTimeout * time.Second,
}

if chainsTransport != nil {
chainHttpClient.Transport = chainsTransport
}
}

// "Validate" - Checks the validity of a relay request using store data
func (r *Relay) Validate(ctx sdk.Ctx, posKeeper PosKeeper, appsKeeper AppsKeeper, pocketKeeper PocketKeeper, hb *HostedBlockchains, sessionBlockHeight int64, node *PocketNode) (maxPossibleRelays sdk.BigInt, err sdk.Error) {
// validate payload
Expand Down Expand Up @@ -316,7 +350,7 @@ func executeHTTPRequest(payload, url, userAgent string, basicAuth BasicAuth, met
}
}
// execute the request
resp, err := (&http.Client{Timeout: globalRPCTimeout * time.Millisecond}).Do(req)
resp, err := GetChainsClient().Do(req)
if err != nil {
return "", err
}
Expand Down
5 changes: 4 additions & 1 deletion x/pocketcore/types/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,10 @@ func TestRelay_Execute(t *testing.T) {
},
}
validRelay.Proof.RequestHash = validRelay.RequestHashString()
httpClient := GetChainsClient()
defer gock.Off() // Flush pending mocks after test execution

defer gock.RestoreClient(httpClient)
gock.InterceptClient(httpClient)
gock.New("https://server.com").
Post("/relay").
Reply(200).
Expand All @@ -185,6 +187,7 @@ func TestRelay_Execute(t *testing.T) {
URL: "https://server.com/relay/",
}},
}

response, err := validRelay.Execute(&hb, &nodeAddr)
assert.True(t, err == nil)
assert.Equal(t, response, "bar")
Expand Down

0 comments on commit fa8fca5

Please sign in to comment.