Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions langchain-core/src/singletons/callbacks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import PQueueMod from "p-queue";
import { getGlobalAsyncLocalStorageInstance } from "./async_local_storage/globals.js";
import { getDefaultLangChainClientSingleton } from "./tracer.js";

let queue: typeof import("p-queue")["default"]["prototype"];

Expand Down Expand Up @@ -60,6 +61,10 @@ export async function consumeCallback<T>(
* Waits for all promises in the queue to resolve. If the queue is
* undefined, it immediately resolves a promise.
*/
export function awaitAllCallbacks(): Promise<void> {
return typeof queue !== "undefined" ? queue.onIdle() : Promise.resolve();
export async function awaitAllCallbacks(): Promise<void> {
const defaultClient = getDefaultLangChainClientSingleton();
await Promise.all([
typeof queue !== "undefined" ? queue.onIdle() : Promise.resolve(),
defaultClient.awaitPendingTraceBatches(),
]);
}
34 changes: 34 additions & 0 deletions langchain-core/src/tracers/tests/tracer.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import { test, expect, jest } from "@jest/globals";
import * as uuid from "uuid";
import { Client } from "langsmith";
import { Serialized } from "../../load/serializable.js";
import { Document } from "../../documents/document.js";
import { Run } from "../base.js";
import { HumanMessage } from "../../messages/index.js";
import { FakeTracer } from "../../utils/testing/index.js";
import { setDefaultLangChainClientSingleton } from "../../singletons/tracer.js";
import { RunnableLambda } from "../../runnables/index.js";
import { awaitAllCallbacks } from "../../singletons/callbacks.js";

const _DATE = 1620000000000;

Date.now = jest.fn(() => _DATE);

afterEach(() => {
setDefaultLangChainClientSingleton(new Client());
});

const serialized: Serialized = {
lc: 1,
type: "constructor",
Expand Down Expand Up @@ -410,3 +418,29 @@
await tracer.handleLLMEnd({ generations: [[]] }, llmRunId3);
expect(tracer.runs.length).toBe(2);
});

test("Test tracer payload snapshots for run create and update", async () => {
const client = new Client();
(client as any).multipartIngestRuns = jest.fn(async () => {

Check failure on line 424 in langchain-core/src/tracers/tests/tracer.test.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
await new Promise((resolve) => setTimeout(resolve, 500));

Check failure on line 425 in langchain-core/src/tracers/tests/tracer.test.ts

View workflow job for this annotation

GitHub Actions / Check linting

Return values from promise executor functions cannot be read
return Promise.resolve();
});
setDefaultLangChainClientSingleton(client);
const parentRunnable = RunnableLambda.from(async (input: string) => {
const childRunnable = RunnableLambda.from(async (childInput: string) => {
return `processed: ${childInput}`;
});

const result = await childRunnable.invoke(input);
return `parent: ${result}`;
});
await parentRunnable.invoke("test input");

const beforeAwaitTime = new Date();
await awaitAllCallbacks();
const afterAwaitTime = new Date();
expect(afterAwaitTime.getTime() - beforeAwaitTime.getTime()).toBeGreaterThan(
500
);
expect((client as any).multipartIngestRuns).toHaveBeenCalled();

Check failure on line 445 in langchain-core/src/tracers/tests/tracer.test.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
});
Loading