1. /**
    
  2.  * Copyright (c) Meta Platforms, Inc. and affiliates.
    
  3.  *
    
  4.  * This source code is licensed under the MIT license found in the
    
  5.  * LICENSE file in the root directory of this source tree.
    
  6.  *
    
  7.  * @flow
    
  8.  */
    
  9. 
    
  10. import type {
    
  11.   Request,
    
  12.   ReactClientValue,
    
  13. } from 'react-server/src/ReactFlightServer';
    
  14. import type {Destination} from 'react-server/src/ReactServerStreamConfigNode';
    
  15. import type {ClientManifest} from './ReactFlightServerConfigWebpackBundler';
    
  16. import type {ServerManifest} from 'react-client/src/ReactFlightClientConfig';
    
  17. import type {Busboy} from 'busboy';
    
  18. import type {Writable} from 'stream';
    
  19. import type {ServerContextJSONValue, Thenable} from 'shared/ReactTypes';
    
  20. 
    
  21. import {
    
  22.   createRequest,
    
  23.   startWork,
    
  24.   startFlowing,
    
  25.   stopFlowing,
    
  26.   abort,
    
  27. } from 'react-server/src/ReactFlightServer';
    
  28. 
    
  29. import {
    
  30.   createResponse,
    
  31.   reportGlobalError,
    
  32.   close,
    
  33.   resolveField,
    
  34.   resolveFileInfo,
    
  35.   resolveFileChunk,
    
  36.   resolveFileComplete,
    
  37.   getRoot,
    
  38. } from 'react-server/src/ReactFlightReplyServer';
    
  39. 
    
  40. import {
    
  41.   decodeAction,
    
  42.   decodeFormState,
    
  43. } from 'react-server/src/ReactFlightActionServer';
    
  44. 
    
  45. export {
    
  46.   registerServerReference,
    
  47.   registerClientReference,
    
  48.   createClientModuleProxy,
    
  49. } from './ReactFlightWebpackReferences';
    
  50. 
    
  51. function createDrainHandler(destination: Destination, request: Request) {
    
  52.   return () => startFlowing(request, destination);
    
  53. }
    
  54. 
    
  55. function createCancelHandler(request: Request, reason: string) {
    
  56.   return () => {
    
  57.     stopFlowing(request);
    
  58.     // eslint-disable-next-line react-internal/prod-error-codes
    
  59.     abort(request, new Error(reason));
    
  60.   };
    
  61. }
    
  62. 
    
  63. type Options = {
    
  64.   onError?: (error: mixed) => void,
    
  65.   onPostpone?: (reason: string) => void,
    
  66.   context?: Array<[string, ServerContextJSONValue]>,
    
  67.   identifierPrefix?: string,
    
  68. };
    
  69. 
    
  70. type PipeableStream = {
    
  71.   abort(reason: mixed): void,
    
  72.   pipe<T: Writable>(destination: T): T,
    
  73. };
    
  74. 
    
  75. function renderToPipeableStream(
    
  76.   model: ReactClientValue,
    
  77.   webpackMap: ClientManifest,
    
  78.   options?: Options,
    
  79. ): PipeableStream {
    
  80.   const request = createRequest(
    
  81.     model,
    
  82.     webpackMap,
    
  83.     options ? options.onError : undefined,
    
  84.     options ? options.context : undefined,
    
  85.     options ? options.identifierPrefix : undefined,
    
  86.     options ? options.onPostpone : undefined,
    
  87.   );
    
  88.   let hasStartedFlowing = false;
    
  89.   startWork(request);
    
  90.   return {
    
  91.     pipe<T: Writable>(destination: T): T {
    
  92.       if (hasStartedFlowing) {
    
  93.         throw new Error(
    
  94.           'React currently only supports piping to one writable stream.',
    
  95.         );
    
  96.       }
    
  97.       hasStartedFlowing = true;
    
  98.       startFlowing(request, destination);
    
  99.       destination.on('drain', createDrainHandler(destination, request));
    
  100.       destination.on(
    
  101.         'error',
    
  102.         createCancelHandler(
    
  103.           request,
    
  104.           'The destination stream errored while writing data.',
    
  105.         ),
    
  106.       );
    
  107.       destination.on(
    
  108.         'close',
    
  109.         createCancelHandler(request, 'The destination stream closed early.'),
    
  110.       );
    
  111.       return destination;
    
  112.     },
    
  113.     abort(reason: mixed) {
    
  114.       abort(request, reason);
    
  115.     },
    
  116.   };
    
  117. }
    
  118. 
    
  119. function decodeReplyFromBusboy<T>(
    
  120.   busboyStream: Busboy,
    
  121.   webpackMap: ServerManifest,
    
  122. ): Thenable<T> {
    
  123.   const response = createResponse(webpackMap, '');
    
  124.   let pendingFiles = 0;
    
  125.   const queuedFields: Array<string> = [];
    
  126.   busboyStream.on('field', (name, value) => {
    
  127.     if (pendingFiles > 0) {
    
  128.       // Because the 'end' event fires two microtasks after the next 'field'
    
  129.       // we would resolve files and fields out of order. To handle this properly
    
  130.       // we queue any fields we receive until the previous file is done.
    
  131.       queuedFields.push(name, value);
    
  132.     } else {
    
  133.       resolveField(response, name, value);
    
  134.     }
    
  135.   });
    
  136.   busboyStream.on('file', (name, value, {filename, encoding, mimeType}) => {
    
  137.     if (encoding.toLowerCase() === 'base64') {
    
  138.       throw new Error(
    
  139.         "React doesn't accept base64 encoded file uploads because we don't expect " +
    
  140.           "form data passed from a browser to ever encode data that way. If that's " +
    
  141.           'the wrong assumption, we can easily fix it.',
    
  142.       );
    
  143.     }
    
  144.     pendingFiles++;
    
  145.     const file = resolveFileInfo(response, name, filename, mimeType);
    
  146.     value.on('data', chunk => {
    
  147.       resolveFileChunk(response, file, chunk);
    
  148.     });
    
  149.     value.on('end', () => {
    
  150.       resolveFileComplete(response, name, file);
    
  151.       pendingFiles--;
    
  152.       if (pendingFiles === 0) {
    
  153.         // Release any queued fields
    
  154.         for (let i = 0; i < queuedFields.length; i += 2) {
    
  155.           resolveField(response, queuedFields[i], queuedFields[i + 1]);
    
  156.         }
    
  157.         queuedFields.length = 0;
    
  158.       }
    
  159.     });
    
  160.   });
    
  161.   busboyStream.on('finish', () => {
    
  162.     close(response);
    
  163.   });
    
  164.   busboyStream.on('error', err => {
    
  165.     reportGlobalError(
    
  166.       response,
    
  167.       // $FlowFixMe[incompatible-call] types Error and mixed are incompatible
    
  168.       err,
    
  169.     );
    
  170.   });
    
  171.   return getRoot(response);
    
  172. }
    
  173. 
    
  174. function decodeReply<T>(
    
  175.   body: string | FormData,
    
  176.   webpackMap: ServerManifest,
    
  177. ): Thenable<T> {
    
  178.   if (typeof body === 'string') {
    
  179.     const form = new FormData();
    
  180.     form.append('0', body);
    
  181.     body = form;
    
  182.   }
    
  183.   const response = createResponse(webpackMap, '', body);
    
  184.   const root = getRoot<T>(response);
    
  185.   close(response);
    
  186.   return root;
    
  187. }
    
  188. 
    
  189. export {
    
  190.   renderToPipeableStream,
    
  191.   decodeReplyFromBusboy,
    
  192.   decodeReply,
    
  193.   decodeAction,
    
  194.   decodeFormState,
    
  195. };