Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate new dlob orderbooks #493

Merged
merged 12 commits into from
Oct 27, 2023
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SELECT 'Deleting from token_historical_daily dates after 8/03/2023' AS comment;

DELETE FROM token_historical_daily WHERE historical_timestamp > '2023-08-03 00:00:00';
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class ValidatorMarketRateStatsRecord(id: EntityID<Int>) : IntEntity(id) {
}

fun findByAddress(address: String, fromDate: DateTime?, toDate: DateTime?, count: Int) = transaction {
val query = ValidatorMarketRateStatsTable.select { ValidatorMarketRateStatsTable.operatorAddress eq address }
val query =
ValidatorMarketRateStatsTable.select { ValidatorMarketRateStatsTable.operatorAddress eq address }
if (fromDate != null) {
query.andWhere { ValidatorMarketRateStatsTable.date greaterEq fromDate }
}
Expand Down Expand Up @@ -272,6 +273,14 @@ class TokenHistoricalDailyRecord(id: EntityID<DateTime>) : Entity<DateTime>(id)
.orderBy(Pair(TokenHistoricalDailyTable.timestamp, SortOrder.DESC))
.firstOrNull()?.data?.quote?.get(USD_UPPER)?.close ?: BigDecimal.ZERO
}

fun getLatestDateEntry(): TokenHistoricalDailyRecord? = transaction {
return@transaction TokenHistoricalDailyRecord
.all()
.orderBy(Pair(TokenHistoricalDailyTable.timestamp, SortOrder.DESC))
.limit(1)
.firstOrNull()
}
}

var timestamp by TokenHistoricalDailyTable.timestamp
Expand Down Expand Up @@ -305,7 +314,7 @@ class ProcessQueueRecord(id: EntityID<Int>) : IntEntity(id) {
fun delete(processType: ProcessQueueType, value: String) = transaction {
ProcessQueueTable.deleteWhere {
(ProcessQueueTable.processType eq processType.name) and
(ProcessQueueTable.processValue eq value)
(processValue eq value)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ class TokenService(private val accountClient: AccountGrpcClient) {
}

fun getTokenBreakdown() = runBlocking {
val bonded = accountClient.getStakingPool().pool.bondedTokens.toBigDecimal().roundWhole().toCoinStr(UTILITY_TOKEN)
val bonded =
accountClient.getStakingPool().pool.bondedTokens.toBigDecimal().roundWhole().toCoinStr(UTILITY_TOKEN)
TokenSupply(
maxSupply().toCoinStr(UTILITY_TOKEN),
totalSupply().toCoinStr(UTILITY_TOKEN),
Expand All @@ -157,16 +158,19 @@ class TokenService(private val accountClient: AccountGrpcClient) {
}

fun nhashMarkerAddr() = MarkerCacheRecord.findByDenom(UTILITY_TOKEN)?.markerAddress!!
fun burnedSupply() = runBlocking { accountClient.getMarkerBalance(nhashMarkerAddr(), UTILITY_TOKEN).toBigDecimal().roundWhole() }
fun burnedSupply() =
runBlocking { accountClient.getMarkerBalance(nhashMarkerAddr(), UTILITY_TOKEN).toBigDecimal().roundWhole() }

fun moduleAccounts() = AccountRecord.findAccountsByType(listOf(Auth.ModuleAccount::class.java.simpleName))
fun zeroSeqAccounts() = AccountRecord.findZeroSequenceAccounts()
fun vestingAccounts() = AccountRecord.findAccountsByType(vestingAccountTypes)
fun contractAccounts() = AccountRecord.findContractAccounts()
fun allAccounts() = transaction { AccountRecord.all().toMutableList() }
fun communityPoolSupply() = runBlocking { accountClient.getCommunityPoolAmount(UTILITY_TOKEN).toBigDecimal().roundWhole() }
fun communityPoolSupply() =
runBlocking { accountClient.getCommunityPoolAmount(UTILITY_TOKEN).toBigDecimal().roundWhole() }

fun richListAccounts() =
allAccounts().addressList() - zeroSeqAccounts().toSet() - moduleAccounts().addressList() -
contractAccounts().addressList() - setOf(nhashMarkerAddr())
allAccounts().addressList() - zeroSeqAccounts().toSet() - moduleAccounts().addressList() - contractAccounts().addressList() - setOf(nhashMarkerAddr())

fun totalBalanceForList(addresses: Set<String>) = runBlocking {
TokenDistributionPaginatedResultsRecord.findByAddresses(addresses).asFlow()
Expand Down Expand Up @@ -217,10 +221,10 @@ class TokenService(private val accountClient: AccountGrpcClient) {
}
}

fun getHistoricalFromDlob(startTime: DateTime): DlobHistBase? = runBlocking {
fun getHistoricalFromDlob(startTime: DateTime, tickerId: String): DlobHistBase? = runBlocking {
try {
KTOR_CLIENT_JAVA.get("https://www.dlob.io:443/gecko/external/api/v1/exchange/historical_trades") {
parameter("ticker_id", "HASH_USD")
parameter("ticker_id", tickerId)
parameter("type", "buy")
parameter("start_time", DateTimeFormat.forPattern("dd-MM-yyyy").print(startTime))
accept(ContentType.Application.Json)
Expand All @@ -234,6 +238,15 @@ class TokenService(private val accountClient: AccountGrpcClient) {
}
}

fun getHistoricalFromDlob(startTime: DateTime): DlobHistBase? {
val tickerIds = listOf("HASH_USD", "HASH_USDOMNI")

val dlobHistorical = tickerIds
.flatMap { getHistoricalFromDlob(startTime, it)?.buy.orEmpty() }

return if (dlobHistorical.isNotEmpty()) DlobHistBase(dlobHistorical) else null
}

fun getTokenHistorical(fromDate: DateTime?, toDate: DateTime?) =
TokenHistoricalDailyRecord.findForDates(fromDate?.startOfDay(), toDate?.startOfDay())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ import tendermint.types.BlockOuterClass
import java.math.BigDecimal
import java.time.OffsetDateTime
import java.time.ZoneOffset
import javax.annotation.PostConstruct

@Service
class AsyncService(
Expand All @@ -104,6 +105,18 @@ class AsyncService(
protected val logger = logger(AsyncService::class)
protected var collectHistorical = true

@PostConstruct
fun asyncServiceOnStartInit() {
Thread {
try {
Thread.sleep(5000)
updateTokenHistorical()
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
}
}.start()
}

@Scheduled(initialDelay = 0L, fixedDelay = 5000L)
fun updateLatestBlockHeightJob() {
val index = getBlockIndex()
Expand Down Expand Up @@ -145,7 +158,7 @@ class AsyncService(
}

fun getBlockIndex() = blockService.getBlockIndexFromCache()?.let {
Pair<Int?, Int?>(it.maxHeightRead, it.minHeightRead)
Pair(it.maxHeightRead, it.minHeightRead)
}

fun startCollectingHistoricalBlocks(blockIndex: Pair<Int?, Int?>?) =
Expand Down Expand Up @@ -248,11 +261,11 @@ class AsyncService(
val now = OffsetDateTime.now().withOffsetSameInstant(ZoneOffset.UTC).toString()
cacheService.getCacheValue(key)!!.let { cache ->
pricingService.getPricingAsync(cache.cacheValue!!, "async pricing update").forEach { price ->
// dont set price from PE
// don't set price from PE
if (price.markerDenom != UTILITY_TOKEN) {
assetService.getAssetRaw(price.markerDenom).let { pricingService.insertAssetPricing(it, price) }
} else {
// Pull price from CMC, calced to the true base denom price
// Pull price from CMC, calculate to the true base denom price
val cmcPrice =
tokenService.getTokenLatest()?.quote?.get(USD_UPPER)?.price
?.let {
Expand Down Expand Up @@ -280,8 +293,14 @@ class AsyncService(
@Scheduled(cron = "0 0 1 * * ?") // Every day at 1 am
fun updateTokenHistorical() {
val today = DateTime.now().startOfDay()
val startDate = today.minusMonths(1)
var startDate = today.minusMonths(1)
val latest = TokenHistoricalDailyRecord.getLatestDateEntry()
if (latest != null) {
startDate = latest.timestamp.minusDays(1)
}
val dlobRes = tokenService.getHistoricalFromDlob(startDate) ?: return
logger.info("Updating token historical data starting from $startDate with ${dlobRes.buy.size} buy records for roll-up.")

val baseMap = Interval(startDate, today)
.let { int -> generateSequence(int.start) { dt -> dt.plusDays(1) }.takeWhile { dt -> dt < int.end } }
.map { it to emptyList<DlobHistorical>() }.toMap().toMutableMap()
Expand Down Expand Up @@ -312,7 +331,9 @@ class AsyncService(
low = low?.price ?: prevPrice,
close = close,
volume = usdVolume,
market_cap = close.multiply(tokenService.totalSupply().divide(UTILITY_TOKEN_BASE_MULTIPLIER)),
market_cap = close.multiply(
tokenService.totalSupply().divide(UTILITY_TOKEN_BASE_MULTIPLIER)
),
timestamp = closeDate
)
)
Expand Down Expand Up @@ -346,7 +367,10 @@ class AsyncService(
today,
mapOf(USD_UPPER to CmcLatestQuoteAbbrev(price, percentChg, vol24Hr, marketCap, today))
)
CacheUpdateRecord.updateCacheByKey(CacheKeys.UTILITY_TOKEN_LATEST.key, VANILLA_MAPPER.writeValueAsString(rec))
CacheUpdateRecord.updateCacheByKey(
CacheKeys.UTILITY_TOKEN_LATEST.key,
VANILLA_MAPPER.writeValueAsString(rec)
)
}
}

Expand Down Expand Up @@ -443,7 +467,8 @@ class AsyncService(
try {
transaction { it.apply { this.processing = true } }
send(it.processValue)
} catch (_: Exception) { }
} catch (_: Exception) {
}
}
}
}
Expand Down