Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(oralce):separate prepareRound from endBlock into beginBlock #3

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions x/oracle/keeper/aggregator/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ func (agc *AggregatorContext) SealRound(ctx sdk.Context, force bool) (success []
return success, failed
}

func (agc *AggregatorContext) PrepareRound(ctx sdk.Context, block uint64) {
// TODO: test to remove PrepareRound into BeginBlock
func (agc *AggregatorContext) PrepareRoundBeginBlock(ctx sdk.Context, block uint64) {
// block>0 means recache initialization, all roundInfo is empty
if block == 0 {
block = uint64(ctx.BlockHeight())
Expand All @@ -228,15 +229,16 @@ func (agc *AggregatorContext) PrepareRound(ctx sdk.Context, block uint64) {
if feederID == 0 {
continue
}
if (feeder.EndBlock > 0 && feeder.EndBlock <= block) || feeder.StartBaseBlock > block {
if (feeder.EndBlock > 0 && feeder.EndBlock < block) || feeder.StartBaseBlock >= block {
// this feeder is inactive
continue
}

delta := block - feeder.StartBaseBlock
baseBlock := block - 1
delta := baseBlock - feeder.StartBaseBlock
left := delta % feeder.Interval
count := delta / feeder.Interval
latestBasedblock := block - left
latestBasedblock := baseBlock - left
latestNextRoundID := feeder.StartRoundID + count

feederIDUint64 := uint64(feederID)
Expand Down
14 changes: 7 additions & 7 deletions x/oracle/keeper/aggregator/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestAggregatorContext(t *testing.T) {
Convey("prepare round to gengerate round info of feeders for next block", func() {
Convey("pepare within the window", func() {
p := patchBlockHeight(12)
agc.PrepareRound(ctx, 0)
agc.PrepareRoundBeginBlock(ctx, 0)

Convey("for empty round list", func() {
So(*agc.rounds[1], ShouldResemble, roundInfo{10, 2, 1})
Expand All @@ -27,28 +27,28 @@ func TestAggregatorContext(t *testing.T) {
Convey("update already exist round info", func() {
p.Reset()
time.Sleep(1 * time.Second)
patchBlockHeight(10 + int64(common.MaxNonce))
patchBlockHeight(10 + int64(common.MaxNonce) + 1)

agc.PrepareRound(ctx, 0)
agc.PrepareRoundBeginBlock(ctx, 0)
So(agc.rounds[1].status, ShouldEqual, 2)
})
p.Reset()
time.Sleep(1 * time.Second)
})
Convey("pepare outside the window", func() {
Convey("for empty round list", func() {
p := patchBlockHeight(10 + int64(common.MaxNonce))
agc.PrepareRound(ctx, 0)
p := patchBlockHeight(10 + int64(common.MaxNonce) + 1)
agc.PrepareRoundBeginBlock(ctx, 0)
So(agc.rounds[1].status, ShouldEqual, 2)
p.Reset()
time.Sleep(1 * time.Second)
})
})
})

Convey("seal existed round without any msg recieved", func() {
Convey("seal existing round without any msg recieved", func() {
p := patchBlockHeight(11)
agc.PrepareRound(ctx, 0)
agc.PrepareRoundBeginBlock(ctx, 0)
Convey("seal when exceed the window", func() {
So(agc.rounds[1].status, ShouldEqual, 1)
p.Reset()
Expand Down
6 changes: 4 additions & 2 deletions x/oracle/keeper/msg_server_create_price.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ func (ms msgServer) CreatePrice(goCtx context.Context, msg *types.MsgCreatePrice

logger := ms.Keeper.Logger(ctx)
if err := checkTimestamp(ctx, msg); err != nil {
logger.Info("price proposal timestamp check failed", "error", err, "height", ctx.BlockHeight())
return nil, types.ErrPriceProposalFormatInvalid.Wrap(err.Error())
}

newItem, caches, err := GetAggregatorContext(ctx, ms.Keeper).NewCreatePrice(ctx, msg)
if err != nil {
logger.Info("price proposal failed", "error", err, "height", ctx.BlockHeight())
return nil, err
}

logger.Info("add price proposal for aggregation", "feederID", msg.FeederID, "basedBlock", msg.BasedBlock, "proposer", msg.Creator)
logger.Info("add price proposal for aggregation", "feederID", msg.FeederID, "basedBlock", msg.BasedBlock, "proposer", msg.Creator, "height", ctx.BlockHeight())

ctx.EventManager().EmitEvent(sdk.NewEvent(
types.EventTypeCreatePrice,
Expand All @@ -42,7 +44,7 @@ func (ms msgServer) CreatePrice(goCtx context.Context, msg *types.MsgCreatePrice
if newItem != nil {
ms.AppendPriceTR(ctx, newItem.TokenID, newItem.PriceTR)

logger.Info("final price aggregation done", "feederID", msg.FeederID, "roundID", newItem.PriceTR.RoundID, "price", newItem.PriceTR.Price)
logger.Info("final price aggregation done", "feederID", msg.FeederID, "roundID", newItem.PriceTR.RoundID, "price", newItem.PriceTR.Price, "height", ctx.BlockHeight())

ctx.EventManager().EmitEvent(sdk.NewEvent(
types.EventTypeCreatePrice,
Expand Down
74 changes: 39 additions & 35 deletions x/oracle/keeper/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,23 @@
}

func recacheAggregatorContext(ctx sdk.Context, agc *aggregator.AggregatorContext, k Keeper, c *cache.Cache) bool {
from := ctx.BlockHeight() - int64(common.MaxNonce)
to := ctx.BlockHeight() - 1
logger := k.Logger(ctx)
from := ctx.BlockHeight() - int64(common.MaxNonce) + 1
to := ctx.BlockHeight()

h, ok := k.GetValidatorUpdateBlock(ctx)
recentParamsMap := k.GetAllRecentParamsAsMap(ctx)
if !ok || len(recentParamsMap) == 0 {
logger.Info("no validatorUpdateBlock found, go to initial process", "height", ctx.BlockHeight())
// no cache, this is the very first running, so go to initial process instead
return false
}

if int64(h.Block) > from {
from = int64(h.Block)
if int64(h.Block) >= from {
from = int64(h.Block) + 1
}

logger.Info("recacheAggregatorContext", "from", from, "to", to, "height", ctx.BlockHeight())
totalPower := big.NewInt(0)
validatorPowers := make(map[string]*big.Int)
validatorSet := k.GetAllExocoreValidators(ctx)
Expand All @@ -91,35 +94,8 @@
c.AddCache(cache.ItemV(validatorPowers))

recentMsgs := k.GetAllRecentMsgAsMap(ctx)
var pTmp common.Params
for ; from < to; from++ {
// fill params
prev := int64(0)
for b, recentParams := range recentParamsMap {
if b <= from && b > prev {
pTmp = common.Params(*recentParams)
agc.SetParams(&pTmp)
prev = b
setCommonParams(*recentParams)
}
}

agc.PrepareRound(ctx, uint64(from))

if msgs := recentMsgs[from+1]; msgs != nil {
for _, msg := range msgs {
// these messages are retreived for recache, just skip the validation check and fill the memory cache
//nolint
agc.FillPrice(&types.MsgCreatePrice{
Creator: msg.Validator,
FeederID: msg.FeederID,
Prices: msg.PSources,
})
}
}
agc.SealRound(ctx, false)
}

var pTmp common.Params
if from >= to {
// backwards compatible for that the validatorUpdateBlock updated every block
prev := int64(0)
Expand All @@ -132,19 +108,47 @@
}
agc.SetParams(&pTmp)
setCommonParams(types.Params(pTmp))
} else {
for ; from < to; from++ {
// fill params
prev := int64(0)
for b, recentParams := range recentParamsMap {
if b <= from && b > prev {
pTmp = common.Params(*recentParams)
agc.SetParams(&pTmp)
prev = b
setCommonParams(*recentParams)
}
}
Comment on lines +115 to +122

Check warning

Code scanning / CodeQL

Iteration over map Warning

Iteration over map may be a possible source of non-determinism

agc.PrepareRoundBeginBlock(ctx, uint64(from))

if msgs := recentMsgs[from]; msgs != nil {
for _, msg := range msgs {
// these messages are retreived for recache, just skip the validation check and fill the memory cache
//nolint
agc.FillPrice(&types.MsgCreatePrice{
Creator: msg.Validator,
FeederID: msg.FeederID,
Prices: msg.PSources,
})
}
}
ctxReplay := ctx.WithBlockHeight(from)
agc.SealRound(ctxReplay, false)
}
}

var pRet common.Params
if updated := c.GetCache(cache.ItemP(&pRet)); !updated {
c.AddCache(cache.ItemP(&pTmp))
}
// fill params cache
agc.PrepareRound(ctx, uint64(to))

return true
}

func initAggregatorContext(ctx sdk.Context, agc *aggregator.AggregatorContext, k common.KeeperOracle, c *cache.Cache) {
ctx.Logger().Info("initAggregatorContext", "height", ctx.BlockHeight())
// set params
p := k.GetParams(ctx)
pTmp := common.Params(p)
Expand All @@ -168,7 +172,7 @@
// set validatorPower cache
c.AddCache(cache.ItemV(validatorPowers))

agc.PrepareRound(ctx, uint64(ctx.BlockHeight()-1))
agc.PrepareRoundBeginBlock(ctx, uint64(ctx.BlockHeight()))
}

func ResetAggregatorContext() {
Expand Down
17 changes: 11 additions & 6 deletions x/oracle/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,15 @@
func (AppModule) ConsensusVersion() uint64 { return 1 }

// BeginBlock contains the logic that is automatically triggered at the beginning of each block
func (am AppModule) BeginBlock(_ sdk.Context, _ abci.RequestBeginBlock) {}
func (am AppModule) BeginBlock(ctx sdk.Context, _ abci.RequestBeginBlock) {
_ = keeper.GetCaches()
agc := keeper.GetAggregatorContext(ctx, am.keeper)

Check warning

Code scanning / CodeQL

Panic in BeginBock or EndBlock consensus methods Warning

path flow from Begin/EndBlock to a panic call

logger := am.keeper.Logger(ctx)

logger.Info("prepare for next oracle round of each tokenFeeder", "height", ctx.BlockHeight())
agc.PrepareRoundBeginBlock(ctx, 0)
}

// EndBlock contains the logic that is automatically triggered at the end of each block
func (am AppModule) EndBlock(ctx sdk.Context, _ abci.RequestEndBlock) []abci.ValidatorUpdate {
Expand All @@ -175,7 +183,7 @@
agc.SetValidatorPowers(validatorPowers)
// TODO: seal all alive round since validatorSet changed here
forceSeal = true
logger.Info("validator set changed, force seal all active rounds")
logger.Info("validator set changed, force seal all active rounds", "height", ctx.BlockHeight())
}

// TODO: for v1 use mode==1, just check the failed feeders
Expand All @@ -191,7 +199,6 @@
if pTR, ok := am.keeper.GetPriceTRLatest(ctx, tokenID); ok {
pTR.RoundID++
am.keeper.AppendPriceTR(ctx, tokenID, pTR)
logger.Info("add new round with previous price under fail aggregation", "tokenID", tokenID, "roundID", pTR.RoundID)
logInfo += fmt.Sprintf(", roundID:%d, price:%s", pTR.RoundID, pTR.Price)
event.AppendAttributes(
sdk.NewAttribute(types.AttributeKeyRoundID, strconv.FormatUint(pTR.RoundID, 10)),
Expand All @@ -208,13 +215,11 @@
sdk.NewAttribute(types.AttributeKeyFinalPrice, "-"),
)
}
logger.Info(logInfo)
logger.Info(logInfo, "height", ctx.BlockHeight())
ctx.EventManager().EmitEvent(event)
}
// TODO: emit events for success sealed rounds(could ignore for v1)

logger.Info("prepare for next oracle round of each tokenFeeder")
agc.PrepareRound(ctx, 0)
keeper.ResetAggregatorContextCheckTx()

// TODO: update params happened during this block for cache, for the case: agc is recached from history and cache'params is set with the latest params by recache, but parmas changed during this block as well. or force agc to be GET first before cache be GET(later approch is better)
Expand Down
Loading