Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: tracing using OpenTelemetry. #2085

Merged
merged 6 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .idea/runConfigurations/System_Test.xml
ehsannas marked this conversation as resolved.
Show resolved Hide resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

106 changes: 59 additions & 47 deletions dev/src/bulk-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ import {StatusCode} from './status-code';
// eslint-disable-next-line no-undef
import GrpcStatus = FirebaseFirestore.GrpcStatus;
import api = google.firestore.v1;
import {
ATTRIBUTE_KEY_DOC_COUNT,
SPAN_NAME_BULK_WRITER_COMMIT,
} from './telemetry/trace-util';

/*!
* The maximum number of writes that can be in a single batch.
Expand Down Expand Up @@ -243,55 +247,63 @@ class BulkCommitBatch extends WriteBatch {
}

async bulkCommit(options: {requestTag?: string} = {}): Promise<void> {
const tag = options?.requestTag ?? requestTag();

// Capture the error stack to preserve stack tracing across async calls.
const stack = Error().stack!;

let response: api.IBatchWriteResponse;
try {
logger(
'BulkCommitBatch.bulkCommit',
tag,
`Sending next batch with ${this._opCount} writes`
);
const retryCodes = getRetryCodes('batchWrite');
response = await this._commit<
api.BatchWriteRequest,
api.BatchWriteResponse
>({retryCodes, methodName: 'batchWrite', requestTag: tag});
} catch (err) {
// Map the failure to each individual write's result.
const ops = Array.from({length: this.pendingOps.length});
response = {
writeResults: ops.map(() => {
return {};
}),
status: ops.map(() => err),
};
}

for (let i = 0; i < (response.writeResults || []).length; ++i) {
// Since delete operations currently do not have write times, use a
// sentinel Timestamp value.
// TODO(b/158502664): Use actual delete timestamp.
const DELETE_TIMESTAMP_SENTINEL = Timestamp.fromMillis(0);

const status = (response.status || [])[i];
if (status.code === StatusCode.OK) {
const updateTime = Timestamp.fromProto(
response.writeResults![i].updateTime || DELETE_TIMESTAMP_SENTINEL
);
this.pendingOps[i].onSuccess(new WriteResult(updateTime));
} else {
const error =
new (require('google-gax/build/src/fallback').GoogleError)(
status.message || undefined
return this._firestore._traceUtil.startActiveSpan(
SPAN_NAME_BULK_WRITER_COMMIT,
async () => {
const tag = options?.requestTag ?? requestTag();

// Capture the error stack to preserve stack tracing across async calls.
const stack = Error().stack!;

let response: api.IBatchWriteResponse;
try {
logger(
'BulkCommitBatch.bulkCommit',
tag,
`Sending next batch with ${this._opCount} writes`
);
error.code = status.code as number;
this.pendingOps[i].onError(wrapError(error, stack));
const retryCodes = getRetryCodes('batchWrite');
response = await this._commit<
api.BatchWriteRequest,
api.BatchWriteResponse
>({retryCodes, methodName: 'batchWrite', requestTag: tag});
} catch (err) {
// Map the failure to each individual write's result.
const ops = Array.from({length: this.pendingOps.length});
response = {
writeResults: ops.map(() => {
return {};
}),
status: ops.map(() => err),
};
}

for (let i = 0; i < (response.writeResults || []).length; ++i) {
// Since delete operations currently do not have write times, use a
// sentinel Timestamp value.
// TODO(b/158502664): Use actual delete timestamp.
const DELETE_TIMESTAMP_SENTINEL = Timestamp.fromMillis(0);

const status = (response.status || [])[i];
if (status.code === StatusCode.OK) {
const updateTime = Timestamp.fromProto(
response.writeResults![i].updateTime || DELETE_TIMESTAMP_SENTINEL
);
this.pendingOps[i].onSuccess(new WriteResult(updateTime));
} else {
const error =
new (require('google-gax/build/src/fallback').GoogleError)(
status.message || undefined
);
error.code = status.code as number;
this.pendingOps[i].onError(wrapError(error, stack));
}
}
},
{
[ATTRIBUTE_KEY_DOC_COUNT]: this._opCount,
}
}
);
}

/**
Expand Down
80 changes: 43 additions & 37 deletions dev/src/collection-group.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {validateInteger} from './validate';
import api = protos.google.firestore.v1;
import {defaultConverter} from './types';
import {compareArrays} from './order';
import {SPAN_NAME_PARTITION_QUERY} from './telemetry/trace-util';

/**
* A `CollectionGroup` refers to all documents that are contained in a
Expand Down Expand Up @@ -81,48 +82,53 @@ export class CollectionGroup<
async *getPartitions(
desiredPartitionCount: number
): AsyncIterable<QueryPartition<AppModelType, DbModelType>> {
validateInteger('desiredPartitionCount', desiredPartitionCount, {
minValue: 1,
});

const tag = requestTag();
await this.firestore.initializeIfNeeded(tag);

const partitions: Array<api.IValue>[] = [];

if (desiredPartitionCount > 1) {
// Partition queries require explicit ordering by __name__.
const queryWithDefaultOrder = this.orderBy(FieldPath.documentId());
const request: api.IPartitionQueryRequest =
queryWithDefaultOrder.toProto();

// Since we are always returning an extra partition (with an empty endBefore
// cursor), we reduce the desired partition count by one.
request.partitionCount = desiredPartitionCount - 1;

const stream = await this.firestore.requestStream(
'partitionQueryStream',
/* bidirectional= */ false,
request,
tag
);
stream.resume();

for await (const currentCursor of stream) {
partitions.push(currentCursor.values ?? []);
await this._firestore._traceUtil.startActiveSpan(
SPAN_NAME_PARTITION_QUERY,
async () => {
validateInteger('desiredPartitionCount', desiredPartitionCount, {
minValue: 1,
});

const tag = requestTag();
await this.firestore.initializeIfNeeded(tag);

if (desiredPartitionCount > 1) {
// Partition queries require explicit ordering by __name__.
const queryWithDefaultOrder = this.orderBy(FieldPath.documentId());
const request: api.IPartitionQueryRequest =
queryWithDefaultOrder.toProto();

// Since we are always returning an extra partition (with an empty endBefore
// cursor), we reduce the desired partition count by one.
request.partitionCount = desiredPartitionCount - 1;

const stream = await this.firestore.requestStream(
'partitionQueryStream',
/* bidirectional= */ false,
request,
tag
);
stream.resume();

for await (const currentCursor of stream) {
partitions.push(currentCursor.values ?? []);
}
}

logger(
'Firestore.getPartitions',
tag,
'Received %d partitions',
partitions.length
);

// Sort the partitions as they may not be ordered if responses are paged.
partitions.sort((l, r) => compareArrays(l, r));
}
}

logger(
'Firestore.getPartitions',
tag,
'Received %d partitions',
partitions.length
);

// Sort the partitions as they may not be ordered if responses are paged.
partitions.sort((l, r) => compareArrays(l, r));

for (let i = 0; i < partitions.length; ++i) {
milaGGL marked this conversation as resolved.
Show resolved Hide resolved
yield new QueryPartition(
this._firestore,
Expand Down
Loading
Loading