Await forceFlush() metrics with Promise.all()
Rindrics committed Oct 18, 2024
1 parent f82cefb commit ee4b9c6
Showing 2 changed files with 16 additions and 148 deletions.
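
What this commit does, in outline: instrumentation.ts now exports the PeriodicExportingMetricReader at module scope, and server-actions.ts imports it and flushes it via Promise.all([metricReader.forceFlush()]), presumably so queued token metrics are exported before the serverless invocation ends. A minimal TypeScript sketch of that pattern follows — not the repository's exact code; the -http exporter package and the flushTelemetry helper name are assumptions, and the SigNoz access-token headers from the real config are omitted.

// Sketch of the pattern only; the exporter package and helper name are assumptions.
import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-http";
import { PeriodicExportingMetricReader } from "@opentelemetry/sdk-metrics";

// Exported at module scope so server actions can import the same instance.
export const metricReader = new PeriodicExportingMetricReader({
	exporter: new OTLPMetricExporter({
		url: "https://ingest.us.signoz.cloud:443/v1/metrics",
	}),
});

// In a server action: flush pending metrics before returning. Promise.all()
// keeps the flush awaitable alongside any other flush/shutdown promises.
export async function flushTelemetry(): Promise<void> {
	await Promise.all([metricReader.forceFlush()]);
}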
145 changes: 6 additions & 139 deletions app/(playground)/p/[agentId]/beta-proto/graph/server-actions.ts
@@ -1,52 +1,29 @@
"use server";

import { metricReader } from "@/instrumentation";
import { openai } from "@ai-sdk/openai";
import {
type CoreMessage,
jsonSchema,
streamText as sdkStreamText,
streamObject,
} from "ai";
import { createStreamableValue } from "ai/rsc";
import { UnstructuredClient } from "unstructured-client";

import { getUserSubscriptionId, isRoute06User } from "@/app/(auth)/lib";
import { agents, db } from "@/drizzle";
import { metrics } from "@opentelemetry/api";
import { createId } from "@paralleldrive/cuid2";
import { put } from "@vercel/blob";
import { eq } from "drizzle-orm";
import { Langfuse } from "langfuse";
import { Strategy } from "unstructured-client/sdk/models/shared";
import { schema as artifactSchema } from "../artifact/schema";
import type { FileId } from "../files/types";
import type { GiselleNodeArchetype } from "../giselle-node/blueprints";
import { schema } from "../artifact/schema";
import type { AgentId } from "../types";
import { elementsToMarkdown } from "../utils/unstructured";
import type { Graph } from "./types";

const webSearchSchema = jsonSchema<{ keywords: string[] }>({
$schema: "https://json-schema.org/draft/2020-12/schema",
title: "keyword schema",
type: "object",
properties: {
keywords: {
type: "array",
items: {
type: "string",
},
},
},
required: ["keywords"],
});

type GenerateArtifactStreamParams = {
type GenerateObjectStreamParams = {
userPrompt: string;
systemPrompt?: string;
};
export async function generateArtifactStream(
params: GenerateArtifactStreamParams,
) {
export async function generateObjectStream(params: GenerateObjectStreamParams) {
const lf = new Langfuse();
const trace = lf.trace({
id: `giselle-${Date.now()}`,
@@ -63,7 +40,7 @@ export async function generateArtifactStream(
model: openai(model),
system: params.systemPrompt ?? "You generate an answer to a question. ",
prompt: params.userPrompt,
schema: artifactSchema,
schema,
onFinish: async (result) => {
const meter = metrics.getMeter("OpenAI");
const tokenCounter = meter.createCounter("token_consumed", {
@@ -78,58 +55,9 @@ export async function generateArtifactStream(
generation.end({
output: result,
});
await lf.shutdownAsync();
},
});

for await (const partialObject of partialObjectStream) {
stream.update(partialObject);
}

stream.done();
})();

return { object: stream.value };
}

type GenerateWebSearchStreamInputs = {
userPrompt: string;
systemPrompt?: string;
};
export async function generateWebSearchStream(
inputs: GenerateWebSearchStreamInputs,
) {
const lf = new Langfuse();
const trace = lf.trace({
id: `giselle-${Date.now()}`,
});
const stream = createStreamableValue();
Promise.all([metricReader.forceFlush()]);

(async () => {
const model = "gpt-4o-mini";
const generation = trace.generation({
input: inputs.userPrompt,
model,
});
const { partialObjectStream } = await streamObject({
model: openai(model),
system: inputs.systemPrompt ?? "You generate an answer to a question. ",
prompt: inputs.userPrompt,
schema: artifactSchema,
onFinish: async (result) => {
const meter = metrics.getMeter("OpenAI");
const tokenCounter = meter.createCounter("token_consumed", {
description: "Number of OpenAI API tokens consumed by each request",
});
const subscriptionId = await getUserSubscriptionId();
const isR06User = await isRoute06User();
tokenCounter.add(result.usage.totalTokens, {
subscriptionId,
isR06User,
});
generation.end({
output: result,
});
await lf.shutdownAsync();
},
});
@@ -150,64 +78,3 @@ export async function setGraphToDb(agentId: AgentId, graph: Graph) {
.set({ graphv2: graph, graphHash: createId() })
.where(eq(agents.id, agentId));
}

type UploadFileArgs = {
fileId: FileId;
file: File;
};
export async function uploadFile(args: UploadFileArgs) {
const blob = await put(`files/${args.fileId}/${args.file.name}`, args.file, {
access: "public",
contentType: args.file.type,
});
return blob;
}

type ParseFileArgs = {
id: FileId;
name: string;
blobUrl: string;
};
export async function parseFile(args: ParseFileArgs) {
if (process.env.UNSTRUCTURED_API_KEY === undefined) {
throw new Error("UNSTRUCTURED_API_KEY is not set");
}
const client = new UnstructuredClient({
security: {
apiKeyAuth: process.env.UNSTRUCTURED_API_KEY,
},
});
const response = await fetch(args.blobUrl);
const content = await response.blob();
const partitionReponse = await client.general.partition({
partitionParameters: {
files: {
fileName: args.name,
content,
},
strategy: Strategy.Fast,
splitPdfPage: false,
splitPdfConcurrencyLevel: 1,
},
});
if (partitionReponse.statusCode !== 200) {
console.error(partitionReponse.rawResponse);
throw new Error(`Failed to parse file: ${partitionReponse.statusCode}`);
}
const jsonString = JSON.stringify(partitionReponse.elements, null, 2);
const blob = new Blob([jsonString], { type: "application/json" });

await put(`files/${args.id}/partition.json`, blob, {
access: "public",
contentType: blob.type,
});

const markdown = elementsToMarkdown(partitionReponse.elements ?? []);
const markdownBlob = new Blob([markdown], { type: "text/markdown" });
const vercelBlob = await put(`files/${args.id}/markdown.md`, markdownBlob, {
access: "public",
contentType: markdownBlob.type,
});

return vercelBlob;
}
19 changes: 10 additions & 9 deletions instrumentation.ts
@@ -3,21 +3,22 @@ import { PeriodicExportingMetricReader } from "@opentelemetry/sdk-metrics";
import * as Sentry from "@sentry/nextjs";
import { registerOTel } from "@vercel/otel";

export const metricReader = new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter({
url: "https://ingest.us.signoz.cloud:443/v1/metrics",
headers: {
"signoz-access-token": process.env.SIGNOZ_INGESTION_TOKEN,
},
}),
});

export async function register() {
registerOTel({
serviceName: "giselle",
attributes: {
environment: process.env.VERCEL_ENV || "not-set",
},
metricReader: new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter({
url: "https://ingest.us.signoz.cloud:443/v1/metrics",
headers: {
"signoz-access-token": process.env.SIGNOZ_INGESTION_TOKEN,
},
}),
exportIntervalMillis: 10000,
}),
metricReader: metricReader,
});

if (process.env.NEXT_RUNTIME === "nodejs") {
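
The net effect of the instrumentation.ts change is that register() and the server actions share one reader instance instead of the reader living only inside register(). A hedged usage sketch — the recordTokenUsage helper is illustrative and not part of this commit:

import { metrics } from "@opentelemetry/api";
import { metricReader } from "@/instrumentation";

// Illustrative helper: record token usage for a finished generation, then
// flush the shared reader so the data point leaves the process before the
// server action's response completes.
export async function recordTokenUsage(totalTokens: number): Promise<void> {
	const tokenCounter = metrics.getMeter("OpenAI").createCounter("token_consumed", {
		description: "Number of OpenAI API tokens consumed by each request",
	});
	tokenCounter.add(totalTokens);
	await Promise.all([metricReader.forceFlush()]);
}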
