diff --git a/x/oracle/keeper/aggregator/context.go b/x/oracle/keeper/aggregator/context.go index e8ad8d866..942b793da 100644 --- a/x/oracle/keeper/aggregator/context.go +++ b/x/oracle/keeper/aggregator/context.go @@ -218,7 +218,8 @@ func (agc *AggregatorContext) SealRound(ctx sdk.Context, force bool) (success [] return success, failed } -func (agc *AggregatorContext) PrepareRound(ctx sdk.Context, block uint64) { +// TODO: test to remove PrepareRound into BeginBlock +func (agc *AggregatorContext) PrepareRoundBeginBlock(ctx sdk.Context, block uint64) { // block>0 means recache initialization, all roundInfo is empty if block == 0 { block = uint64(ctx.BlockHeight()) @@ -228,15 +229,16 @@ func (agc *AggregatorContext) PrepareRound(ctx sdk.Context, block uint64) { if feederID == 0 { continue } - if (feeder.EndBlock > 0 && feeder.EndBlock <= block) || feeder.StartBaseBlock > block { + if (feeder.EndBlock > 0 && feeder.EndBlock < block) || feeder.StartBaseBlock >= block { // this feeder is inactive continue } - delta := block - feeder.StartBaseBlock + baseBlock := block - 1 + delta := baseBlock - feeder.StartBaseBlock left := delta % feeder.Interval count := delta / feeder.Interval - latestBasedblock := block - left + latestBasedblock := baseBlock - left latestNextRoundID := feeder.StartRoundID + count feederIDUint64 := uint64(feederID) diff --git a/x/oracle/keeper/aggregator/context_test.go b/x/oracle/keeper/aggregator/context_test.go index 65f839753..b00c036a2 100644 --- a/x/oracle/keeper/aggregator/context_test.go +++ b/x/oracle/keeper/aggregator/context_test.go @@ -18,7 +18,7 @@ func TestAggregatorContext(t *testing.T) { Convey("prepare round to gengerate round info of feeders for next block", func() { Convey("pepare within the window", func() { p := patchBlockHeight(12) - agc.PrepareRound(ctx, 0) + agc.PrepareRoundBeginBlock(ctx, 0) Convey("for empty round list", func() { So(*agc.rounds[1], ShouldResemble, roundInfo{10, 2, 1}) @@ -27,9 +27,9 @@ func TestAggregatorContext(t *testing.T) { Convey("update already exist round info", func() { p.Reset() time.Sleep(1 * time.Second) - patchBlockHeight(10 + int64(common.MaxNonce)) + patchBlockHeight(10 + int64(common.MaxNonce) + 1) - agc.PrepareRound(ctx, 0) + agc.PrepareRoundBeginBlock(ctx, 0) So(agc.rounds[1].status, ShouldEqual, 2) }) p.Reset() @@ -37,8 +37,8 @@ func TestAggregatorContext(t *testing.T) { }) Convey("pepare outside the window", func() { Convey("for empty round list", func() { - p := patchBlockHeight(10 + int64(common.MaxNonce)) - agc.PrepareRound(ctx, 0) + p := patchBlockHeight(10 + int64(common.MaxNonce) + 1) + agc.PrepareRoundBeginBlock(ctx, 0) So(agc.rounds[1].status, ShouldEqual, 2) p.Reset() time.Sleep(1 * time.Second) @@ -46,9 +46,9 @@ func TestAggregatorContext(t *testing.T) { }) }) - Convey("seal existed round without any msg recieved", func() { + Convey("seal existing round without any msg recieved", func() { p := patchBlockHeight(11) - agc.PrepareRound(ctx, 0) + agc.PrepareRoundBeginBlock(ctx, 0) Convey("seal when exceed the window", func() { So(agc.rounds[1].status, ShouldEqual, 1) p.Reset() diff --git a/x/oracle/keeper/msg_server_create_price.go b/x/oracle/keeper/msg_server_create_price.go index ae165e59d..8f67024a3 100644 --- a/x/oracle/keeper/msg_server_create_price.go +++ b/x/oracle/keeper/msg_server_create_price.go @@ -18,15 +18,17 @@ func (ms msgServer) CreatePrice(goCtx context.Context, msg *types.MsgCreatePrice logger := ms.Keeper.Logger(ctx) if err := checkTimestamp(ctx, msg); err != nil { + logger.Info("price proposal timestamp check failed", "error", err, "height", ctx.BlockHeight()) return nil, types.ErrPriceProposalFormatInvalid.Wrap(err.Error()) } newItem, caches, err := GetAggregatorContext(ctx, ms.Keeper).NewCreatePrice(ctx, msg) if err != nil { + logger.Info("price proposal failed", "error", err, "height", ctx.BlockHeight()) return nil, err } - logger.Info("add price proposal for aggregation", "feederID", msg.FeederID, "basedBlock", msg.BasedBlock, "proposer", msg.Creator) + logger.Info("add price proposal for aggregation", "feederID", msg.FeederID, "basedBlock", msg.BasedBlock, "proposer", msg.Creator, "height", ctx.BlockHeight()) ctx.EventManager().EmitEvent(sdk.NewEvent( types.EventTypeCreatePrice, @@ -42,7 +44,7 @@ func (ms msgServer) CreatePrice(goCtx context.Context, msg *types.MsgCreatePrice if newItem != nil { ms.AppendPriceTR(ctx, newItem.TokenID, newItem.PriceTR) - logger.Info("final price aggregation done", "feederID", msg.FeederID, "roundID", newItem.PriceTR.RoundID, "price", newItem.PriceTR.Price) + logger.Info("final price aggregation done", "feederID", msg.FeederID, "roundID", newItem.PriceTR.RoundID, "price", newItem.PriceTR.Price, "height", ctx.BlockHeight()) ctx.EventManager().EmitEvent(sdk.NewEvent( types.EventTypeCreatePrice, diff --git a/x/oracle/keeper/single.go b/x/oracle/keeper/single.go index dccec0100..27af077c9 100644 --- a/x/oracle/keeper/single.go +++ b/x/oracle/keeper/single.go @@ -60,20 +60,23 @@ func GetAggregatorContext(ctx sdk.Context, k Keeper) *aggregator.AggregatorConte } func recacheAggregatorContext(ctx sdk.Context, agc *aggregator.AggregatorContext, k Keeper, c *cache.Cache) bool { - from := ctx.BlockHeight() - int64(common.MaxNonce) - to := ctx.BlockHeight() - 1 + logger := k.Logger(ctx) + from := ctx.BlockHeight() - int64(common.MaxNonce) + 1 + to := ctx.BlockHeight() h, ok := k.GetValidatorUpdateBlock(ctx) recentParamsMap := k.GetAllRecentParamsAsMap(ctx) if !ok || len(recentParamsMap) == 0 { + logger.Info("no validatorUpdateBlock found, go to initial process", "height", ctx.BlockHeight()) // no cache, this is the very first running, so go to initial process instead return false } - if int64(h.Block) > from { - from = int64(h.Block) + if int64(h.Block) >= from { + from = int64(h.Block) + 1 } + logger.Info("recacheAggregatorContext", "from", from, "to", to, "height", ctx.BlockHeight()) totalPower := big.NewInt(0) validatorPowers := make(map[string]*big.Int) validatorSet := k.GetAllExocoreValidators(ctx) @@ -91,35 +94,8 @@ func recacheAggregatorContext(ctx sdk.Context, agc *aggregator.AggregatorContext c.AddCache(cache.ItemV(validatorPowers)) recentMsgs := k.GetAllRecentMsgAsMap(ctx) - var pTmp common.Params - for ; from < to; from++ { - // fill params - prev := int64(0) - for b, recentParams := range recentParamsMap { - if b <= from && b > prev { - pTmp = common.Params(*recentParams) - agc.SetParams(&pTmp) - prev = b - setCommonParams(*recentParams) - } - } - - agc.PrepareRound(ctx, uint64(from)) - - if msgs := recentMsgs[from+1]; msgs != nil { - for _, msg := range msgs { - // these messages are retreived for recache, just skip the validation check and fill the memory cache - //nolint - agc.FillPrice(&types.MsgCreatePrice{ - Creator: msg.Validator, - FeederID: msg.FeederID, - Prices: msg.PSources, - }) - } - } - agc.SealRound(ctx, false) - } + var pTmp common.Params if from >= to { // backwards compatible for that the validatorUpdateBlock updated every block prev := int64(0) @@ -132,19 +108,47 @@ func recacheAggregatorContext(ctx sdk.Context, agc *aggregator.AggregatorContext } agc.SetParams(&pTmp) setCommonParams(types.Params(pTmp)) + } else { + for ; from < to; from++ { + // fill params + prev := int64(0) + for b, recentParams := range recentParamsMap { + if b <= from && b > prev { + pTmp = common.Params(*recentParams) + agc.SetParams(&pTmp) + prev = b + setCommonParams(*recentParams) + } + } + + agc.PrepareRoundBeginBlock(ctx, uint64(from)) + + if msgs := recentMsgs[from]; msgs != nil { + for _, msg := range msgs { + // these messages are retreived for recache, just skip the validation check and fill the memory cache + //nolint + agc.FillPrice(&types.MsgCreatePrice{ + Creator: msg.Validator, + FeederID: msg.FeederID, + Prices: msg.PSources, + }) + } + } + ctxReplay := ctx.WithBlockHeight(from) + agc.SealRound(ctxReplay, false) + } } var pRet common.Params if updated := c.GetCache(cache.ItemP(&pRet)); !updated { c.AddCache(cache.ItemP(&pTmp)) } - // fill params cache - agc.PrepareRound(ctx, uint64(to)) return true } func initAggregatorContext(ctx sdk.Context, agc *aggregator.AggregatorContext, k common.KeeperOracle, c *cache.Cache) { + ctx.Logger().Info("initAggregatorContext", "height", ctx.BlockHeight()) // set params p := k.GetParams(ctx) pTmp := common.Params(p) @@ -168,7 +172,7 @@ func initAggregatorContext(ctx sdk.Context, agc *aggregator.AggregatorContext, k // set validatorPower cache c.AddCache(cache.ItemV(validatorPowers)) - agc.PrepareRound(ctx, uint64(ctx.BlockHeight()-1)) + agc.PrepareRoundBeginBlock(ctx, uint64(ctx.BlockHeight())) } func ResetAggregatorContext() { diff --git a/x/oracle/module.go b/x/oracle/module.go index b6c1b83d8..3c499f236 100644 --- a/x/oracle/module.go +++ b/x/oracle/module.go @@ -150,7 +150,15 @@ func (am AppModule) ExportGenesis(ctx sdk.Context, cdc codec.JSONCodec) json.Raw func (AppModule) ConsensusVersion() uint64 { return 1 } // BeginBlock contains the logic that is automatically triggered at the beginning of each block -func (am AppModule) BeginBlock(_ sdk.Context, _ abci.RequestBeginBlock) {} +func (am AppModule) BeginBlock(ctx sdk.Context, _ abci.RequestBeginBlock) { + _ = keeper.GetCaches() + agc := keeper.GetAggregatorContext(ctx, am.keeper) + + logger := am.keeper.Logger(ctx) + + logger.Info("prepare for next oracle round of each tokenFeeder", "height", ctx.BlockHeight()) + agc.PrepareRoundBeginBlock(ctx, 0) +} // EndBlock contains the logic that is automatically triggered at the end of each block func (am AppModule) EndBlock(ctx sdk.Context, _ abci.RequestEndBlock) []abci.ValidatorUpdate { @@ -175,7 +183,7 @@ func (am AppModule) EndBlock(ctx sdk.Context, _ abci.RequestEndBlock) []abci.Val agc.SetValidatorPowers(validatorPowers) // TODO: seal all alive round since validatorSet changed here forceSeal = true - logger.Info("validator set changed, force seal all active rounds") + logger.Info("validator set changed, force seal all active rounds", "height", ctx.BlockHeight()) } // TODO: for v1 use mode==1, just check the failed feeders @@ -191,7 +199,6 @@ func (am AppModule) EndBlock(ctx sdk.Context, _ abci.RequestEndBlock) []abci.Val if pTR, ok := am.keeper.GetPriceTRLatest(ctx, tokenID); ok { pTR.RoundID++ am.keeper.AppendPriceTR(ctx, tokenID, pTR) - logger.Info("add new round with previous price under fail aggregation", "tokenID", tokenID, "roundID", pTR.RoundID) logInfo += fmt.Sprintf(", roundID:%d, price:%s", pTR.RoundID, pTR.Price) event.AppendAttributes( sdk.NewAttribute(types.AttributeKeyRoundID, strconv.FormatUint(pTR.RoundID, 10)), @@ -208,13 +215,11 @@ func (am AppModule) EndBlock(ctx sdk.Context, _ abci.RequestEndBlock) []abci.Val sdk.NewAttribute(types.AttributeKeyFinalPrice, "-"), ) } - logger.Info(logInfo) + logger.Info(logInfo, "height", ctx.BlockHeight()) ctx.EventManager().EmitEvent(event) } // TODO: emit events for success sealed rounds(could ignore for v1) - logger.Info("prepare for next oracle round of each tokenFeeder") - agc.PrepareRound(ctx, 0) keeper.ResetAggregatorContextCheckTx() // TODO: update params happened during this block for cache, for the case: agc is recached from history and cache'params is set with the latest params by recache, but parmas changed during this block as well. or force agc to be GET first before cache be GET(later approch is better)