Skip to content

Commit

Permalink
fix circular import
Browse files Browse the repository at this point in the history
  • Loading branch information
howardchung committed Jan 7, 2025
1 parent e1f2da2 commit 6f7d133
Show file tree
Hide file tree
Showing 13 changed files with 165 additions and 156 deletions.
2 changes: 1 addition & 1 deletion fetcher/getApiData.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { SteamAPIUrls, getSteamAPIData, redisCount } from '../util/utility';
import { Archive } from '../store/archive';
import cassandra from '../store/cassandra';
import { type ApiMatch } from '../util/pgroup';
import type { ApiMatch } from '../util/types';
import { MatchFetcher } from './base';
import { insertMatch } from '../util/insert';

Expand Down
2 changes: 1 addition & 1 deletion global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ type GcDataJob = {
match_id: number;
};

type CountsJob = import('./util/pgroup').ApiMatch;
type CountsJob = import('./util/types').ApiMatch;
type ScenariosJob = string;
type CacheJob = string;

Expand Down
2 changes: 1 addition & 1 deletion svc/backfill.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Fetches old matches from Steam API and writes to blob storage
import config from '../config';
import { Archive } from '../store/archive';
import type { ApiMatch } from '../util/pgroup';
import type { ApiMatch } from '../util/types';
import { SteamAPIUrls, getSteamAPIData, transformMatch, getApiHosts } from '../util/utility';
import fs from 'fs';
import redis from '../store/redis';
Expand Down
3 changes: 1 addition & 2 deletions svc/fullhistory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import {
import db from '../store/db';
import { runQueue } from '../store/queue';
import { getPlayerMatches } from '../util/buildPlayer';
import { insertMatch, reconcileMatch } from '../util/insert';
import { ApiMatch } from '../util/pgroup';
import type { ApiMatch } from '../util/types';

async function updatePlayer(player: FullHistoryJob) {
// done with this player, update
Expand Down
3 changes: 2 additions & 1 deletion svc/reconcile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
* It checks our own database and updates player_caches so these matches get associated with the player.
*/
import db from "../store/db";
import { HistoryType, reconcileMatch } from "../util/insert";
import { reconcileMatch } from "../util/reconcileMatch";
import type { HistoryType } from "../util/types";

async function doReconcile() {
while (true) {
Expand Down
2 changes: 1 addition & 1 deletion svc/scanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import config from '../config';
import redis from '../store/redis';
import { insertMatch } from '../util/insert';
import type { ApiMatch } from '../util/pgroup';
import type { ApiMatch } from '../util/types';
import {
SteamAPIUrls,
getApiHosts,
Expand Down
3 changes: 1 addition & 2 deletions util/buildMatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ import {
} from './utility';
import redis from '../store/redis';
import db from '../store/db';
import { ApiMatch } from './pgroup';
import { ApiMatch } from './types';
import { ParsedFetcher } from '../fetcher/getParsedData';
import { ApiFetcher } from '../fetcher/getApiData';
import { GcdataFetcher } from '../fetcher/getGcData';
import { ArchivedFetcher } from '../fetcher/getArchivedData';
import { MetaFetcher } from '../fetcher/getMeta';
import { benchmarks } from './benchmarksUtil';
import { promises as fs } from 'fs';

const apiFetcher = new ApiFetcher();
const gcFetcher = new GcdataFetcher();
Expand Down
142 changes: 6 additions & 136 deletions util/insert.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import moment from 'moment';
import { patch } from 'dotaconstants';
import util from 'util';
import util from 'node:util';
import { promises as fs } from 'fs';
import config from '../config';
import { addJob, addReliableJob } from '../store/queue';
import { computeMatchData } from './compute';
import db, { getPostgresColumns } from '../store/db';
import redis from '../store/redis';
import { es, INDEX } from '../store/elasticsearch';
import cassandra, { getCassandraColumns } from '../store/cassandra';
import cassandra from '../store/cassandra';
import type knex from 'knex';
import {
getAnonymousAccountId,
Expand All @@ -20,16 +19,17 @@ import {
getPatchIndex,
redisCount,
transformMatch,
createMatchCopy,
} from './utility';
import {
getMatchRankTier,
isRecentVisitor,
isRecentlyVisited,
} from './queries';
import { ApiMatch, ApiMatchPro, ApiPlayer, getPGroup } from './pgroup';
import { getPGroup } from './pgroup';
import { Archive } from '../store/archive';
import { getMatchDataFromBlobWithMetadata } from './buildMatch';
// import scylla from './scylla';
import type { ApiMatch, ApiPlayer, InsertMatchInput } from './types';
import { upsertPlayerCaches } from './playerCaches';

moment.relativeTimeThreshold('ss', 0);

Expand Down Expand Up @@ -132,136 +132,6 @@ export async function insertPlayerRating(row: PlayerRating) {
}
}

// Produces a deep copy of a match by round-tripping it through JSON.
// Note: this intentionally drops undefined fields, functions, and class
// prototypes — the result is a plain data object.
function createMatchCopy<T>(match: any): T {
  const serialized = JSON.stringify(match);
  return JSON.parse(serialized) as T;
}

/**
 * Fans a match out into per-player rows in the Cassandra `player_caches`
 * table — one row per player we can attribute to a real (non-anonymous)
 * account_id via pgroup.
 *
 * @param match - the full match object (API, parsed, or archived shape).
 * @param averageRank - if truthy, stamped onto the copied match as
 *   `average_rank` before the per-player rows are built.
 * @param pgroup - player_slot -> player info map used to resolve account_ids.
 * @param type - source of this data (e.g. 'api', 'reconcile'); controls
 *   whether the `heroes` field (used for peers search) is refreshed.
 * @returns Promise of one boolean per player: true if a row was written,
 *   false for players we could not identify.
 */
export async function upsertPlayerCaches(
  match: InsertMatchInput | ParsedMatch | Match,
  averageRank: number | undefined,
  pgroup: PGroup,
  type: DataType,
) {
  // Add the 10 player_match rows indexed by player
  // We currently do this on all types
  const copy = createMatchCopy<Match>(match);
  if (averageRank) {
    copy.average_rank = averageRank;
  }
  // Fetch the table's live column list so we only insert fields that exist.
  const columns = await getCassandraColumns('player_caches');
  return Promise.all(
    copy.players.map(async (p) => {
      // add account id to each player so we know what caches to update
      const account_id = pgroup[p.player_slot]?.account_id;
      // join player with match to form player_match
      const playerMatch: Partial<ParsedPlayerMatch> = {
        ...p,
        ...copy,
        account_id,
        players: undefined, // drop the nested players array from the flattened row
      };
      // Skip rows we cannot attribute to a real account.
      if (
        !playerMatch.account_id ||
        playerMatch.account_id === getAnonymousAccountId()
      ) {
        return false;
      }
      if (type === 'api' || type === 'reconcile') {
        // We currently update this for the non-anonymous players in the match
        // It'll reflect the current anonymity state of the players at insertion time
        // This might lead to changes in peers counts after a fullhistory update or parse request
        // When reconciling after gcdata we will update this with non-anonymized data (but we won't reconcile for players with open match history so their peers may be incomplete)
        playerMatch.heroes = pgroup;
      }
      // Mutates playerMatch in place to add derived/computed fields.
      computeMatchData(playerMatch as ParsedPlayerMatch);
      // Remove extra properties
      Object.keys(playerMatch).forEach((key) => {
        if (!columns[key]) {
          delete playerMatch[key as keyof ParsedPlayerMatch];
        }
      });
      const serializedMatch: any = serialize(playerMatch);
      // In dev/test, dump the serialized row to disk for inspection
      // (slot 0 only, or every slot when reconciling).
      if (
        (config.NODE_ENV === 'development' || config.NODE_ENV === 'test') &&
        (playerMatch.player_slot === 0 || type === 'reconcile')
      ) {
        await fs.writeFile(
          './json/' +
            copy.match_id +
            `_playercache_${type}_${playerMatch.player_slot}.json`,
          JSON.stringify(serializedMatch, null, 2),
        );
      }
      if (type === 'reconcile') {
        console.log(playerMatch.account_id, copy.match_id, playerMatch.player_slot);
        redisCount('reconcile');
      }
      // Build a parameterized INSERT from whatever keys survived the column filter.
      const query = util.format(
        'INSERT INTO player_caches (%s) VALUES (%s)',
        Object.keys(serializedMatch).join(','),
        Object.keys(serializedMatch)
          .map(() => '?')
          .join(','),
      );
      const arr = Object.keys(serializedMatch).map((k) => serializedMatch[k]);
      await cassandra.execute(query, arr, {
        prepare: true,
      });
      // TODO (scylla) dual write here
      // TODO (scylla) need to write a migrater with checkpointing (one player at a time and then do all players?) copy all data from cassandra to scylla
      // Don't need to dual read if we don't delete the original data until fully migrated
      // New tokens might be inserted behind the migrater or double migrate some rows but since we are dual writing we should have the same data in both
      // await scylla.execute(query, arr, {
      //   prepare: true
      // });
      return true;
    }),
  );
}

export type HistoryType = {account_id: number, match_id: number, player_slot: number};

/**
 * Backfills player_caches rows for one match using entries from
 * player_match_history, then deletes those history rows on success.
 *
 * @param rows - history entries to reconcile; all must share one match_id.
 * @throws Error if the input rows reference more than one match ID.
 */
export async function reconcileMatch(rows: HistoryType[]) {
  // Nothing to do for empty input (also guards the rows[0] access below).
  if (!rows.length) {
    return;
  }
  // validate that all rows have the same match ID
  const set = new Set(rows.map(r => r.match_id));
  if (set.size > 1) {
    throw new Error('multiple match IDs found in input to reconcileMatch');
  }
  // optional: Verify each player/match combination doesn't exist in player_caches (or we have parsed data to update)
  const [match] = await getMatchDataFromBlobWithMetadata(rows[0].match_id);
  if (!match) {
    // Note: unless we backfill, we have limited API data for old matches
    // For more recent matches we're more likely to have data
    // Maybe we can mark the more recent matches with a flag
    // Or queue up recent matches from fullhistory and process them in order so fh requests show updates quicker
    return;
  }
  const pgroup = getPGroup(match);
  // If reconciling after fullhistory, the pgroup won't contain account_id info. Add it.
  rows.forEach(r => {
    if (!pgroup[r.player_slot]?.account_id) {
      // Merge rather than writing through the optional chain: the history row
      // may reference a slot absent from pgroup entirely, and assigning a
      // property on undefined would throw.
      pgroup[r.player_slot] = { ...pgroup[r.player_slot], account_id: r.account_id };
    }
  });
  const targetSlots = new Set(rows.map(r => r.player_slot));
  // Filter to only players that we want to fill in
  match.players = match.players.filter(p => targetSlots.has(p.player_slot));
  if (!match.players.length) {
    return;
  }
  // Call upsertPlayerCaches: pgroup will be used to populate account_id and heroes fields (for peers search)
  const result = await upsertPlayerCaches(match, undefined, pgroup, 'reconcile');
  if (result.every(Boolean)) {
    // Delete the rows since we successfully updated
    await Promise.all(rows.map(async (row) => {
      return db.raw('DELETE FROM player_match_history WHERE account_id = ? AND match_id = ?', [row.account_id, row.match_id]);
    }));
  }
}

export type InsertMatchInput = ApiMatch | ApiMatchPro | ParserMatch | GcMatch;

/**
* Inserts a piece of match data into storage
Expand Down
9 changes: 1 addition & 8 deletions util/pgroup.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
import apiMatch from '../test/data/details_api.json';
import apiMatchPro from '../test/data/details_api_pro.json';

export type ApiMatch = (typeof apiMatch)['result'];
export type ApiMatchPro = (typeof apiMatchPro)['result'];
export type ApiPlayer = ApiMatch['players'][number] & {
ability_upgrades_arr?: number[];
};
import type { ApiMatch } from "./types";

export function getPGroup(match: ApiMatch | Match | ParsedMatch): PGroup {
// This only works if we are an API insert or reconciling (match/parsedmatch)
Expand Down
90 changes: 90 additions & 0 deletions util/playerCaches.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import config from "../config";
import cassandra, { getCassandraColumns } from "../store/cassandra";
import { computeMatchData } from "./compute";
import type { InsertMatchInput } from "./types";
import { createMatchCopy, getAnonymousAccountId, redisCount, serialize } from "./utility";
import { promises as fs } from 'node:fs';
import util from 'node:util';

/**
 * Fans a match out into per-player rows in the Cassandra `player_caches`
 * table — one row per player we can attribute to a real (non-anonymous)
 * account_id via pgroup.
 *
 * @param match - the full match object (API, parsed, or archived shape).
 * @param averageRank - if truthy, stamped onto the copied match as
 *   `average_rank` before the per-player rows are built.
 * @param pgroup - player_slot -> player info map used to resolve account_ids.
 * @param type - source of this data (e.g. 'api', 'reconcile'); controls
 *   whether the `heroes` field (used for peers search) is refreshed.
 * @returns Promise of one boolean per player: true if a row was written,
 *   false for players we could not identify.
 */
export async function upsertPlayerCaches(
  match: InsertMatchInput | ParsedMatch | Match,
  averageRank: number | undefined,
  pgroup: PGroup,
  type: DataType,
) {
  // Add the 10 player_match rows indexed by player
  // We currently do this on all types
  const copy = createMatchCopy<Match>(match);
  if (averageRank) {
    copy.average_rank = averageRank;
  }
  // Fetch the table's live column list so we only insert fields that exist.
  const columns = await getCassandraColumns('player_caches');
  return Promise.all(
    copy.players.map(async (p) => {
      // add account id to each player so we know what caches to update
      const account_id = pgroup[p.player_slot]?.account_id;
      // join player with match to form player_match
      const playerMatch: Partial<ParsedPlayerMatch> = {
        ...p,
        ...copy,
        account_id,
        players: undefined, // drop the nested players array from the flattened row
      };
      // Skip rows we cannot attribute to a real account.
      if (
        !playerMatch.account_id ||
        playerMatch.account_id === getAnonymousAccountId()
      ) {
        return false;
      }
      if (type === 'api' || type === 'reconcile') {
        // We currently update this for the non-anonymous players in the match
        // It'll reflect the current anonymity state of the players at insertion time
        // This might lead to changes in peers counts after a fullhistory update or parse request
        // When reconciling after gcdata we will update this with non-anonymized data (but we won't reconcile for players with open match history so their peers may be incomplete)
        playerMatch.heroes = pgroup;
      }
      // Mutates playerMatch in place to add derived/computed fields.
      computeMatchData(playerMatch as ParsedPlayerMatch);
      // Remove extra properties
      Object.keys(playerMatch).forEach((key) => {
        if (!columns[key]) {
          delete playerMatch[key as keyof ParsedPlayerMatch];
        }
      });
      const serializedMatch: any = serialize(playerMatch);
      // In dev/test, dump the serialized row to disk for inspection
      // (slot 0 only, or every slot when reconciling).
      if (
        (config.NODE_ENV === 'development' || config.NODE_ENV === 'test') &&
        (playerMatch.player_slot === 0 || type === 'reconcile')
      ) {
        await fs.writeFile(
          './json/' +
            copy.match_id +
            `_playercache_${type}_${playerMatch.player_slot}.json`,
          JSON.stringify(serializedMatch, null, 2),
        );
      }
      if (type === 'reconcile') {
        console.log(playerMatch.account_id, copy.match_id, playerMatch.player_slot);
        redisCount('reconcile');
      }
      // Build a parameterized INSERT from whatever keys survived the column filter.
      const query = util.format(
        'INSERT INTO player_caches (%s) VALUES (%s)',
        Object.keys(serializedMatch).join(','),
        Object.keys(serializedMatch)
          .map(() => '?')
          .join(','),
      );
      const arr = Object.keys(serializedMatch).map((k) => serializedMatch[k]);
      await cassandra.execute(query, arr, {
        prepare: true,
      });
      // TODO (scylla) dual write here
      // TODO (scylla) need to write a migrater with checkpointing (one player at a time and then do all players?) copy all data from cassandra to scylla
      // Don't need to dual read if we don't delete the original data until fully migrated
      // New tokens might be inserted behind the migrater or double migrate some rows but since we are dual writing we should have the same data in both
      // await scylla.execute(query, arr, {
      //   prepare: true
      // });
      return true;
    }),
  );
}
43 changes: 43 additions & 0 deletions util/reconcileMatch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import type { HistoryType } from "./types";
import db from '../store/db';
import { getPGroup } from "./pgroup";
import { upsertPlayerCaches } from "./playerCaches";
import { getMatchDataFromBlobWithMetadata } from "./buildMatch";

/**
 * Backfills player_caches rows for one match using entries from
 * player_match_history, then deletes those history rows on success.
 *
 * @param rows - history entries to reconcile; all must share one match_id.
 * @throws Error if the input rows reference more than one match ID.
 */
export async function reconcileMatch(rows: HistoryType[]) {
  // Nothing to do for empty input (also guards the rows[0] access below).
  if (!rows.length) {
    return;
  }
  // validate that all rows have the same match ID
  const set = new Set(rows.map(r => r.match_id));
  if (set.size > 1) {
    throw new Error('multiple match IDs found in input to reconcileMatch');
  }
  // optional: Verify each player/match combination doesn't exist in player_caches (or we have parsed data to update)
  const [match] = await getMatchDataFromBlobWithMetadata(rows[0].match_id);
  if (!match) {
    // Note: unless we backfill, we have limited API data for old matches
    // For more recent matches we're more likely to have data
    // Maybe we can mark the more recent matches with a flag
    // Or queue up recent matches from fullhistory and process them in order so fh requests show updates quicker
    return;
  }
  const pgroup = getPGroup(match);
  // If reconciling after fullhistory, the pgroup won't contain account_id info. Add it.
  rows.forEach(r => {
    if (!pgroup[r.player_slot]?.account_id) {
      // Merge rather than writing through the optional chain: the history row
      // may reference a slot absent from pgroup entirely, and assigning a
      // property on undefined would throw.
      pgroup[r.player_slot] = { ...pgroup[r.player_slot], account_id: r.account_id };
    }
  });
  const targetSlots = new Set(rows.map(r => r.player_slot));
  // Filter to only players that we want to fill in
  match.players = match.players.filter(p => targetSlots.has(p.player_slot));
  if (!match.players.length) {
    return;
  }
  // Call upsertPlayerCaches: pgroup will be used to populate account_id and heroes fields (for peers search)
  const result = await upsertPlayerCaches(match, undefined, pgroup, 'reconcile');
  if (result.every(Boolean)) {
    // Delete the rows since we successfully updated
    await Promise.all(rows.map(async (row) => {
      return db.raw('DELETE FROM player_match_history WHERE account_id = ? AND match_id = ?', [row.account_id, row.match_id]);
    }));
  }
}
Loading

0 comments on commit 6f7d133

Please sign in to comment.