diff --git a/sdks/node/src/__tests__/weaveClient.test.ts b/sdks/node/src/__tests__/weaveClient.test.ts index 43dc69b1965c..531c16547b32 100644 --- a/sdks/node/src/__tests__/weaveClient.test.ts +++ b/sdks/node/src/__tests__/weaveClient.test.ts @@ -108,7 +108,7 @@ describe('WeaveClient', () => { beforeEach(() => { mockTraceServerApi = { call: { - callStartBatchCallUpsertBatchPost: jest.fn(), + callStartBatchCallUpsertBatchPost: jest.fn().mockResolvedValue({}), }, } as any; mockWandbServerApi = {} as any; @@ -121,6 +121,26 @@ describe('WeaveClient', () => { (client as any).BATCH_INTERVAL = 10; }); + it('should handle oversized batch items', async () => { + const bigPayloadSize = 11 * 1024 * 1024; + const smallData = {mode: 'start', data: {id: '2', payload: 'small'}}; + const bigData = { + mode: 'start', + data: {id: '1', payload: 'x'.repeat(bigPayloadSize)}, + }; + (client as any).callQueue.push(smallData, bigData); + + await (client as any).processBatch(); + + expect( + mockTraceServerApi.call.callStartBatchCallUpsertBatchPost + ).toHaveBeenCalledWith({ + batch: [{mode: 'start', req: smallData.data}], + }); + + expect((client as any).callQueue).toContain(bigData); + }); + it('should batch multiple calls together', async () => { // Add test calls to queue (client as any).callQueue.push( diff --git a/sdks/node/src/weaveClient.ts b/sdks/node/src/weaveClient.ts index f96c263ec079..7604495a6be1 100644 --- a/sdks/node/src/weaveClient.ts +++ b/sdks/node/src/weaveClient.ts @@ -1,4 +1,5 @@ import {AsyncLocalStorage} from 'async_hooks'; +import * as fs from 'fs'; import {uuidv7} from 'uuidv7'; import {Dataset} from './dataset'; @@ -32,6 +33,8 @@ import {packageVersion} from './utils/userAgent'; import {WandbServerApi} from './wandb/wandbServerApi'; import {ObjectRef, WeaveObject, getClassChain} from './weaveObject'; +const WEAVE_ERRORS_LOG_FNAME = 'weaveErrors.log'; + export type CallStackEntry = { callId: string; traceId: string; @@ -71,6 +74,9 @@ class CallStack { type CallStartParams = StartedCallSchemaForInsert; type CallEndParams = EndedCallSchemaForInsert; +// We count characters item by item, and try to limit batches to about this size. +const MAX_BATCH_SIZE_CHARS = 10 * 1024 * 1024; + export class WeaveClient { private stackContext = new AsyncLocalStorage(); private callQueue: Array<{mode: 'start' | 'end'; data: any}> = []; @@ -78,6 +84,8 @@ export class WeaveClient { private isBatchProcessing: boolean = false; private batchProcessingPromises: Set> = new Set(); private readonly BATCH_INTERVAL: number = 200; + private errorCount = 0; + private readonly MAX_ERRORS = 10; constructor( public traceServerApi: TraceServerApi, @@ -114,25 +122,43 @@ export class WeaveClient { this.isBatchProcessing = true; - // We count characters item by item, and try to limit batches to about - // this size. - const maxBatchSizeChars = 5 * 1024 * 1024; - let batchToProcess = []; let currentBatchSize = 0; - while (this.callQueue.length > 0 && currentBatchSize < maxBatchSizeChars) { - const item = this.callQueue[0]; + while ( + this.callQueue.length > 0 && + currentBatchSize < MAX_BATCH_SIZE_CHARS + ) { + const item = this.callQueue.shift(); + if (item === undefined) { + throw new Error('Call queue is empty'); + } + const itemSize = JSON.stringify(item).length; + if (itemSize > MAX_BATCH_SIZE_CHARS) { + fs.appendFileSync( + WEAVE_ERRORS_LOG_FNAME, + `Item size ${itemSize} exceeds max batch size ${MAX_BATCH_SIZE_CHARS}. Item: ${JSON.stringify(item)}\n` + ); + } - if (currentBatchSize + itemSize <= maxBatchSizeChars) { - batchToProcess.push(this.callQueue.shift()!); + if (currentBatchSize + itemSize <= MAX_BATCH_SIZE_CHARS) { + batchToProcess.push(item); currentBatchSize += itemSize; } else { + // doesn't fit, put it back + this.callQueue.unshift(item); break; } } + if (batchToProcess.length === 0) { + this.batchProcessTimeout = null; + return; + } + + this.isBatchProcessing = true; + const batchReq = { batch: batchToProcess.map(item => ({ mode: item.mode, @@ -146,8 +172,20 @@ export class WeaveClient { ); } catch (error) { console.error('Error processing batch:', error); + this.errorCount++; + fs.appendFileSync( + WEAVE_ERRORS_LOG_FNAME, + `Error processing batch: ${error}\n` + ); + // Put failed items back at the front of the queue this.callQueue.unshift(...batchToProcess); + + // Exit if we have too many errors + if (this.errorCount > this.MAX_ERRORS) { + console.error(`Exceeded max errors: ${this.MAX_ERRORS}; exiting`); + process.exit(1); + } } finally { this.isBatchProcessing = false; this.batchProcessTimeout = null; @@ -734,7 +772,9 @@ function mergeSummaries(left: Summary, right: Summary): Summary { if (typeof leftValue === 'number' && typeof result[key] === 'number') { result[key] = leftValue + result[key]; } else if ( + leftValue != null && typeof leftValue === 'object' && + result[key] != null && typeof result[key] === 'object' ) { result[key] = mergeSummaries(leftValue, result[key]);