Skip to content

Commit

Permalink
Wait using Promise.all()
Browse files Browse the repository at this point in the history
  • Loading branch information
Rindrics committed Oct 18, 2024
1 parent a2f04a5 commit 7256469
Showing 1 changed file with 34 additions and 7 deletions.
41 changes: 34 additions & 7 deletions app/(playground)/p/[agentId]/beta-proto/graph/server-actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,29 @@ import type { AgentId } from "../types";
import { elementsToMarkdown } from "../utils/unstructured";
import type { Graph } from "./types";

const flushMetricsAndShutdown = async (lf: Langfuse, metricReader: any) => {
return new Promise<void>((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(new Error("Metric flush timeout after 20 seconds"));
}, 20000);

Promise.all([metricReader.forceFlush(), lf.shutdownAsync()])
.then(() => {
clearTimeout(timeoutId);
resolve();
})
.catch((error) => {
clearTimeout(timeoutId);
reject(error);
});
});
};

type GenerateArtifactStreamParams = {
userPrompt: string;
systemPrompt?: string;
};

export async function generateArtifactStream(
params: GenerateArtifactStreamParams,
) {
Expand Down Expand Up @@ -56,16 +75,24 @@ export async function generateArtifactStream(
subscriptionId,
isR06User,
});
generation.end({
output: result,
});

waitUntil(
metricReader.forceFlush().catch((error) => {
console.error("Failed to flush metrics:", error);
flushMetricsAndShutdown(lf, metricReader).catch((error) => {
if (error.message === "Metric flush timeout after 20 seconds") {
console.error(
"Metric flush and Langfuse shutdown timed out:",
error,
);
} else {
console.error(
"Error during metric flush and Langfuse shutdown:",
error,
);
}
}),
);
await lf.shutdownAsync();
generation.end({
output: result,
});
},
});

Expand Down

0 comments on commit 7256469

Please sign in to comment.