From ee4b9c69fb535890e57559ddf28bd377d10930f9 Mon Sep 17 00:00:00 2001
From: Akira Hayashi
Date: Fri, 18 Oct 2024 18:16:37 +0900
Subject: [PATCH] Await `forceFlush()` metrics with `Promise.all()`

---
 .../beta-proto/graph/server-actions.ts | 145 +-----------------
 instrumentation.ts                     |  19 +--
 2 files changed, 16 insertions(+), 148 deletions(-)

diff --git a/app/(playground)/p/[agentId]/beta-proto/graph/server-actions.ts b/app/(playground)/p/[agentId]/beta-proto/graph/server-actions.ts
index 9010c1c2f..f6af64e6f 100644
--- a/app/(playground)/p/[agentId]/beta-proto/graph/server-actions.ts
+++ b/app/(playground)/p/[agentId]/beta-proto/graph/server-actions.ts
@@ -1,52 +1,29 @@
 "use server";
 
+import { metricReader } from "@/instrumentation";
 import { openai } from "@ai-sdk/openai";
 import {
 	type CoreMessage,
-	jsonSchema,
 	streamText as sdkStreamText,
 	streamObject,
 } from "ai";
 import { createStreamableValue } from "ai/rsc";
-import { UnstructuredClient } from "unstructured-client";
 
 import { getUserSubscriptionId, isRoute06User } from "@/app/(auth)/lib";
 import { agents, db } from "@/drizzle";
 import { metrics } from "@opentelemetry/api";
 import { createId } from "@paralleldrive/cuid2";
-import { put } from "@vercel/blob";
 import { eq } from "drizzle-orm";
 import { Langfuse } from "langfuse";
-import { Strategy } from "unstructured-client/sdk/models/shared";
-import { schema as artifactSchema } from "../artifact/schema";
-import type { FileId } from "../files/types";
-import type { GiselleNodeArchetype } from "../giselle-node/blueprints";
+import { schema } from "../artifact/schema";
import type { AgentId } from "../types";
-import { elementsToMarkdown } from "../utils/unstructured";
 import type { Graph } from "./types";
 
-const webSearchSchema = jsonSchema<{ keywords: string[] }>({
-	$schema: "https://json-schema.org/draft/2020-12/schema",
-	title: "keyword schema",
-	type: "object",
-	properties: {
-		keywords: {
-			type: "array",
-			items: {
-				type: "string",
-			},
-		},
-	},
-	required: ["keywords"],
-});
-
-type GenerateArtifactStreamParams = {
+type GenerateObjectStreamParams = {
 	userPrompt: string;
 	systemPrompt?: string;
 };
-export async function generateArtifactStream(
-	params: GenerateArtifactStreamParams,
-) {
+export async function generateObjectStream(params: GenerateObjectStreamParams) {
 	const lf = new Langfuse();
 	const trace = lf.trace({
 		id: `giselle-${Date.now()}`,
@@ -63,7 +40,7 @@ export async function generateArtifactStream(
 			model: openai(model),
 			system: params.systemPrompt ?? "You generate an answer to a question. ",
 			prompt: params.userPrompt,
-			schema: artifactSchema,
+			schema,
 			onFinish: async (result) => {
 				const meter = metrics.getMeter("OpenAI");
 				const tokenCounter = meter.createCounter("token_consumed", {
@@ -78,58 +55,9 @@ export async function generateArtifactStream(
 				generation.end({
 					output: result,
 				});
-				await lf.shutdownAsync();
-			},
-		});
-
-	for await (const partialObject of partialObjectStream) {
-		stream.update(partialObject);
-	}
-
-	stream.done();
-	})();
-
-	return { object: stream.value };
-}
-
-type GenerateWebSearchStreamInputs = {
-	userPrompt: string;
-	systemPrompt?: string;
-};
-export async function generateWebSearchStream(
-	inputs: GenerateWebSearchStreamInputs,
-) {
-	const lf = new Langfuse();
-	const trace = lf.trace({
-		id: `giselle-${Date.now()}`,
-	});
-	const stream = createStreamableValue();
+
+				Promise.all([metricReader.forceFlush()]);
 
-	(async () => {
-		const model = "gpt-4o-mini";
-		const generation = trace.generation({
-			input: inputs.userPrompt,
-			model,
-		});
-		const { partialObjectStream } = await streamObject({
-			model: openai(model),
-			system: inputs.systemPrompt ?? "You generate an answer to a question. ",
-			prompt: inputs.userPrompt,
-			schema: artifactSchema,
-			onFinish: async (result) => {
-				const meter = metrics.getMeter("OpenAI");
-				const tokenCounter = meter.createCounter("token_consumed", {
-					description: "Number of OpenAI API tokens consumed by each request",
-				});
-				const subscriptionId = await getUserSubscriptionId();
-				const isR06User = await isRoute06User();
-				tokenCounter.add(result.usage.totalTokens, {
-					subscriptionId,
-					isR06User,
-				});
-				generation.end({
-					output: result,
-				});
 				await lf.shutdownAsync();
 			},
 		});
@@ -150,64 +78,3 @@ export async function setGraphToDb(agentId: AgentId, graph: Graph) {
 		.set({ graphv2: graph, graphHash: createId() })
 		.where(eq(agents.id, agentId));
 }
-
-type UploadFileArgs = {
-	fileId: FileId;
-	file: File;
-};
-export async function uploadFile(args: UploadFileArgs) {
-	const blob = await put(`files/${args.fileId}/${args.file.name}`, args.file, {
-		access: "public",
-		contentType: args.file.type,
-	});
-	return blob;
-}
-
-type ParseFileArgs = {
-	id: FileId;
-	name: string;
-	blobUrl: string;
-};
-export async function parseFile(args: ParseFileArgs) {
-	if (process.env.UNSTRUCTURED_API_KEY === undefined) {
-		throw new Error("UNSTRUCTURED_API_KEY is not set");
-	}
-	const client = new UnstructuredClient({
-		security: {
-			apiKeyAuth: process.env.UNSTRUCTURED_API_KEY,
-		},
-	});
-	const response = await fetch(args.blobUrl);
-	const content = await response.blob();
-	const partitionReponse = await client.general.partition({
-		partitionParameters: {
-			files: {
-				fileName: args.name,
-				content,
-			},
-			strategy: Strategy.Fast,
-			splitPdfPage: false,
-			splitPdfConcurrencyLevel: 1,
-		},
-	});
-	if (partitionReponse.statusCode !== 200) {
-		console.error(partitionReponse.rawResponse);
-		throw new Error(`Failed to parse file: ${partitionReponse.statusCode}`);
-	}
-	const jsonString = JSON.stringify(partitionReponse.elements, null, 2);
-	const blob = new Blob([jsonString], { type: "application/json" });
-
-	await put(`files/${args.id}/partition.json`, blob, {
-		access: "public",
-		contentType: blob.type,
-	});
-
-	const markdown = elementsToMarkdown(partitionReponse.elements ?? []);
-	const markdownBlob = new Blob([markdown], { type: "text/markdown" });
-	const vercelBlob = await put(`files/${args.id}/markdown.md`, markdownBlob, {
-		access: "public",
-		contentType: markdownBlob.type,
-	});
-
-	return vercelBlob;
-}
diff --git a/instrumentation.ts b/instrumentation.ts
index 25fc96101..0cc1eadbf 100644
--- a/instrumentation.ts
+++ b/instrumentation.ts
@@ -3,21 +3,22 @@ import { PeriodicExportingMetricReader } from "@opentelemetry/sdk-metrics";
 import * as Sentry from "@sentry/nextjs";
 import { registerOTel } from "@vercel/otel";
 
+export const metricReader = new PeriodicExportingMetricReader({
+	exporter: new OTLPMetricExporter({
+		url: "https://ingest.us.signoz.cloud:443/v1/metrics",
+		headers: {
+			"signoz-access-token": process.env.SIGNOZ_INGESTION_TOKEN,
+		},
+	}),
+});
+
 export async function register() {
 	registerOTel({
 		serviceName: "giselle",
 		attributes: {
 			environment: process.env.VERCEL_ENV || "not-set",
 		},
-		metricReader: new PeriodicExportingMetricReader({
-			exporter: new OTLPMetricExporter({
-				url: "https://ingest.us.signoz.cloud:443/v1/metrics",
-				headers: {
-					"signoz-access-token": process.env.SIGNOZ_INGESTION_TOKEN,
-				},
-			}),
-			exportIntervalMillis: 10000,
-		}),
+		metricReader: metricReader,
 	});
 
 	if (process.env.NEXT_RUNTIME === "nodejs") {
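
Note (illustrative, not part of the applied diff): below is a minimal TypeScript sketch of the flush pattern the subject line describes, assuming the `metricReader` exported from instrumentation.ts above and the Langfuse client used in server-actions.ts. The helper name `recordAndFlushTokenUsage` is hypothetical. The hunk above calls `Promise.all([...])` without `await`; the sketch awaits it so pending metrics are exported before the `onFinish` callback resolves.

// token-flush-sketch.ts: illustrative only, not part of the commit above.
import { metrics } from "@opentelemetry/api";
import type { Langfuse } from "langfuse";
import { metricReader } from "@/instrumentation";

// Hypothetical helper mirroring the tail of the onFinish callback in
// generateObjectStream: record token usage, then flush telemetry.
export async function recordAndFlushTokenUsage(
	usage: { totalTokens: number },
	lf: Langfuse,
) {
	const meter = metrics.getMeter("OpenAI");
	const tokenCounter = meter.createCounter("token_consumed", {
		description: "Number of OpenAI API tokens consumed by each request",
	});
	tokenCounter.add(usage.totalTokens);

	// forceFlush() drains the PeriodicExportingMetricReader immediately instead
	// of waiting for the next export interval; wrapping it in Promise.all leaves
	// room for additional flush targets later.
	await Promise.all([metricReader.forceFlush()]);
	await lf.shutdownAsync();
}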