Skip to content

Commit

Permalink
test: modify recon sync tests to start searching and "newest" events …
Browse files Browse the repository at this point in the history
…rather than beginning of time

in durable envs with GBs of data, starting from 0 never works
  • Loading branch information
dav1do committed Jun 4, 2024
1 parent 53248f6 commit decfccb
Showing 1 changed file with 48 additions and 26 deletions.
74 changes: 48 additions & 26 deletions suite/src/__tests__/fast/sync-events.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,22 @@ import { ReconEvent, ReconEventInput, randomEvents } from '../../utils/rustCeram
const delay = utilities.delay
// Environment variables
const CeramicUrls = String(process.env.CERAMIC_URLS).split(',')
async function registerInterest(url: string, model: StreamID) {
let response = await fetch(url + `/ceramic/interests/model/${model.toString()}`, { method: 'POST' })

async function registerInterest(url: string, model: StreamID): Promise<string> {
const response = await fetch(url + `/ceramic/interests/model/${model.toString()}`, { method: 'POST' })
if (response.status !== 204) {
const data = await response.text()
console.warn(`registerInterest: ${data}`)
}
expect(response.status).toEqual(204)
const tokenResponse = await fetch(url + `/ceramic/feed/resumeToken`, { method: 'GET' })
if (tokenResponse.status !== 200) {
const data = await tokenResponse.text()
console.warn(`resumeToken: ${data}`)
}
expect(tokenResponse.status).toEqual(200)
const token = await tokenResponse.json()
return token.resumeToken
}

async function writeEvents(url: string, events: ReconEventInput[]) {
Expand All @@ -30,18 +39,17 @@ async function writeEvents(url: string, events: ReconEventInput[]) {
}
}

async function readEvents(url: string, model: StreamID) {
async function readEvents(url: string, model: StreamID, resumeToken: String) {
const events = []
let complete = false;
let offset = 0;
console.log(`readEvents: ${url} ${model.toString()} ${resumeToken}`)
var startTime = Date.now();
while (!complete) {
const fullUrl = url + `/ceramic/experimental/events/model/${model.toString()}?offset=${offset}`
const fullUrl = url + `/ceramic/feed/events?resumeAt=${resumeToken}`
const response = await fetch(fullUrl)
expect(response.status).toEqual(200)
const data = await response.json();
events.push(...data.events)
offset = data.resumeOffset
complete = data.isComplete
if (!complete && (Date.now() - startTime) > 60000) {
// if it took more than a minute, quit
Expand All @@ -65,12 +73,15 @@ function sortModelEvents(events: ReconEvent[]): ReconEvent[] {
}

// Wait up till retries seconds for all urls to have at least count events
async function waitForEventCount(urls: string[], model: StreamID, count: number, retries: number) {
async function waitForEventCount(urls: string[], model: StreamID, count: number, retries: number, resumeTokens: string[]) {
if (urls.length !== resumeTokens.length) {
throw new Error('The lengths of urls and resumeTokens arrays must be equal');
}
for (let r = 0; r < retries; r++) {
let all_good = true;
for (let u in urls) {
let url = urls[u];
let events = await readEvents(url, model)
for (let i = 0; i < urls.length; i++) {
let url = urls[i];
let events = await readEvents(url, model, resumeTokens[i])
if (events.length < count) {
all_good = false;
break;
Expand Down Expand Up @@ -103,18 +114,21 @@ describe('sync events', () => {
await registerInterest(firstNodeUrl, modelID)
await writeEvents(firstNodeUrl, modelEvents)

const resumeTokens: string[] = []
// Now subscribe on the other nodes
for (let idx = 1; idx < CeramicUrls.length; idx++) {
let url = CeramicUrls[idx]
await registerInterest(url, modelID)
const token = await registerInterest(url, modelID)
resumeTokens[idx] = token
}
const sortedModelEvents = sortModelEvents(modelEvents)
await waitForEventCount(CeramicUrls, modelID, modelEvents.length, 10)
await waitForEventCount(CeramicUrls, modelID, modelEvents.length, 10, resumeTokens)

// Use a sorted expected value for stable tests
// Validate each node got the events, including the first node
for (const url of CeramicUrls) {
const events = await readEvents(url, modelID)
for (let i = 0; i < CeramicUrls.length; i++) {
const url = CeramicUrls[i]
const events = await readEvents(url, modelID, resumeTokens[i])

expect(events).toEqual(sortedModelEvents)
}
Expand All @@ -124,20 +138,22 @@ describe('sync events', () => {
const modelID = new StreamID('model', randomCID())
let modelEvents = randomEvents(modelID, 10);
// Subscribe on all nodes then write the data
const resumeTokens: string[] = []
for (let idx in CeramicUrls) {
let url = CeramicUrls[idx]
await registerInterest(url, modelID)
const token = await registerInterest(url, modelID)
resumeTokens[idx] = token
}
await writeEvents(firstNodeUrl, modelEvents)

await waitForEventCount(CeramicUrls, modelID, modelEvents.length, 10)
await waitForEventCount(CeramicUrls, modelID, modelEvents.length, 10, resumeTokens)

// Use a sorted expected value for stable tests
const sortedModelEvents = sortModelEvents(modelEvents)
// Validate each node got the events, including the first node
for (let idx in CeramicUrls) {
let url = CeramicUrls[idx]
let events = await readEvents(url, modelID)
let events = await readEvents(url, modelID, resumeTokens[idx])

expect(events).toEqual(sortedModelEvents)
}
Expand All @@ -152,22 +168,24 @@ describe('sync events', () => {
let secondHalf = modelEvents.slice(half, modelEvents.length)
await writeEvents(firstNodeUrl, firstHalf)

const resumeTokens: string[] = []
// Now subscribe on the other nodes
for (let idx = 1; idx < CeramicUrls.length; idx++) {
let url = CeramicUrls[idx]
await registerInterest(url, modelID)
const token = await registerInterest(url, modelID)
resumeTokens[idx] = token
}
// Write the second half of the data
await writeEvents(firstNodeUrl, secondHalf)

await waitForEventCount(CeramicUrls, modelID, modelEvents.length, 10)
await waitForEventCount(CeramicUrls, modelID, modelEvents.length, 10, resumeTokens)

// Use a sorted expected value for stable tests
const sortedModelEvents = sortModelEvents(modelEvents)
// Validate each node got the events, including the first node
for (let idx in CeramicUrls) {
let url = CeramicUrls[idx]
let events = await readEvents(url, modelID)
let events = await readEvents(url, modelID, resumeTokens[idx])

expect(events).toEqual(sortedModelEvents)
}
Expand All @@ -179,23 +197,25 @@ describe('sync events', () => {
let firstHalf = modelEvents.slice(0, half)
let secondHalf = modelEvents.slice(half, modelEvents.length)

const resumeTokens: string[] = []
// Subscribe on all nodes then write the data
for (let idx in CeramicUrls) {
let url = CeramicUrls[idx]
await registerInterest(url, modelID)
const token = await registerInterest(url, modelID)
resumeTokens[idx] = token
}

// Write to both node simultaneously
await Promise.all([writeEvents(firstNodeUrl, firstHalf), writeEvents(secondNodeUrl, secondHalf)])

await waitForEventCount(CeramicUrls, modelID, modelEvents.length, 10)
await waitForEventCount(CeramicUrls, modelID, modelEvents.length, 10, resumeTokens)

// Use a sorted expected value for stable tests
const sortedModelEvents = sortModelEvents(modelEvents)
// Validate each node got the events, including the first node
for (let idx in CeramicUrls) {
let url = CeramicUrls[idx]
let events = await readEvents(url, modelID)
let events = await readEvents(url, modelID, resumeTokens[idx])

expect(events).toEqual(sortedModelEvents)
}
Expand All @@ -216,12 +236,14 @@ describe('sync events', () => {
})
}

const resumeTokens: string[] = []
// Subscribe on all nodes to all models then write the data
for (let m in models) {
let model = models[m]
for (let idx in CeramicUrls) {
let url = CeramicUrls[idx]
await registerInterest(url, model.id)
const token = await registerInterest(url, model.id)
resumeTokens[idx] = token
}
}

Expand All @@ -242,14 +264,14 @@ describe('sync events', () => {
let events = model.events.flat();


await waitForEventCount(CeramicUrls, model.id, events.length, 20)
await waitForEventCount(CeramicUrls, model.id, events.length, 20, resumeTokens)

// Use a sorted expected value for stable tests
const sortedModelEvents = sortModelEvents(events)
// Validate each node got the events, including the first node
for (let idx in CeramicUrls) {
let url = CeramicUrls[idx]
let events = await readEvents(url, model.id)
let events = await readEvents(url, model.id, resumeTokens[idx])

expect(events).toEqual(sortedModelEvents)
}
Expand Down

0 comments on commit decfccb

Please sign in to comment.