Skip to content

Commit

Permalink
Remove enableFlightReadableStream (#31766)
Browse files Browse the repository at this point in the history
Base: #31765

Landed everywhere
  • Loading branch information
rickhanlonii authored Dec 13, 2024
1 parent 4996a8f commit 1520802
Show file tree
Hide file tree
Showing 17 changed files with 96 additions and 163 deletions.
93 changes: 37 additions & 56 deletions packages/react-client/src/ReactFlightClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import type {TemporaryReferenceSet} from './ReactFlightTemporaryReferences';

import {
enablePostpone,
enableFlightReadableStream,
enableOwnerStacks,
enableServerComponentLogs,
enableProfilerTimer,
Expand Down Expand Up @@ -406,14 +405,12 @@ function wakeChunkIfInitialized<T>(

function triggerErrorOnChunk<T>(chunk: SomeChunk<T>, error: mixed): void {
if (chunk.status !== PENDING && chunk.status !== BLOCKED) {
if (enableFlightReadableStream) {
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
// $FlowFixMe[incompatible-call]: The error method should accept mixed.
controller.error(error);
}
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
// $FlowFixMe[incompatible-call]: The error method should accept mixed.
controller.error(error);
return;
}
const listeners = chunk.reason;
Expand Down Expand Up @@ -512,13 +509,11 @@ function resolveModelChunk<T>(
value: UninitializedModel,
): void {
if (chunk.status !== PENDING) {
if (enableFlightReadableStream) {
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
controller.enqueueModel(value);
}
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
controller.enqueueModel(value);
return;
}
const resolveListeners = chunk.value;
Expand Down Expand Up @@ -1718,16 +1713,14 @@ function resolveModel(

function resolveText(response: Response, id: number, text: string): void {
const chunks = response._chunks;
if (enableFlightReadableStream) {
const chunk = chunks.get(id);
if (chunk && chunk.status !== PENDING) {
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
controller.enqueueValue(text);
return;
}
const chunk = chunks.get(id);
if (chunk && chunk.status !== PENDING) {
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
controller.enqueueValue(text);
return;
}
chunks.set(id, createInitializedTextChunk(response, text));
}
Expand All @@ -1738,16 +1731,14 @@ function resolveBuffer(
buffer: $ArrayBufferView | ArrayBuffer,
): void {
const chunks = response._chunks;
if (enableFlightReadableStream) {
const chunk = chunks.get(id);
if (chunk && chunk.status !== PENDING) {
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
controller.enqueueValue(buffer);
return;
}
const chunk = chunks.get(id);
if (chunk && chunk.status !== PENDING) {
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
controller.enqueueValue(buffer);
return;
}
chunks.set(id, createInitializedBufferChunk(response, buffer));
}
Expand Down Expand Up @@ -2967,38 +2958,28 @@ function processFullStringRow(
);
}
case 82 /* "R" */: {
if (enableFlightReadableStream) {
startReadableStream(response, id, undefined);
return;
}
startReadableStream(response, id, undefined);
return;
}
// Fallthrough
case 114 /* "r" */: {
if (enableFlightReadableStream) {
startReadableStream(response, id, 'bytes');
return;
}
startReadableStream(response, id, 'bytes');
return;
}
// Fallthrough
case 88 /* "X" */: {
if (enableFlightReadableStream) {
startAsyncIterable(response, id, false);
return;
}
startAsyncIterable(response, id, false);
return;
}
// Fallthrough
case 120 /* "x" */: {
if (enableFlightReadableStream) {
startAsyncIterable(response, id, true);
return;
}
startAsyncIterable(response, id, true);
return;
}
// Fallthrough
case 67 /* "C" */: {
if (enableFlightReadableStream) {
stopStream(response, id, row);
return;
}
stopStream(response, id, row);
return;
}
// Fallthrough
case 80 /* "P" */: {
Expand Down
37 changes: 16 additions & 21 deletions packages/react-client/src/ReactFlightReplyClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ import type {
import type {LazyComponent} from 'react/src/ReactLazy';
import type {TemporaryReferenceSet} from './ReactFlightTemporaryReferences';

import {
enableRenderableContext,
enableFlightReadableStream,
} from 'shared/ReactFeatureFlags';
import {enableRenderableContext} from 'shared/ReactFeatureFlags';

import {
REACT_ELEMENT_TYPE,
Expand Down Expand Up @@ -663,23 +660,21 @@ export function processReply(
return Array.from((iterator: any));
}

if (enableFlightReadableStream) {
// TODO: ReadableStream is not available in old Node. Remove the typeof check later.
if (
typeof ReadableStream === 'function' &&
value instanceof ReadableStream
) {
return serializeReadableStream(value);
}
const getAsyncIterator: void | (() => $AsyncIterator<any, any, any>) =
(value: any)[ASYNC_ITERATOR];
if (typeof getAsyncIterator === 'function') {
// We treat AsyncIterables as a Fragment and as such we might need to key them.
return serializeAsyncIterable(
(value: any),
getAsyncIterator.call((value: any)),
);
}
// TODO: ReadableStream is not available in old Node. Remove the typeof check later.
if (
typeof ReadableStream === 'function' &&
value instanceof ReadableStream
) {
return serializeReadableStream(value);
}
const getAsyncIterator: void | (() => $AsyncIterator<any, any, any>) =
(value: any)[ASYNC_ITERATOR];
if (typeof getAsyncIterator === 'function') {
// We treat AsyncIterables as a Fragment and as such we might need to key them.
return serializeAsyncIterable(
(value: any),
getAsyncIterator.call((value: any)),
);
}

// Verify that this is a simple plain object.
Expand Down
11 changes: 5 additions & 6 deletions packages/react-client/src/__tests__/ReactFlight-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2623,7 +2623,7 @@ describe('ReactFlight', () => {
);
});

// @gate enableFlightReadableStream && enableAsyncIterableChildren
// @gate enableAsyncIterableChildren
it('shares state when moving keyed Server Components that render async iterables', async () => {
function StatefulClient({name, initial}) {
const [state] = React.useState(initial);
Expand Down Expand Up @@ -2814,7 +2814,7 @@ describe('ReactFlight', () => {
);
});

// @gate enableFlightReadableStream && enableAsyncIterableChildren
// @gate enableAsyncIterableChildren
it('preserves debug info for server-to-server pass through of async iterables', async () => {
let resolve;
const iteratorPromise = new Promise(r => (resolve = r));
Expand Down Expand Up @@ -2849,10 +2849,9 @@ describe('ReactFlight', () => {
},
);

if (gate(flag => flag.enableFlightReadableStream)) {
// Wait for the iterator to finish
await iteratorPromise;
}
// Wait for the iterator to finish
await iteratorPromise;

await 0; // One more tick for the return value / closing.

const transport = ReactNoopFlightServer.render(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2079,7 +2079,6 @@ describe('ReactFlightDOMBrowser', () => {
});
}

// @gate enableFlightReadableStream
it('should supports streaming ReadableStream with objects', async () => {
const errors = [];
let controller1;
Expand Down Expand Up @@ -2161,7 +2160,6 @@ describe('ReactFlightDOMBrowser', () => {
expect(errors).toEqual(['rejected']);
});

// @gate enableFlightReadableStream
it('should cancels the underlying ReadableStream when we are cancelled', async () => {
let controller;
let cancelReason;
Expand Down Expand Up @@ -2194,7 +2192,6 @@ describe('ReactFlightDOMBrowser', () => {
expect(loggedReason).toBe(reason);
});

// @gate enableFlightReadableStream
it('should cancels the underlying ReadableStream when we abort', async () => {
const errors = [];
let controller;
Expand Down Expand Up @@ -2252,7 +2249,6 @@ describe('ReactFlightDOMBrowser', () => {
expect(errors).toEqual([reason]);
});

// @gate enableFlightReadableStream
it('should supports streaming AsyncIterables with objects', async () => {
let resolve;
const wait = new Promise(r => (resolve = r));
Expand Down Expand Up @@ -2369,7 +2365,6 @@ describe('ReactFlightDOMBrowser', () => {
);
});

// @gate enableFlightReadableStream
it('should cancels the underlying AsyncIterable when we are cancelled', async () => {
let resolve;
const wait = new Promise(r => (resolve = r));
Expand Down Expand Up @@ -2408,7 +2403,6 @@ describe('ReactFlightDOMBrowser', () => {
expect(loggedReason).toBe(reason);
});

// @gate enableFlightReadableStream
it('should cancels the underlying AsyncIterable when we abort', async () => {
const errors = [];
const abortController = new AbortController();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,6 @@ describe('ReactFlightDOMEdge', () => {
expect(result.get('value')).toBe('hello');
});

// @gate enableFlightReadableStream
it('can pass an async import to a ReadableStream while enqueuing in order', async () => {
let resolve;
const promise = new Promise(r => (resolve = r));
Expand Down Expand Up @@ -690,7 +689,6 @@ describe('ReactFlightDOMEdge', () => {
expect(await reader.read()).toEqual({value: undefined, done: true});
});

// @gate enableFlightReadableStream
it('can pass an async import a AsyncIterable while allowing peaking at future values', async () => {
let resolve;
const promise = new Promise(r => (resolve = r));
Expand Down Expand Up @@ -742,7 +740,6 @@ describe('ReactFlightDOMEdge', () => {
expect(await iterator.next()).toEqual({value: undefined, done: true});
});

// @gate enableFlightReadableStream
it('should ideally dedupe objects inside async iterables but does not yet', async () => {
const obj = {
this: {is: 'a large objected'},
Expand Down Expand Up @@ -820,7 +817,6 @@ describe('ReactFlightDOMEdge', () => {
);
});

// @gate enableFlightReadableStream
it('should supports ReadableStreams with typed arrays', async () => {
const buffer = new Uint8Array([
123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
Expand Down Expand Up @@ -879,7 +875,6 @@ describe('ReactFlightDOMEdge', () => {
expect(streamedBuffers).toEqual(buffers);
});

// @gate enableFlightReadableStream
it('should support BYOB binary ReadableStreams', async () => {
const buffer = new Uint8Array([
123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ describe('ReactFlightDOMNode', () => {
);
});

// @gate enableFlightReadableStream
it('should cancels the underlying ReadableStream when we are cancelled', async () => {
let controller;
let cancelReason;
Expand Down Expand Up @@ -336,7 +335,6 @@ describe('ReactFlightDOMNode', () => {
);
});

// @gate enableFlightReadableStream
it('should cancels the underlying ReadableStream when we abort', async () => {
const errors = [];
let controller;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,6 @@ describe('ReactFlightDOMReply', () => {
expect(response.obj).toBe(obj);
});

// @gate enableFlightReadableStream
it('should supports streaming ReadableStream with objects', async () => {
let controller1;
let controller2;
Expand Down Expand Up @@ -511,7 +510,6 @@ describe('ReactFlightDOMReply', () => {
});
});

// @gate enableFlightReadableStream
it('should supports streaming AsyncIterables with objects', async () => {
let resolve;
const wait = new Promise(r => (resolve = r));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ describe('ReactFlightDOMReplyEdge', () => {
expect(await resultBlob.arrayBuffer()).toEqual(await blob.arrayBuffer());
});

// @gate enableFlightReadableStream
it('should supports ReadableStreams with typed arrays', async () => {
const buffer = new Uint8Array([
123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
Expand Down Expand Up @@ -194,7 +193,6 @@ describe('ReactFlightDOMReplyEdge', () => {
expect(streamedBuffers).toEqual(buffers);
});

// @gate enableFlightReadableStream
it('should support BYOB binary ReadableStreams', async () => {
const buffer = new Uint8Array([
123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
Expand Down
Loading

0 comments on commit 1520802

Please sign in to comment.