From e4a91d97dd5a57218be3229b06c8158ce8f677c0 Mon Sep 17 00:00:00 2001
From: Andrew Truong
Date: Thu, 19 Dec 2024 19:14:06 -0500
Subject: [PATCH 1/6] test

---
 sdks/node/src/weaveClient.ts | 35 ++++++++++++++++++++++++++++++++---
 1 file changed, 32 insertions(+), 3 deletions(-)

diff --git a/sdks/node/src/weaveClient.ts b/sdks/node/src/weaveClient.ts
index f96c263ec079..7cb1f5cba09b 100644
--- a/sdks/node/src/weaveClient.ts
+++ b/sdks/node/src/weaveClient.ts
@@ -1,4 +1,5 @@
 import {AsyncLocalStorage} from 'async_hooks';
+import * as fs from 'fs';
 import {uuidv7} from 'uuidv7';
 
 import {Dataset} from './dataset';
@@ -78,6 +79,8 @@ export class WeaveClient {
   private isBatchProcessing: boolean = false;
   private batchProcessingPromises: Set<Promise<void>> = new Set();
   private readonly BATCH_INTERVAL: number = 200;
+  private errorCount = 0;
+  private readonly MAX_ERRORS = 10;
 
   constructor(
     public traceServerApi: TraceServerApi,
@@ -116,23 +119,35 @@
 
     // We count characters item by item, and try to limit batches to about
     // this size.
-    const maxBatchSizeChars = 5 * 1024 * 1024;
+    const maxBatchSizeChars = 10 * 1024 * 1024;
 
     let batchToProcess = [];
     let currentBatchSize = 0;
 
     while (this.callQueue.length > 0 && currentBatchSize < maxBatchSizeChars) {
-      const item = this.callQueue[0];
+      const item = this.callQueue.shift();
+      if (item === undefined) {
+        throw new Error('Call queue is empty');
+      }
       const itemSize = JSON.stringify(item).length;
 
       if (currentBatchSize + itemSize <= maxBatchSizeChars) {
-        batchToProcess.push(this.callQueue.shift()!);
+        batchToProcess.push(item);
         currentBatchSize += itemSize;
       } else {
+        // doesn't fit, put it back
+        this.callQueue.unshift(item);
         break;
       }
     }
 
+    if (batchToProcess.length === 0) {
+      this.batchProcessTimeout = null;
+      return;
+    }
+
+    this.isBatchProcessing = true;
+
     const batchReq = {
       batch: batchToProcess.map(item => ({
         mode: item.mode,
@@ -146,8 +161,20 @@
       );
     } catch (error) {
       console.error('Error processing batch:', error);
+      this.errorCount++;
+      fs.appendFileSync(
+        '/tmp/weaveRequests.log',
+        `Error processing batch: ${error}\n`
+      );
+
       // Put failed items back at the front of the queue
       this.callQueue.unshift(...batchToProcess);
+
+      // Exit if we have too many errors
+      if (this.errorCount > this.MAX_ERRORS) {
+        console.error(`Exceeded max errors: ${this.MAX_ERRORS}; exiting`);
+        process.exit(1);
+      }
     } finally {
       this.isBatchProcessing = false;
       this.batchProcessTimeout = null;
@@ -734,7 +761,9 @@ function mergeSummaries(left: Summary, right: Summary): Summary {
     if (typeof leftValue === 'number' && typeof result[key] === 'number') {
       result[key] = leftValue + result[key];
     } else if (
+      leftValue != null &&
       typeof leftValue === 'object' &&
+      result[key] != null &&
       typeof result[key] === 'object'
     ) {
       result[key] = mergeSummaries(leftValue, result[key]);

From 2bd0debf427c083597a06542612615c28a83c008 Mon Sep 17 00:00:00 2001
From: Andrew Truong
Date: Thu, 19 Dec 2024 20:28:32 -0500
Subject: [PATCH 2/6] test

---
 sdks/node/src/weaveClient.ts | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/sdks/node/src/weaveClient.ts b/sdks/node/src/weaveClient.ts
index 7cb1f5cba09b..c163d22cfa13 100644
--- a/sdks/node/src/weaveClient.ts
+++ b/sdks/node/src/weaveClient.ts
@@ -33,6 +33,8 @@ import {packageVersion} from './utils/userAgent';
 import {WandbServerApi} from './wandb/wandbServerApi';
 import {ObjectRef, WeaveObject, getClassChain} from './weaveObject';
 
+const WEAVE_ERRORS_LOG_FNAME = 'weaveErrors.log';
+
 export type CallStackEntry = {
   callId: string;
   traceId: string;
@@ -129,7 +131,14 @@ export class WeaveClient {
       if (item === undefined) {
         throw new Error('Call queue is empty');
       }
+
       const itemSize = JSON.stringify(item).length;
+      if (itemSize > maxBatchSizeChars) {
+        fs.appendFileSync(
+          WEAVE_ERRORS_LOG_FNAME,
+          `Item size ${itemSize} exceeds max batch size ${maxBatchSizeChars}. Item: ${JSON.stringify(item)}\n`
+        );
+      }
 
       if (currentBatchSize + itemSize <= maxBatchSizeChars) {
         batchToProcess.push(item);
         currentBatchSize += itemSize;
       } else {
@@ -163,7 +172,7 @@
       console.error('Error processing batch:', error);
       this.errorCount++;
       fs.appendFileSync(
-        '/tmp/weaveRequests.log',
+        WEAVE_ERRORS_LOG_FNAME,
         `Error processing batch: ${error}\n`
       );
 

From 14eb937baeb41dbddd79789b472250e8ebaedbc2 Mon Sep 17 00:00:00 2001
From: Andrew Truong
Date: Thu, 19 Dec 2024 22:51:40 -0500
Subject: [PATCH 3/6] test

---
 sdks/node/src/__tests__/weaveClient.test.ts | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/sdks/node/src/__tests__/weaveClient.test.ts b/sdks/node/src/__tests__/weaveClient.test.ts
index 43dc69b1965c..6e4c28ab4dc7 100644
--- a/sdks/node/src/__tests__/weaveClient.test.ts
+++ b/sdks/node/src/__tests__/weaveClient.test.ts
@@ -121,6 +121,24 @@
       (client as any).BATCH_INTERVAL = 10;
     });
 
+    it('should handle oversized batch items', async () => {
+      const largeData = {
+        mode: 'start',
+        data: {id: '1', payload: 'x'.repeat(11 * 1024 * 1024)},
+      };
+      const smallData = {mode: 'start', data: {id: '2', payload: 'small'}};
+
+      await (client as any).processBatch();
+
+      expect(
+        mockTraceServerApi.call.callStartBatchCallUpsertBatchPost
+      ).toHaveBeenCalledWith({
+        batch: [{mode: 'start', req: smallData.data}],
+      });
+
+      expect((client as any).callQueue).toContain(largeData);
+    });
+
     it('should batch multiple calls together', async () => {
       // Add test calls to queue
       (client as any).callQueue.push(

From a1cb29fc6e33c78b47a6b3fcdaadfa888fb20f53 Mon Sep 17 00:00:00 2001
From: Andrew Truong
Date: Fri, 20 Dec 2024 00:14:46 -0500
Subject: [PATCH 4/6] test

---
 sdks/node/src/__tests__/weaveClient.test.ts | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/sdks/node/src/__tests__/weaveClient.test.ts b/sdks/node/src/__tests__/weaveClient.test.ts
index 6e4c28ab4dc7..391033318307 100644
--- a/sdks/node/src/__tests__/weaveClient.test.ts
+++ b/sdks/node/src/__tests__/weaveClient.test.ts
@@ -122,6 +122,10 @@
     });
 
     it('should handle oversized batch items', async () => {
+      const mockApiCall = mockTraceServerApi.call
+        .callStartBatchCallUpsertBatchPost as jest.Mock;
+      mockApiCall.mockResolvedValue({});
+
       const largeData = {
         mode: 'start',
         data: {id: '1', payload: 'x'.repeat(11 * 1024 * 1024)},

From 6178a25ec61a9424518d8b17c098a1795a3036e4 Mon Sep 17 00:00:00 2001
From: Andrew Truong
Date: Fri, 20 Dec 2024 00:23:06 -0500
Subject: [PATCH 5/6] test

---
 sdks/node/src/__tests__/weaveClient.test.ts | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sdks/node/src/__tests__/weaveClient.test.ts b/sdks/node/src/__tests__/weaveClient.test.ts
index 391033318307..5a8d0c7232ba 100644
--- a/sdks/node/src/__tests__/weaveClient.test.ts
+++ b/sdks/node/src/__tests__/weaveClient.test.ts
@@ -132,6 +132,8 @@
       };
       const smallData = {mode: 'start', data: {id: '2', payload: 'small'}};
 
+      (client as any).callQueue.push(largeData, smallData);
+
       await (client as any).processBatch();
 
       expect(

From 5aa16dda001e7678bffa5239983f410f7860abed Mon Sep 17 00:00:00 2001
From: Andrew Truong
Date: Fri, 20 Dec 2024 15:18:05 -0500
Subject: [PATCH 6/6] test

---
 sdks/node/src/__tests__/weaveClient.test.ts | 18 +++++++-----------
 sdks/node/src/weaveClient.ts                | 18 ++++++++++--------
 2 files changed, 17 insertions(+), 19 deletions(-)

diff --git a/sdks/node/src/__tests__/weaveClient.test.ts b/sdks/node/src/__tests__/weaveClient.test.ts
index 5a8d0c7232ba..531c16547b32 100644
--- a/sdks/node/src/__tests__/weaveClient.test.ts
+++ b/sdks/node/src/__tests__/weaveClient.test.ts
@@ -108,7 +108,7 @@ describe('WeaveClient', () => {
     beforeEach(() => {
       mockTraceServerApi = {
         call: {
-          callStartBatchCallUpsertBatchPost: jest.fn(),
+          callStartBatchCallUpsertBatchPost: jest.fn().mockResolvedValue({}),
         },
       } as any;
       mockWandbServerApi = {} as any;
@@ -122,17 +122,13 @@ describe('WeaveClient', () => {
     });
 
     it('should handle oversized batch items', async () => {
-      const mockApiCall = mockTraceServerApi.call
-        .callStartBatchCallUpsertBatchPost as jest.Mock;
-      mockApiCall.mockResolvedValue({});
-
-      const largeData = {
+      const bigPayloadSize = 11 * 1024 * 1024;
+      const smallData = {mode: 'start', data: {id: '2', payload: 'small'}};
+      const bigData = {
         mode: 'start',
-        data: {id: '1', payload: 'x'.repeat(11 * 1024 * 1024)},
+        data: {id: '1', payload: 'x'.repeat(bigPayloadSize)},
       };
-      const smallData = {mode: 'start', data: {id: '2', payload: 'small'}};
-
-      (client as any).callQueue.push(largeData, smallData);
+      (client as any).callQueue.push(smallData, bigData);
 
       await (client as any).processBatch();
 
       expect(
         mockTraceServerApi.call.callStartBatchCallUpsertBatchPost
       ).toHaveBeenCalledWith({
         batch: [{mode: 'start', req: smallData.data}],
       });
 
-      expect((client as any).callQueue).toContain(largeData);
+      expect((client as any).callQueue).toContain(bigData);
     });
 
     it('should batch multiple calls together', async () => {
diff --git a/sdks/node/src/weaveClient.ts b/sdks/node/src/weaveClient.ts
index c163d22cfa13..7604495a6be1 100644
--- a/sdks/node/src/weaveClient.ts
+++ b/sdks/node/src/weaveClient.ts
@@ -74,6 +74,9 @@ class CallStack {
 type CallStartParams = StartedCallSchemaForInsert;
 type CallEndParams = EndedCallSchemaForInsert;
 
+// We count characters item by item, and try to limit batches to about this size.
+const MAX_BATCH_SIZE_CHARS = 10 * 1024 * 1024;
+
 export class WeaveClient {
   private stackContext = new AsyncLocalStorage();
   private callQueue: Array<{mode: 'start' | 'end'; data: any}> = [];
@@ -119,28 +122,27 @@ export class WeaveClient {
 
     this.isBatchProcessing = true;
 
-    // We count characters item by item, and try to limit batches to about
-    // this size.
-    const maxBatchSizeChars = 10 * 1024 * 1024;
-
     let batchToProcess = [];
     let currentBatchSize = 0;
 
-    while (this.callQueue.length > 0 && currentBatchSize < maxBatchSizeChars) {
+    while (
+      this.callQueue.length > 0 &&
+      currentBatchSize < MAX_BATCH_SIZE_CHARS
+    ) {
       const item = this.callQueue.shift();
       if (item === undefined) {
         throw new Error('Call queue is empty');
       }
 
       const itemSize = JSON.stringify(item).length;
-      if (itemSize > maxBatchSizeChars) {
+      if (itemSize > MAX_BATCH_SIZE_CHARS) {
         fs.appendFileSync(
           WEAVE_ERRORS_LOG_FNAME,
-          `Item size ${itemSize} exceeds max batch size ${maxBatchSizeChars}. Item: ${JSON.stringify(item)}\n`
+          `Item size ${itemSize} exceeds max batch size ${MAX_BATCH_SIZE_CHARS}. Item: ${JSON.stringify(item)}\n`
         );
       }
 
-      if (currentBatchSize + itemSize <= maxBatchSizeChars) {
+      if (currentBatchSize + itemSize <= MAX_BATCH_SIZE_CHARS) {
         batchToProcess.push(item);
         currentBatchSize += itemSize;
       } else {