diff --git a/react_on_rails_pro/lib/react_on_rails_pro/request.rb b/react_on_rails_pro/lib/react_on_rails_pro/request.rb index de5ab15658..59bffd4edd 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/request.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/request.rb @@ -28,7 +28,12 @@ def render_code_as_stream(path, js_code, is_rsc_payload:) end ReactOnRailsPro::StreamRequest.create do |send_bundle| - form = form_with_code(js_code, send_bundle) + if send_bundle + Rails.logger.info { "[ReactOnRailsPro] Sending bundle to the node renderer" } + upload_assets + end + + form = form_with_code(js_code, false) perform_request(path, form: form, stream: true) end end diff --git a/react_on_rails_pro/lib/react_on_rails_pro/server_rendering_js_code.rb b/react_on_rails_pro/lib/react_on_rails_pro/server_rendering_js_code.rb index 806bbcfd1a..cb37adabe1 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/server_rendering_js_code.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/server_rendering_js_code.rb @@ -36,6 +36,7 @@ def generate_rsc_payload_js_function(render_options) renderingRequest, rscBundleHash: '#{ReactOnRailsPro::Utils.rsc_bundle_hash}', } + const runOnOtherBundle = globalThis.runOnOtherBundle; if (typeof generateRSCPayload !== 'function') { globalThis.generateRSCPayload = function generateRSCPayload(componentName, props, railsContext) { const { renderingRequest, rscBundleHash } = railsContext.serverSideRSCPayloadParameters; diff --git a/react_on_rails_pro/packages/node-renderer/src/shared/utils.ts b/react_on_rails_pro/packages/node-renderer/src/shared/utils.ts index 4f6babc05f..117a84c9e0 100644 --- a/react_on_rails_pro/packages/node-renderer/src/shared/utils.ts +++ b/react_on_rails_pro/packages/node-renderer/src/shared/utils.ts @@ -8,6 +8,7 @@ import * as errorReporter from './errorReporter'; import { getConfig } from './configBuilder'; import log from './log'; import type { RenderResult } from '../worker/vm'; +import fileExistsAsync from 
'./fileExistsAsync'; export const TRUNCATION_FILLER = '\n... TRUNCATED ...\n'; @@ -168,3 +169,29 @@ export function getAssetPath(bundleTimestamp: string | number, filename: string) const bundleDirectory = getBundleDirectory(bundleTimestamp); return path.join(bundleDirectory, filename); } + +export async function validateBundlesExist( + bundleTimestamp: string | number, + dependencyBundleTimestamps?: (string | number)[], +): Promise { + const missingBundles = ( + await Promise.all( + [...(dependencyBundleTimestamps ?? []), bundleTimestamp].map(async (timestamp) => { + const bundleFilePath = getRequestBundleFilePath(timestamp); + const fileExists = await fileExistsAsync(bundleFilePath); + return fileExists ? null : timestamp; + }), + ) + ).filter((timestamp) => timestamp !== null); + + if (missingBundles.length > 0) { + const missingBundlesText = missingBundles.length > 1 ? 'bundles' : 'bundle'; + log.info(`No saved ${missingBundlesText}: ${missingBundles.join(', ')}`); + return { + headers: { 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate' }, + status: 410, + data: 'No bundle uploaded', + }; + } + return null; +} diff --git a/react_on_rails_pro/packages/node-renderer/src/worker.ts b/react_on_rails_pro/packages/node-renderer/src/worker.ts index 3a7edb65a4..d157c4c764 100644 --- a/react_on_rails_pro/packages/node-renderer/src/worker.ts +++ b/react_on_rails_pro/packages/node-renderer/src/worker.ts @@ -13,10 +13,20 @@ import log, { sharedLoggerOptions } from './shared/log'; import packageJson from './shared/packageJson'; import { buildConfig, Config, getConfig } from './shared/configBuilder'; import fileExistsAsync from './shared/fileExistsAsync'; -import type { FastifyInstance, FastifyReply, FastifyRequest } from './worker/types'; -import checkProtocolVersion from './worker/checkProtocolVersionHandler'; -import authenticate from './worker/authHandler'; -import { handleRenderRequest, type ProvidedNewBundle } from './worker/handleRenderRequest'; +import 
type { FastifyInstance, FastifyReply } from './worker/types'; +import { performRequestPrechecks } from './worker/requestPrechecks'; +import { AuthBody, authenticate } from './worker/authHandler'; +import { + handleRenderRequest, + type ProvidedNewBundle, + handleNewBundlesProvided, +} from './worker/handleRenderRequest'; +import { + handleIncrementalRenderRequest, + type IncrementalRenderInitialRequest, + type IncrementalRenderSink, +} from './worker/handleIncrementalRenderRequest'; +import { handleIncrementalRenderStream } from './worker/handleIncrementalRenderStream'; import { errorResponseResult, formatExceptionMessage, @@ -160,41 +170,11 @@ export default function run(config: Partial) { }, }); - const isProtocolVersionMatch = async (req: FastifyRequest, res: FastifyReply) => { - // Check protocol version - const protocolVersionCheckingResult = checkProtocolVersion(req); - - if (typeof protocolVersionCheckingResult === 'object') { - await setResponse(protocolVersionCheckingResult, res); - return false; - } - - return true; - }; - - const isAuthenticated = async (req: FastifyRequest, res: FastifyReply) => { - // Authenticate Ruby client - const authResult = authenticate(req); - - if (typeof authResult === 'object') { - await setResponse(authResult, res); - return false; - } - - return true; - }; - - const requestPrechecks = async (req: FastifyRequest, res: FastifyReply) => { - if (!(await isProtocolVersionMatch(req, res))) { - return false; - } - - if (!(await isAuthenticated(req, res))) { - return false; - } - - return true; - }; + // Ensure NDJSON bodies are not buffered and are available as a stream immediately + app.addContentTypeParser('application/x-ndjson', (req, payload, done) => { + // Pass through the raw stream; the route will consume req.raw + done(null, payload); + }); // See https://github.com/shakacode/react_on_rails_pro/issues/119 for why // the digest is part of the request URL. 
Yes, it's not used here, but the @@ -209,7 +189,9 @@ export default function run(config: Partial) { // Can't infer from the route like Express can Params: { bundleTimestamp: string; renderRequestDigest: string }; }>('/bundles/:bundleTimestamp/render/:renderRequestDigest', async (req, res) => { - if (!(await requestPrechecks(req, res))) { + const precheckResult = performRequestPrechecks(req.body); + if (precheckResult) { + await setResponse(precheckResult, res); return; } @@ -251,7 +233,7 @@ export default function run(config: Partial) { providedNewBundles, assetsToCopy, }); - await setResponse(result, res); + await setResponse(result.response, res); } catch (err) { const exceptionMessage = formatExceptionMessage( renderingRequest, @@ -269,17 +251,124 @@ export default function run(config: Partial) { } }); + // Streaming NDJSON incremental render endpoint + app.post<{ + Params: { bundleTimestamp: string; renderRequestDigest: string }; + }>('/bundles/:bundleTimestamp/incremental-render/:renderRequestDigest', async (req, res) => { + const { bundleTimestamp } = req.params; + + // Stream parser state + let incrementalSink: IncrementalRenderSink | undefined; + + try { + // Handle the incremental render stream + await handleIncrementalRenderStream({ + request: req, + onRenderRequestReceived: async (obj: unknown) => { + // Build a temporary FastifyRequest shape for protocol/auth check + const tempReqBody = typeof obj === 'object' && obj !== null ? 
(obj as Record) : {}; + + // Perform request prechecks + const precheckResult = performRequestPrechecks(tempReqBody); + if (precheckResult) { + return { + response: precheckResult, + shouldContinue: false, + }; + } + + // Extract data for incremental render request + const dependencyBundleTimestamps = extractBodyArrayField( + tempReqBody as WithBodyArrayField, 'dependencyBundleTimestamps'>, + 'dependencyBundleTimestamps', + ); + + const initial: IncrementalRenderInitialRequest = { + renderingRequest: String((tempReqBody as { renderingRequest?: string }).renderingRequest ?? ''), + bundleTimestamp, + dependencyBundleTimestamps, + }; + + try { + const { response, sink } = await handleIncrementalRenderRequest(initial); + incrementalSink = sink; + + return { + response, + shouldContinue: !!incrementalSink, + }; + } catch (err) { + const errorResponse = errorResponseResult( + formatExceptionMessage( + 'IncrementalRender', + err, + 'Error while handling incremental render request', + ), + ); + return { + response: errorResponse, + shouldContinue: false, + }; + } + }, + + onUpdateReceived: (obj: unknown) => { + if (!incrementalSink) { + log.error({ msg: 'Unexpected update chunk received after rendering was aborted', obj }); + return; + } + + try { + incrementalSink.add(obj); + } catch (err) { + // Log error but don't stop processing + log.error({ err, msg: 'Error processing update chunk' }); + } + }, + + onResponseStart: async (response: ResponseResult) => { + await setResponse(response, res); + }, + + onRequestEnded: () => { + // Do nothing + }, + }); + } catch (err) { + // If an error occurred during stream processing, send error response + const errorResponse = errorResponseResult( + formatExceptionMessage('IncrementalRender', err, 'Error while processing incremental render stream'), + ); + await setResponse(errorResponse, res); + } + }); + // There can be additional files that might be required at the runtime. 
// Since the remote renderer doesn't contain any assets, they must be uploaded manually. app.post<{ Body: WithBodyArrayField, 'targetBundles'>; }>('/upload-assets', async (req, res) => { - if (!(await requestPrechecks(req, res))) { + const precheckResult = performRequestPrechecks(req.body); + if (precheckResult) { + await setResponse(precheckResult, res); return; } let lockAcquired = false; let lockfileName: string | undefined; - const assets: Asset[] = Object.values(req.body).filter(isAsset); + const assets: Asset[] = []; + + // Extract bundles that start with 'bundle_' prefix + const bundles: Array<{ timestamp: string; bundle: Asset }> = []; + Object.entries(req.body).forEach(([key, value]) => { + if (isAsset(value)) { + if (key.startsWith('bundle_')) { + const timestamp = key.replace('bundle_', ''); + bundles.push({ timestamp, bundle: value }); + } else { + assets.push(value); + } + } + }); // Handle targetBundles as either a string or an array const targetBundles = extractBodyArrayField(req.body, 'targetBundles'); @@ -291,7 +380,9 @@ export default function run(config: Partial) { } const assetsDescription = JSON.stringify(assets.map((asset) => asset.filename)); - const taskDescription = `Uploading files ${assetsDescription} to bundle directories: ${targetBundles.join(', ')}`; + const bundlesDescription = + bundles.length > 0 ? 
` and bundles ${JSON.stringify(bundles.map((b) => b.bundle.filename))}` : ''; + const taskDescription = `Uploading files ${assetsDescription}${bundlesDescription} to bundle directories: ${targetBundles.join(', ')}`; try { const { lockfileName: name, wasLockAcquired, errorMessage } = await lock('transferring-assets'); @@ -330,7 +421,24 @@ export default function run(config: Partial) { await Promise.all(assetCopyPromises); - // Delete assets from uploads directory + // Handle bundles using the existing logic from handleRenderRequest + if (bundles.length > 0) { + const providedNewBundles = bundles.map(({ timestamp, bundle }) => ({ + timestamp, + bundle, + })); + + // Use the existing bundle handling logic + // Note: handleNewBundlesProvided will handle deleting the uploaded bundle files + // Pass null for assetsToCopy since we handle assets separately in this endpoint + const bundleResult = await handleNewBundlesProvided('upload-assets', providedNewBundles, null); + if (bundleResult) { + await setResponse(bundleResult, res); + return; + } + } + + // Delete assets from uploads directory (bundles are already handled by handleNewBundlesProvided) await deleteUploadedAssets(assets); await setResponse( @@ -341,7 +449,7 @@ export default function run(config: Partial) { res, ); } catch (err) { - const msg = 'ERROR when trying to copy assets'; + const msg = 'ERROR when trying to copy assets and bundles'; const message = `${msg}. ${err}. 
Task: ${taskDescription}`; log.error({ msg, @@ -373,7 +481,9 @@ export default function run(config: Partial) { Querystring: { filename: string }; Body: WithBodyArrayField, 'targetBundles'>; }>('/asset-exists', async (req, res) => { - if (!(await isAuthenticated(req, res))) { + const authResult = authenticate(req.body as AuthBody); + if (authResult) { + await setResponse(authResult, res); return; } diff --git a/react_on_rails_pro/packages/node-renderer/src/worker/authHandler.ts b/react_on_rails_pro/packages/node-renderer/src/worker/authHandler.ts index b8f39d1e37..6c675136c7 100644 --- a/react_on_rails_pro/packages/node-renderer/src/worker/authHandler.ts +++ b/react_on_rails_pro/packages/node-renderer/src/worker/authHandler.ts @@ -6,13 +6,16 @@ */ // TODO: Replace with fastify-basic-auth per https://github.com/shakacode/react_on_rails_pro/issues/110 -import type { FastifyRequest } from './types'; import { getConfig } from '../shared/configBuilder'; -export = function authenticate(req: FastifyRequest) { +export interface AuthBody { + password?: string; +} + +export function authenticate(body: AuthBody) { const { password } = getConfig(); - if (password && password !== (req.body as { password?: string }).password) { + if (password && password !== body.password) { return { headers: { 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate' }, status: 401, @@ -21,4 +24,4 @@ export = function authenticate(req: FastifyRequest) { } return undefined; -}; +} diff --git a/react_on_rails_pro/packages/node-renderer/src/worker/checkProtocolVersionHandler.ts b/react_on_rails_pro/packages/node-renderer/src/worker/checkProtocolVersionHandler.ts index b1f0f3b3ca..65bbc77533 100644 --- a/react_on_rails_pro/packages/node-renderer/src/worker/checkProtocolVersionHandler.ts +++ b/react_on_rails_pro/packages/node-renderer/src/worker/checkProtocolVersionHandler.ts @@ -2,11 +2,14 @@ * Logic for checking protocol version. 
* @module worker/checkProtocVersionHandler */ -import type { FastifyRequest } from './types'; import packageJson from '../shared/packageJson'; -export = function checkProtocolVersion(req: FastifyRequest) { - const reqProtocolVersion = (req.body as { protocolVersion?: string }).protocolVersion; +export interface ProtocolVersionBody { + protocolVersion?: string; +} + +export function checkProtocolVersion(body: ProtocolVersionBody) { + const reqProtocolVersion = body.protocolVersion; if (reqProtocolVersion !== packageJson.protocolVersion) { return { headers: { 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate' }, @@ -14,11 +17,11 @@ export = function checkProtocolVersion(req: FastifyRequest) { data: `Unsupported renderer protocol version ${ reqProtocolVersion ? `request protocol ${reqProtocolVersion}` - : `MISSING with body ${JSON.stringify(req.body)}` + : `MISSING with body ${JSON.stringify(body)}` } does not match installed renderer protocol ${packageJson.protocolVersion} for version ${packageJson.version}. 
Update either the renderer or the Rails server`, }; } return undefined; -}; +} diff --git a/react_on_rails_pro/packages/node-renderer/src/worker/handleIncrementalRenderRequest.ts b/react_on_rails_pro/packages/node-renderer/src/worker/handleIncrementalRenderRequest.ts new file mode 100644 index 0000000000..c15f85fbff --- /dev/null +++ b/react_on_rails_pro/packages/node-renderer/src/worker/handleIncrementalRenderRequest.ts @@ -0,0 +1,98 @@ +import type { ResponseResult } from '../shared/utils'; +import { handleRenderRequest } from './handleRenderRequest'; +import log from '../shared/log'; +import { getRequestBundleFilePath } from '../shared/utils'; + +export type IncrementalRenderSink = { + /** Called for every subsequent NDJSON object after the first one */ + add: (chunk: unknown) => void; +}; + +export type UpdateChunk = { + bundleTimestamp: string | number; + updateChunk: string; +}; + +function assertIsUpdateChunk(value: unknown): asserts value is UpdateChunk { + if ( + typeof value !== 'object' || + value === null || + !('bundleTimestamp' in value) || + !('updateChunk' in value) || + (typeof value.bundleTimestamp !== 'string' && typeof value.bundleTimestamp !== 'number') || + typeof value.updateChunk !== 'string' + ) { + throw new Error('Invalid incremental render chunk received, missing properties'); + } +} + +export type IncrementalRenderInitialRequest = { + renderingRequest: string; + bundleTimestamp: string | number; + dependencyBundleTimestamps?: string[] | number[]; +}; + +export type IncrementalRenderResult = { + response: ResponseResult; + sink?: IncrementalRenderSink; +}; + +/** + * Starts handling an incremental render request. 
This function: + * - Calls handleRenderRequest internally to handle all validation and VM execution + * - Returns the result from handleRenderRequest directly + * - Provides a sink for future incremental updates (to be implemented in next commit) + */ +export async function handleIncrementalRenderRequest( + initial: IncrementalRenderInitialRequest, +): Promise { + const { renderingRequest, bundleTimestamp, dependencyBundleTimestamps } = initial; + + try { + // Call handleRenderRequest internally to handle all validation and VM execution + const { response, executionContext } = await handleRenderRequest({ + renderingRequest, + bundleTimestamp, + dependencyBundleTimestamps, + providedNewBundles: undefined, + assetsToCopy: undefined, + }); + + // If we don't get an execution context, it means there was an early error + // (e.g. bundle not found). In this case, the sink will be a no-op. + if (!executionContext) { + return { response }; + } + + // Return the result with a sink that uses the execution context + return { + response, + sink: { + add: (chunk: unknown) => { + try { + assertIsUpdateChunk(chunk); + const bundlePath = getRequestBundleFilePath(chunk.bundleTimestamp); + executionContext.runInVM(chunk.updateChunk, bundlePath).catch((err: unknown) => { + log.error({ msg: 'Error running incremental render chunk', err, chunk }); + }); + } catch (err) { + log.error({ msg: 'Invalid incremental render chunk', err, chunk }); + } + }, + }, + }; + } catch (error) { + // Handle any unexpected errors + const errorMessage = error instanceof Error ? 
error.message : String(error); + + return { + response: { + status: 500, + headers: { 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate' }, + data: errorMessage, + }, + }; + } +} + +export type { ResponseResult }; diff --git a/react_on_rails_pro/packages/node-renderer/src/worker/handleIncrementalRenderStream.ts b/react_on_rails_pro/packages/node-renderer/src/worker/handleIncrementalRenderStream.ts new file mode 100644 index 0000000000..7882210118 --- /dev/null +++ b/react_on_rails_pro/packages/node-renderer/src/worker/handleIncrementalRenderStream.ts @@ -0,0 +1,114 @@ +import { StringDecoder } from 'string_decoder'; +import type { ResponseResult } from '../shared/utils'; +import * as errorReporter from '../shared/errorReporter'; + +/** + * Result interface for render request callbacks + */ +export interface RenderRequestResult { + response: ResponseResult; + shouldContinue: boolean; +} + +/** + * Options interface for incremental render stream handler + */ +export interface IncrementalRenderStreamHandlerOptions { + request: { + raw: NodeJS.ReadableStream | { [Symbol.asyncIterator](): AsyncIterator }; + }; + onRenderRequestReceived: (renderRequest: unknown) => Promise | RenderRequestResult; + onResponseStart: (response: ResponseResult) => Promise | void; + onUpdateReceived: (updateData: unknown) => Promise | void; + onRequestEnded: () => Promise | void; +} + +/** + * Handles incremental rendering requests with streaming JSON data. + * The first object triggers rendering, subsequent objects provide incremental updates. 
+ */ +export async function handleIncrementalRenderStream( + options: IncrementalRenderStreamHandlerOptions, +): Promise { + const { request, onRenderRequestReceived, onResponseStart, onUpdateReceived, onRequestEnded } = options; + + let hasReceivedFirstObject = false; + const decoder = new StringDecoder('utf8'); + let buffer = ''; + + try { + for await (const chunk of request.raw) { + const str = decoder.write(chunk); + buffer += str; + + // Process all complete JSON objects in the buffer + let boundary = buffer.indexOf('\n'); + while (boundary !== -1) { + const rawObject = buffer.slice(0, boundary).trim(); + buffer = buffer.slice(boundary + 1); + boundary = buffer.indexOf('\n'); + + if (rawObject) { + let parsed: unknown; + try { + parsed = JSON.parse(rawObject); + } catch (err) { + const errorMessage = `Invalid JSON chunk: ${err instanceof Error ? err.message : String(err)}`; + + if (!hasReceivedFirstObject) { + // Error in first chunk - throw error to stop processing + throw new Error(errorMessage); + } else { + // Error in subsequent chunks - log and report but continue processing + const reportedMessage = `JSON parsing error in update chunk: ${err instanceof Error ? err.message : String(err)}`; + console.error(reportedMessage); + errorReporter.message(reportedMessage); + // Skip this malformed chunk and continue with next ones + // eslint-disable-next-line no-continue + continue; + } + } + + if (!hasReceivedFirstObject) { + hasReceivedFirstObject = true; + try { + // eslint-disable-next-line no-await-in-loop + const result = await onRenderRequestReceived(parsed); + const { response, shouldContinue: continueFlag } = result; + + void onResponseStart(response); + + if (!continueFlag) { + return; + } + } catch (err) { + // Error in first chunk processing - throw error to stop processing + const error = err instanceof Error ? 
err : new Error(String(err)); + error.message = `Error processing initial render request: ${error.message}`; + throw error; + } + } else { + try { + // eslint-disable-next-line no-await-in-loop + await onUpdateReceived(parsed); + } catch (err) { + // Error in update chunk processing - log and report but continue processing + const errorMessage = `Error processing update chunk: ${err instanceof Error ? err.message : String(err)}`; + console.error(errorMessage); + errorReporter.message(errorMessage); + // Continue processing other chunks + } + } + } + } + } + } catch (err) { + const error = err instanceof Error ? err : new Error(String(err)); + // Update the error message in place to retain the original stack trace, rather than creating a new error object + error.message = `Error while handling the request stream: ${error.message}`; + throw error; + } + + // Stream ended normally + void onRequestEnded(); +} diff --git a/react_on_rails_pro/packages/node-renderer/src/worker/handleRenderRequest.ts b/react_on_rails_pro/packages/node-renderer/src/worker/handleRenderRequest.ts index 3d05d5a408..64a50cb976 100644 --- a/react_on_rails_pro/packages/node-renderer/src/worker/handleRenderRequest.ts +++ b/react_on_rails_pro/packages/node-renderer/src/worker/handleRenderRequest.ts @@ -23,10 +23,11 @@ import { isErrorRenderResult, getRequestBundleFilePath, deleteUploadedAssets, + validateBundlesExist, } from '../shared/utils'; import { getConfig } from '../shared/configBuilder'; import * as errorReporter from '../shared/errorReporter'; -import { buildVM, hasVMContextForBundle, runInVM } from './vm'; +import { buildExecutionContext, ExecutionContext, VMContextNotFoundError } from './vm'; export type ProvidedNewBundle = { timestamp: string | number; @@ -36,9 +37,10 @@ export type ProvidedNewBundle = { async function prepareResult( renderingRequest: string, bundleFilePathPerTimestamp: string, + executionContext: ExecutionContext, ): Promise { try { - const result = await 
runInVM(renderingRequest, bundleFilePathPerTimestamp, cluster); + const result = await executionContext.runInVM(renderingRequest, bundleFilePathPerTimestamp, cluster); let exceptionMessage = null; if (!result) { @@ -153,7 +155,7 @@ to ${bundleFilePathPerTimestamp})`, } } -async function handleNewBundlesProvided( +export async function handleNewBundlesProvided( renderingRequest: string, providedNewBundles: ProvidedNewBundle[], assetsToCopy: Asset[] | null | undefined, @@ -190,7 +192,7 @@ export async function handleRenderRequest({ dependencyBundleTimestamps?: string[] | number[]; providedNewBundles?: ProvidedNewBundle[] | null; assetsToCopy?: Asset[] | null; -}): Promise { +}): Promise<{ response: ResponseResult; executionContext?: ExecutionContext }> { try { // const bundleFilePathPerTimestamp = getRequestBundleFilePath(bundleTimestamp); const allBundleFilePaths = Array.from( @@ -202,52 +204,54 @@ export async function handleRenderRequest({ if (allBundleFilePaths.length > maxVMPoolSize) { return { - headers: { 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate' }, - status: 410, - data: `Too many bundles uploaded. The maximum allowed is ${maxVMPoolSize}. Please reduce the number of bundles or increase maxVMPoolSize in your configuration.`, + response: { + headers: { 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate' }, + status: 410, + data: `Too many bundles uploaded. The maximum allowed is ${maxVMPoolSize}. 
Please reduce the number of bundles or increase maxVMPoolSize in your configuration.`, + }, }; } - // If the current VM has the correct bundle and is ready - if (allBundleFilePaths.every((bundleFilePath) => hasVMContextForBundle(bundleFilePath))) { - return await prepareResult(renderingRequest, entryBundleFilePath); + try { + const executionContext = await buildExecutionContext(allBundleFilePaths, /* buildVmsIfNeeded */ false); + return { + response: await prepareResult(renderingRequest, entryBundleFilePath, executionContext), + executionContext, + }; + } catch (e) { + // Ignore VMContextNotFoundError, it means the bundle does not exist. + // The following code will handle this case. + if (!(e instanceof VMContextNotFoundError)) { + throw e; + } } // If gem has posted updated bundle: if (providedNewBundles && providedNewBundles.length > 0) { const result = await handleNewBundlesProvided(renderingRequest, providedNewBundles, assetsToCopy); if (result) { - return result; + return { response: result }; } } // Check if the bundle exists: - const missingBundles = ( - await Promise.all( - [...(dependencyBundleTimestamps ?? []), bundleTimestamp].map(async (timestamp) => { - const bundleFilePath = getRequestBundleFilePath(timestamp); - const fileExists = await fileExistsAsync(bundleFilePath); - return fileExists ? null : timestamp; - }), - ) - ).filter((timestamp) => timestamp !== null); - - if (missingBundles.length > 0) { - const missingBundlesText = missingBundles.length > 1 ? 'bundles' : 'bundle'; - log.info(`No saved ${missingBundlesText}: ${missingBundles.join(', ')}`); - return { - headers: { 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate' }, - status: 410, - data: 'No bundle uploaded', - }; + const missingBundleError = await validateBundlesExist(bundleTimestamp, dependencyBundleTimestamps); + if (missingBundleError) { + return { response: missingBundleError }; } // The bundle exists, but the VM has not yet been created. 
// Another worker must have written it or it was saved during deployment. - log.info('Bundle %s exists. Building VM for worker %s.', entryBundleFilePath, workerIdLabel()); - await Promise.all(allBundleFilePaths.map((bundleFilePath) => buildVM(bundleFilePath))); - - return await prepareResult(renderingRequest, entryBundleFilePath); + log.info( + 'Bundle %s exists. Building ExecutionContext for worker %s.', + entryBundleFilePath, + workerIdLabel(), + ); + const executionContext = await buildExecutionContext(allBundleFilePaths, /* buildVmsIfNeeded */ true); + return { + response: await prepareResult(renderingRequest, entryBundleFilePath, executionContext), + executionContext, + }; } catch (error) { const msg = formatExceptionMessage( renderingRequest, diff --git a/react_on_rails_pro/packages/node-renderer/src/worker/requestPrechecks.ts b/react_on_rails_pro/packages/node-renderer/src/worker/requestPrechecks.ts new file mode 100644 index 0000000000..737df00fc8 --- /dev/null +++ b/react_on_rails_pro/packages/node-renderer/src/worker/requestPrechecks.ts @@ -0,0 +1,27 @@ +/** + * Request prechecks logic that is independent of the HTTP server framework. 
+ * @module worker/requestPrechecks + */ +import type { ResponseResult } from '../shared/utils'; +import { checkProtocolVersion, type ProtocolVersionBody } from './checkProtocolVersionHandler'; +import { authenticate, type AuthBody } from './authHandler'; + +export interface RequestPrechecksBody extends ProtocolVersionBody, AuthBody { + [key: string]: unknown; +} + +export function performRequestPrechecks(body: RequestPrechecksBody): ResponseResult | undefined { + // Check protocol version + const protocolVersionCheckingResult = checkProtocolVersion(body); + if (typeof protocolVersionCheckingResult === 'object') { + return protocolVersionCheckingResult; + } + + // Authenticate Ruby client + const authResult = authenticate(body); + if (typeof authResult === 'object') { + return authResult; + } + + return undefined; +} diff --git a/react_on_rails_pro/packages/node-renderer/src/worker/vm.ts b/react_on_rails_pro/packages/node-renderer/src/worker/vm.ts index 2f751512a4..cc462673c6 100644 --- a/react_on_rails_pro/packages/node-renderer/src/worker/vm.ts +++ b/react_on_rails_pro/packages/node-renderer/src/worker/vm.ts @@ -29,7 +29,7 @@ import * as errorReporter from '../shared/errorReporter'; const readFileAsync = promisify(fs.readFile); const writeFileAsync = promisify(fs.writeFile); -interface VMContext { +export interface VMContext { context: Context; sharedConsoleHistory: SharedConsoleHistory; lastUsed: number; // Track when this VM was last used @@ -39,7 +39,7 @@ interface VMContext { const vmContexts = new Map(); // Track VM creation promises to handle concurrent buildVM requests -const vmCreationPromises = new Map>(); +const vmCreationPromises = new Map>(); /** * Returns all bundle paths that have a VM context @@ -101,87 +101,17 @@ function manageVMPoolSize() { } } -/** - * - * @param renderingRequest JS Code to execute for SSR - * @param filePath - * @param vmCluster - */ -export async function runInVM( - renderingRequest: string, - filePath: string, - vmCluster?: 
typeof cluster, -): Promise { - const { bundlePath } = getConfig(); - - try { - // Wait for VM creation if it's in progress - if (vmCreationPromises.has(filePath)) { - await vmCreationPromises.get(filePath); - } - - // Get the correct VM context based on the provided bundle path - const vmContext = getVMContext(filePath); - - if (!vmContext) { - throw new Error(`No VM context found for bundle ${filePath}`); - } - - // Update last used timestamp - vmContext.lastUsed = Date.now(); - - const { context, sharedConsoleHistory } = vmContext; - - if (log.level === 'debug') { - // worker is nullable in the primary process - const workerId = vmCluster?.worker?.id; - log.debug(`worker ${workerId ? `${workerId} ` : ''}received render request for bundle ${filePath} with code -${smartTrim(renderingRequest)}`); - const debugOutputPathCode = path.join(bundlePath, 'code.js'); - log.debug(`Full code executed written to: ${debugOutputPathCode}`); - await writeFileAsync(debugOutputPathCode, renderingRequest); - } - - let result = sharedConsoleHistory.trackConsoleHistoryInRenderRequest(() => { - context.renderingRequest = renderingRequest; - try { - return vm.runInContext(renderingRequest, context) as RenderCodeResult; - } finally { - context.renderingRequest = undefined; - } - }); - - if (isReadableStream(result)) { - const newStreamAfterHandlingError = handleStreamError(result, (error) => { - const msg = formatExceptionMessage(renderingRequest, error, 'Error in a rendering stream'); - errorReporter.message(msg); - }); - return newStreamAfterHandlingError; - } - if (typeof result !== 'string') { - const objectResult = await result; - result = JSON.stringify(objectResult); - } - if (log.level === 'debug') { - log.debug(`result from JS: -${smartTrim(result)}`); - const debugOutputPathResult = path.join(bundlePath, 'result.json'); - log.debug(`Wrote result to file: ${debugOutputPathResult}`); - await writeFileAsync(debugOutputPathResult, result); - } - - return result; - } catch 
(exception) { - const exceptionMessage = formatExceptionMessage(renderingRequest, exception); - log.debug('Caught exception in rendering request', exceptionMessage); - return Promise.resolve({ exceptionMessage }); +export class VMContextNotFoundError extends Error { + constructor(bundleFilePath: string) { + super(`VMContext not found for bundle: ${bundleFilePath}`); + this.name = 'VMContextNotFoundError'; } } -export async function buildVM(filePath: string) { +async function buildVM(filePath: string): Promise { // Return existing promise if VM is already being created if (vmCreationPromises.has(filePath)) { - return vmCreationPromises.get(filePath); + return vmCreationPromises.get(filePath) as Promise; } // Check if VM for this bundle already exists @@ -189,7 +119,7 @@ export async function buildVM(filePath: string) { if (vmContext) { // Update last used time when accessing existing VM vmContext.lastUsed = Date.now(); - return Promise.resolve(true); + return Promise.resolve(vmContext); } // Create a new promise for this VM creation @@ -200,12 +130,7 @@ export async function buildVM(filePath: string) { additionalContext !== null && additionalContext.constructor === Object; const sharedConsoleHistory = new SharedConsoleHistory(); - const runOnOtherBundle = async (bundleTimestamp: string | number, renderingRequest: string) => { - const bundlePath = getRequestBundleFilePath(bundleTimestamp); - return runInVM(renderingRequest, bundlePath, cluster); - }; - - const contextObject = { sharedConsoleHistory, runOnOtherBundle }; + const contextObject = { sharedConsoleHistory }; if (supportModules) { // IMPORTANT: When adding anything to this object, update: @@ -306,11 +231,12 @@ export async function buildVM(filePath: string) { } // Only now, after VM is fully initialized, store the context - vmContexts.set(filePath, { + const newVmContext: VMContext = { context, sharedConsoleHistory, lastUsed: Date.now(), - }); + }; + vmContexts.set(filePath, newVmContext); // Manage pool 
size after adding new VM manageVMPoolSize(); @@ -331,7 +257,7 @@ export async function buildVM(filePath: string) { ); } - return true; + return newVmContext; } catch (error) { log.error('Caught Error when creating context in buildVM, %O', error); errorReporter.error(error as Error); @@ -348,6 +274,120 @@ export async function buildVM(filePath: string) { return vmCreationPromise; } +async function getOrBuildVMContext(bundleFilePath: string, buildVmsIfNeeded: boolean): Promise { + const vmContext = getVMContext(bundleFilePath); + if (vmContext) { + return vmContext; + } + + const vmCreationPromise = vmCreationPromises.get(bundleFilePath); + if (vmCreationPromise) { + return vmCreationPromise; + } + + if (buildVmsIfNeeded) { + return buildVM(bundleFilePath); + } + + throw new VMContextNotFoundError(bundleFilePath); +} + +export type ExecutionContext = { + runInVM: ( + renderingRequest: string, + bundleFilePath: string, + vmCluster?: typeof cluster, + ) => Promise; + getVMContext: (bundleFilePath: string) => VMContext | undefined; +}; + +export async function buildExecutionContext( + bundlePaths: string[], + buildVmsIfNeeded: boolean, +): Promise { + const mapBundleFilePathToVMContext = new Map(); + await Promise.all( + bundlePaths.map(async (bundleFilePath) => { + const vmContext = await getOrBuildVMContext(bundleFilePath, buildVmsIfNeeded); + vmContext.lastUsed = Date.now(); + mapBundleFilePathToVMContext.set(bundleFilePath, vmContext); + }), + ); + const sharedExecutionContext = new Map(); + + const runInVM = async (renderingRequest: string, bundleFilePath: string, vmCluster?: typeof cluster) => { + try { + const { bundlePath } = getConfig(); + const vmContext = mapBundleFilePathToVMContext.get(bundleFilePath); + if (!vmContext) { + throw new VMContextNotFoundError(bundleFilePath); + } + + // Update last used timestamp + vmContext.lastUsed = Date.now(); + + const { context, sharedConsoleHistory } = vmContext; + + if (log.level === 'debug') { + // worker is nullable 
in the primary process + const workerId = vmCluster?.worker?.id; + log.debug(`worker ${workerId ? `${workerId} ` : ''}received render request for bundle ${bundleFilePath} with code + ${smartTrim(renderingRequest)}`); + const debugOutputPathCode = path.join(bundlePath, 'code.js'); + log.debug(`Full code executed written to: ${debugOutputPathCode}`); + await writeFileAsync(debugOutputPathCode, renderingRequest); + } + + let result = sharedConsoleHistory.trackConsoleHistoryInRenderRequest(() => { + context.renderingRequest = renderingRequest; + context.sharedExecutionContext = sharedExecutionContext; + context.runOnOtherBundle = (bundleTimestamp: string | number, newRenderingRequest: string) => { + const otherBundleFilePath = getRequestBundleFilePath(bundleTimestamp); + return runInVM(newRenderingRequest, otherBundleFilePath, vmCluster); + }; + + try { + return vm.runInContext(renderingRequest, context) as RenderCodeResult; + } finally { + context.renderingRequest = undefined; + context.sharedExecutionContext = undefined; + context.runOnOtherBundle = undefined; + } + }); + + if (isReadableStream(result)) { + const newStreamAfterHandlingError = handleStreamError(result, (error) => { + const msg = formatExceptionMessage(renderingRequest, error, 'Error in a rendering stream'); + errorReporter.message(msg); + }); + return newStreamAfterHandlingError; + } + if (typeof result !== 'string') { + const objectResult = await result; + result = JSON.stringify(objectResult); + } + if (log.level === 'debug') { + log.debug(`result from JS: + ${smartTrim(result)}`); + const debugOutputPathResult = path.join(bundlePath, 'result.json'); + log.debug(`Wrote result to file: ${debugOutputPathResult}`); + await writeFileAsync(debugOutputPathResult, result); + } + + return result; + } catch (exception) { + const exceptionMessage = formatExceptionMessage(renderingRequest, exception); + log.debug('Caught exception in rendering request', exceptionMessage); + return Promise.resolve({ 
exceptionMessage }); + } + }; + + return { + getVMContext: (bundleFilePath: string) => mapBundleFilePathToVMContext.get(bundleFilePath), + runInVM, + }; +} + export function resetVM() { // Clear all VM contexts vmContexts.clear(); diff --git a/react_on_rails_pro/packages/node-renderer/tests/fixtures/bundle.js b/react_on_rails_pro/packages/node-renderer/tests/fixtures/bundle.js index 4ed2eac53f..b75ede3f5c 100644 --- a/react_on_rails_pro/packages/node-renderer/tests/fixtures/bundle.js +++ b/react_on_rails_pro/packages/node-renderer/tests/fixtures/bundle.js @@ -1,3 +1,57 @@ +const { PassThrough } = require('stream'); + global.ReactOnRails = { dummy: { html: 'Dummy Object' }, + + // Get or create async value promise + getAsyncValue: function() { + debugger; + if (!sharedExecutionContext.has('asyncPromise')) { + const promiseData = {}; + const promise = new Promise((resolve, reject) => { + promiseData.resolve = resolve; + promiseData.reject = reject; + }); + promiseData.promise = promise; + sharedExecutionContext.set('asyncPromise', promiseData); + } + return sharedExecutionContext.get('asyncPromise').promise; + }, + + // Resolve the async value promise + setAsyncValue: function(value) { + debugger; + if (!sharedExecutionContext.has('asyncPromise')) { + ReactOnRails.getAsyncValue(); + } + const promiseData = sharedExecutionContext.get('asyncPromise'); + promiseData.resolve(value); + }, + + // Get or create stream + getStreamValues: function() { + if (!sharedExecutionContext.has('stream')) { + const stream = new PassThrough(); + sharedExecutionContext.set('stream', { stream }); + } + return sharedExecutionContext.get('stream').stream; + }, + + // Add value to stream + addStreamValue: function(value) { + if (!sharedExecutionContext.has('stream')) { + // Create the stream first if it doesn't exist + ReactOnRails.getStreamValues(); + } + const { stream } = sharedExecutionContext.get('stream'); + stream.write(value); + return value; + }, + + endStream: function() { + if 
(sharedExecutionContext.has('stream')) { + const { stream } = sharedExecutionContext.get('stream'); + stream.end(); + } + }, }; diff --git a/react_on_rails_pro/packages/node-renderer/tests/fixtures/projects/spec-dummy/asyncComponentsTreeForTestingRenderingRequest.js b/react_on_rails_pro/packages/node-renderer/tests/fixtures/projects/spec-dummy/asyncComponentsTreeForTestingRenderingRequest.js index 02d4de5dd7..8b48f9bb3f 100644 --- a/react_on_rails_pro/packages/node-renderer/tests/fixtures/projects/spec-dummy/asyncComponentsTreeForTestingRenderingRequest.js +++ b/react_on_rails_pro/packages/node-renderer/tests/fixtures/projects/spec-dummy/asyncComponentsTreeForTestingRenderingRequest.js @@ -8,6 +8,7 @@ rscBundleHash: '88888-test', } + const runOnOtherBundle = globalThis.runOnOtherBundle; if (typeof generateRSCPayload !== 'function') { globalThis.generateRSCPayload = function generateRSCPayload(componentName, props, railsContext) { const { renderingRequest, rscBundleHash } = railsContext.serverSideRSCPayloadParameters; diff --git a/react_on_rails_pro/packages/node-renderer/tests/fixtures/secondary-bundle.js b/react_on_rails_pro/packages/node-renderer/tests/fixtures/secondary-bundle.js index d901dd0526..cde44a80f7 100644 --- a/react_on_rails_pro/packages/node-renderer/tests/fixtures/secondary-bundle.js +++ b/react_on_rails_pro/packages/node-renderer/tests/fixtures/secondary-bundle.js @@ -1,3 +1,53 @@ global.ReactOnRails = { dummy: { html: 'Dummy Object from secondary bundle' }, + + + // Get or create async value promise + getAsyncValue: function() { + if (!sharedExecutionContext.has('secondaryAsyncPromise')) { + const promiseData = {}; + const promise = new Promise((resolve, reject) => { + promiseData.resolve = resolve; + promiseData.reject = reject; + }); + promiseData.promise = promise; + sharedExecutionContext.set('secondaryAsyncPromise', promiseData); + } + return sharedExecutionContext.get('secondaryAsyncPromise').promise; + }, + + // Resolve the async value 
promise + setAsyncValue: function(value) { + if (!sharedExecutionContext.has('secondaryAsyncPromise')) { + ReactOnRails.getAsyncValue(); + } + const promiseData = sharedExecutionContext.get('secondaryAsyncPromise'); + promiseData.resolve(value); + }, + + // Get or create stream + getStreamValues: function() { + if (!sharedExecutionContext.has('secondaryStream')) { + const stream = new PassThrough(); + sharedExecutionContext.set('secondaryStream', { stream }); + } + return sharedExecutionContext.get('secondaryStream').stream; + }, + + // Add value to stream + addStreamValue: function(value) { + if (!sharedExecutionContext.has('secondaryStream')) { + // Create the stream first if it doesn't exist + ReactOnRails.getStreamValues(); + } + const { stream } = sharedExecutionContext.get('secondaryStream'); + stream.write(value); + }, + + endStream: function() { + if (sharedExecutionContext.has('secondaryStream')) { + const { stream } = sharedExecutionContext.get('secondaryStream'); + stream.end(); + } + }, }; diff --git a/react_on_rails_pro/packages/node-renderer/tests/helper.ts b/react_on_rails_pro/packages/node-renderer/tests/helper.ts index 0f078ba7e5..819ca62793 100644 --- a/react_on_rails_pro/packages/node-renderer/tests/helper.ts +++ b/react_on_rails_pro/packages/node-renderer/tests/helper.ts @@ -4,7 +4,7 @@ import path from 'path'; import fsPromises from 'fs/promises'; import fs from 'fs'; import fsExtra from 'fs-extra'; -import { buildVM, resetVM } from '../src/worker/vm'; +import { buildExecutionContext, resetVM } from '../src/worker/vm'; import { buildConfig } from '../src/shared/configBuilder'; export const mkdirAsync = fsPromises.mkdir; @@ -58,13 +58,25 @@ export function vmSecondaryBundlePath(testName: string) { } export async function createVmBundle(testName: string) { + // Build config with module support before creating VM bundle + buildConfig({ + bundlePath: bundlePath(testName), + supportModules: true, + stubTimers: false, + }); await 
safeCopyFileAsync(getFixtureBundle(), vmBundlePath(testName)); - return buildVM(vmBundlePath(testName)); + await buildExecutionContext([vmBundlePath(testName)], /* buildVmsIfNeeded */ true); } export async function createSecondaryVmBundle(testName: string) { + // Build config with module support before creating VM bundle + buildConfig({ + bundlePath: bundlePath(testName), + supportModules: true, + stubTimers: false, + }); await safeCopyFileAsync(getFixtureSecondaryBundle(), vmSecondaryBundlePath(testName)); - return buildVM(vmSecondaryBundlePath(testName)); + await buildExecutionContext([vmSecondaryBundlePath(testName)], /* buildVmsIfNeeded */ true); } export function lockfilePath(testName: string) { @@ -144,4 +156,50 @@ export function readRenderingRequest(projectName: string, commit: string, reques return fs.readFileSync(path.resolve(__dirname, renderingRequestRelativePath), 'utf8'); } +/** + * Custom waitFor function that retries an expect statement until it passes or timeout is reached + * @param expectFn - Function containing Jest expect statements + * @param options - Configuration options + * @param options.timeout - Maximum time to wait in milliseconds (default: 1000) + * @param options.interval - Time between retries in milliseconds (default: 10) + * @param options.message - Custom error message when timeout is reached + */ +export const waitFor = async ( + expectFn: () => void, + options: { + timeout?: number; + interval?: number; + message?: string; + } = {}, +): Promise => { + const { timeout = 1000, interval = 10, message } = options; + const startTime = Date.now(); + let lastError: Error | null = null; + + while (Date.now() - startTime < timeout) { + try { + expectFn(); + // If we get here, the expect passed, so we can return + return; + } catch (error) { + lastError = error as Error; + // Expect failed, continue retrying + if (Date.now() - startTime >= timeout) { + // Timeout reached, re-throw the last error + throw error; + } + } + + // Wait before 
next retry + // eslint-disable-next-line no-await-in-loop + await new Promise((resolve) => { + setTimeout(resolve, interval); + }); + } + + // Timeout reached, throw error with descriptive message + const defaultMessage = `Expect condition not met within ${timeout}ms`; + throw new Error(message || defaultMessage + (lastError ? `\nLast error: ${lastError.message}` : '')); +}; + setConfig('helper'); diff --git a/react_on_rails_pro/packages/node-renderer/tests/incrementalRender.test.ts b/react_on_rails_pro/packages/node-renderer/tests/incrementalRender.test.ts new file mode 100644 index 0000000000..325cb9f93c --- /dev/null +++ b/react_on_rails_pro/packages/node-renderer/tests/incrementalRender.test.ts @@ -0,0 +1,982 @@ +import http from 'http'; +import fs from 'fs'; +import path from 'path'; +import worker, { disableHttp2 } from '../src/worker'; +import packageJson from '../src/shared/packageJson'; +import * as incremental from '../src/worker/handleIncrementalRenderRequest'; +import { + createVmBundle, + createSecondaryVmBundle, + BUNDLE_TIMESTAMP, + SECONDARY_BUNDLE_TIMESTAMP, + waitFor, +} from './helper'; +import type { ResponseResult } from '../src/shared/utils'; + +// Disable HTTP/2 for testing like other tests do +disableHttp2(); + +describe('incremental render NDJSON endpoint', () => { + const TEST_NAME = 'incrementalRender'; + const BUNDLE_PATH = path.join(__dirname, 'tmp', TEST_NAME); + if (!fs.existsSync(BUNDLE_PATH)) { + fs.mkdirSync(BUNDLE_PATH, { recursive: true }); + } + + const app = worker({ + bundlePath: BUNDLE_PATH, + password: 'myPassword1', + // Keep HTTP logs quiet for tests + logHttpLevel: 'silent' as const, + supportModules: true, + }); + + // Helper functions to DRY up the tests + const getServerAddress = () => { + const addr = app.server.address(); + return { + host: typeof addr === 'object' && addr ? addr.address : '127.0.0.1', + port: typeof addr === 'object' && addr ? 
addr.port : 0, + }; + }; + + const createHttpRequest = (bundleTimestamp: string, pathSuffix = 'abc123') => { + const { host, port } = getServerAddress(); + const req = http.request({ + hostname: host, + port, + path: `/bundles/${bundleTimestamp}/incremental-render/${pathSuffix}`, + method: 'POST', + headers: { + 'Content-Type': 'application/x-ndjson', + }, + }); + req.setNoDelay(true); + return req; + }; + + const createInitialObject = (bundleTimestamp: string, password = 'myPassword1') => ({ + gemVersion: packageJson.version, + protocolVersion: packageJson.protocolVersion, + password, + renderingRequest: 'ReactOnRails.dummy', + dependencyBundleTimestamps: [bundleTimestamp], + }); + + const createMockSink = () => { + const sinkAdd = jest.fn(); + + const sink: incremental.IncrementalRenderSink = { + add: sinkAdd, + }; + + return { sink, sinkAdd }; + }; + + const createMockResponse = (data = 'mock response'): ResponseResult => ({ + status: 200, + headers: { 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate' }, + data, + }); + + const createMockResult = (sink: incremental.IncrementalRenderSink, response?: ResponseResult) => { + const mockResponse = response || createMockResponse(); + return { + response: mockResponse, + sink, + } as incremental.IncrementalRenderResult; + }; + + const setupResponseHandler = (req: http.ClientRequest, captureData = false) => { + return new Promise<{ statusCode: number; data?: string }>((resolve, reject) => { + req.on('response', (res) => { + if (captureData) { + let data = ''; + res.on('data', (chunk: string) => { + data += chunk; + }); + res.on('end', () => { + resolve({ statusCode: res.statusCode || 0, data }); + }); + } else { + res.on('data', () => { + // Consume response data to prevent hanging + }); + res.on('end', () => { + resolve({ statusCode: res.statusCode || 0 }); + }); + } + res.on('error', (e) => { + reject(e); + }); + }); + req.on('error', (e) => { + reject(e); + }); + }); + }; + + /** + * Helper function to 
create a basic test setup with mocked handleIncrementalRenderRequest + */ + const createBasicTestSetup = async () => { + await createVmBundle(TEST_NAME); + + const { sink, sinkAdd } = createMockSink(); + const mockResponse = createMockResponse(); + const mockResult = createMockResult(sink, mockResponse); + + const handleSpy = jest + .spyOn(incremental, 'handleIncrementalRenderRequest') + .mockImplementation(() => Promise.resolve(mockResult)); + + const SERVER_BUNDLE_TIMESTAMP = String(BUNDLE_TIMESTAMP); + + return { + sink, + sinkAdd, + mockResponse, + mockResult, + handleSpy, + SERVER_BUNDLE_TIMESTAMP, + }; + }; + + /** + * Helper function to create a streaming test setup + */ + const createStreamingTestSetup = async () => { + await createVmBundle(TEST_NAME); + + const { Readable } = await import('stream'); + const responseStream = new Readable({ + read() { + // This is a readable stream that we can push to + }, + }); + + const sinkAdd = jest.fn(); + + const sink: incremental.IncrementalRenderSink = { + add: sinkAdd, + }; + + const mockResponse: ResponseResult = { + status: 200, + headers: { 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate' }, + stream: responseStream, + }; + + const mockResult: incremental.IncrementalRenderResult = { + response: mockResponse, + sink, + }; + + const handleSpy = jest + .spyOn(incremental, 'handleIncrementalRenderRequest') + .mockImplementation(() => Promise.resolve(mockResult)); + + const SERVER_BUNDLE_TIMESTAMP = String(BUNDLE_TIMESTAMP); + + return { + responseStream, + sinkAdd, + sink, + mockResponse, + mockResult, + handleSpy, + SERVER_BUNDLE_TIMESTAMP, + }; + }; + + /** + * Helper function to send chunks and wait for processing + */ + const sendChunksAndWaitForProcessing = async ( + req: http.ClientRequest, + chunks: unknown[], + waitForCondition: (chunk: unknown, index: number) => Promise, + ) => { + for (let i = 0; i < chunks.length; i += 1) { + const chunk = chunks[i]; + 
req.write(`${JSON.stringify(chunk)}\n`); + + // eslint-disable-next-line no-await-in-loop + await waitForCondition(chunk, i); + } + }; + + /** + * Helper function to create streaming response promise + */ + const createStreamingResponsePromise = (req: http.ClientRequest) => { + const receivedChunks: string[] = []; + + const promise = new Promise<{ statusCode: number; streamedData: string[] }>((resolve, reject) => { + req.on('response', (res) => { + res.on('data', (chunk: Buffer) => { + const chunkStr = chunk.toString(); + receivedChunks.push(chunkStr); + }); + res.on('end', () => { + resolve({ + statusCode: res.statusCode || 0, + streamedData: [...receivedChunks], // Return a copy + }); + }); + res.on('error', (e) => { + reject(e); + }); + }); + req.on('error', (e) => { + reject(e); + }); + }); + + return { promise, receivedChunks }; + }; + + afterEach(() => { + jest.restoreAllMocks(); + }); + + beforeAll(async () => { + await app.ready(); + await app.listen({ port: 0 }); + }); + + afterAll(async () => { + await app.close(); + }); + + test('calls handleIncrementalRenderRequest immediately after first chunk and processes each subsequent chunk immediately', async () => { + const { sinkAdd, handleSpy, SERVER_BUNDLE_TIMESTAMP } = await createBasicTestSetup(); + + // Create the HTTP request + const req = createHttpRequest(SERVER_BUNDLE_TIMESTAMP); + + // Set up promise to handle the response + const responsePromise = setupResponseHandler(req); + + // Write first object (headers, auth, and initial renderingRequest) + const initialObj = createInitialObject(SERVER_BUNDLE_TIMESTAMP); + req.write(`${JSON.stringify(initialObj)}\n`); + + // Wait for the server to process the first object + await waitFor(() => { + expect(handleSpy).toHaveBeenCalledTimes(1); + }); + + // Verify handleIncrementalRenderRequest was called immediately after first chunk + expect(handleSpy).toHaveBeenCalledTimes(1); + expect(sinkAdd).not.toHaveBeenCalled(); // No subsequent chunks processed yet + + // 
Send subsequent props chunks one by one and verify immediate processing + const chunksToSend = [{ a: 1 }, { b: 2 }, { c: 3 }]; + + await sendChunksAndWaitForProcessing(req, chunksToSend, async (chunk, index) => { + const expectedCallsBeforeWrite = index; + + // Verify state before writing this chunk + expect(sinkAdd).toHaveBeenCalledTimes(expectedCallsBeforeWrite); + + // Wait for the chunk to be processed + await waitFor(() => { + expect(sinkAdd).toHaveBeenCalledTimes(expectedCallsBeforeWrite + 1); + }); + + // Verify the chunk was processed immediately + expect(sinkAdd).toHaveBeenCalledTimes(expectedCallsBeforeWrite + 1); + expect(sinkAdd).toHaveBeenNthCalledWith(expectedCallsBeforeWrite + 1, chunk); + }); + + req.end(); + + // Wait for the request to complete + await responsePromise; + + // Final verification: all chunks were processed in the correct order + expect(handleSpy).toHaveBeenCalledTimes(1); + expect(sinkAdd.mock.calls).toEqual([[{ a: 1 }], [{ b: 2 }], [{ c: 3 }]]); + }); + + test('returns 410 error when bundle is missing', async () => { + const MISSING_BUNDLE_TIMESTAMP = 'non-existent-bundle-123'; + + // Create the HTTP request with a non-existent bundle + const req = createHttpRequest(MISSING_BUNDLE_TIMESTAMP); + + // Set up promise to capture the response + const responsePromise = setupResponseHandler(req, true); + + // Write first object with auth data + const initialObj = createInitialObject(MISSING_BUNDLE_TIMESTAMP); + req.write(`${JSON.stringify(initialObj)}\n`); + req.end(); + + // Wait for the response + const response = await responsePromise; + + // Verify that we get a 410 error + expect(response.statusCode).toBe(410); + expect(response.data).toContain('No bundle uploaded'); + }); + + test('returns 400 error when first chunk contains malformed JSON', async () => { + // Create a bundle for this test + await createVmBundle(TEST_NAME); + + const SERVER_BUNDLE_TIMESTAMP = String(BUNDLE_TIMESTAMP); + + // Create the HTTP request + const req = 
createHttpRequest(SERVER_BUNDLE_TIMESTAMP); + + // Set up promise to capture the response + const responsePromise = setupResponseHandler(req, true); + + // Write malformed JSON as first chunk (missing closing brace) + const malformedJson = `{"gemVersion": "1.0.0", "protocolVersion": "2.0.0", "password": "myPassword1", "renderingRequest": "ReactOnRails.dummy", "dependencyBundleTimestamps": ["${SERVER_BUNDLE_TIMESTAMP}"]\n`; + req.write(malformedJson); + req.end(); + + // Wait for the response + const response = await responsePromise; + + // Verify that we get a 400 error due to malformed JSON + expect(response.statusCode).toBe(400); + expect(response.data).toContain('Invalid JSON chunk'); + }); + + test('continues processing when update chunk contains malformed JSON', async () => { + // Create a bundle for this test + await createVmBundle(TEST_NAME); + + const { sink, sinkAdd } = createMockSink(); + + const mockResponse: ResponseResult = createMockResponse(); + + const mockResult: incremental.IncrementalRenderResult = createMockResult(sink, mockResponse); + + const resultPromise = Promise.resolve(mockResult); + const handleSpy = jest + .spyOn(incremental, 'handleIncrementalRenderRequest') + .mockImplementation(() => resultPromise); + + const SERVER_BUNDLE_TIMESTAMP = String(BUNDLE_TIMESTAMP); + + // Create the HTTP request + const req = createHttpRequest(SERVER_BUNDLE_TIMESTAMP); + + // Set up promise to handle the response + const responsePromise = setupResponseHandler(req); + + // Write first object (valid JSON) + const initialObj = createInitialObject(SERVER_BUNDLE_TIMESTAMP); + req.write(`${JSON.stringify(initialObj)}\n`); + + // Wait for the server to process the first object + await waitFor(() => { + expect(handleSpy).toHaveBeenCalledTimes(1); + }); + + // Send a valid chunk first + const validChunk = { a: 1 }; + req.write(`${JSON.stringify(validChunk)}\n`); + + // Wait for processing + await waitFor(() => { + expect(sinkAdd).toHaveBeenCalledWith({ a: 1 }); + 
}); + + // Verify the valid chunk was processed + expect(sinkAdd).toHaveBeenCalledWith({ a: 1 }); + + // Send a malformed JSON chunk + const malformedChunk = '{"invalid": json}\n'; + req.write(malformedChunk); + + // Send another valid chunk + const secondValidChunk = { d: 4 }; + req.write(`${JSON.stringify(secondValidChunk)}\n`); + + req.end(); + + // Wait for the request to complete + await responsePromise; + + // Verify that processing continued after the malformed chunk + // The malformed chunk should be skipped, but valid chunks should be processed + // Verify that the stream completed successfully + await waitFor(() => { + expect(sinkAdd.mock.calls).toEqual([[{ a: 1 }], [{ d: 4 }]]); + }); + }); + + test('handles empty lines gracefully in the stream', async () => { + // Create a bundle for this test + await createVmBundle(TEST_NAME); + + const { sink, sinkAdd } = createMockSink(); + + const mockResponse: ResponseResult = createMockResponse(); + + const mockResult: incremental.IncrementalRenderResult = createMockResult(sink, mockResponse); + + const resultPromise = Promise.resolve(mockResult); + const handleSpy = jest + .spyOn(incremental, 'handleIncrementalRenderRequest') + .mockImplementation(() => resultPromise); + + const SERVER_BUNDLE_TIMESTAMP = String(BUNDLE_TIMESTAMP); + + // Create the HTTP request + const req = createHttpRequest(SERVER_BUNDLE_TIMESTAMP); + + // Set up promise to handle the response + const responsePromise = setupResponseHandler(req); + + // Write first object (valid JSON) + const initialObj = createInitialObject(SERVER_BUNDLE_TIMESTAMP); + req.write(`${JSON.stringify(initialObj)}\n`); + + // Wait for processing + await waitFor(() => { + expect(handleSpy).toHaveBeenCalledTimes(1); + }); + + // Send chunks with empty lines mixed in + const chunksToSend = [{ a: 1 }, { b: 2 }, { c: 3 }]; + + for (const chunk of chunksToSend) { + req.write(`${JSON.stringify(chunk)}\n`); + // eslint-disable-next-line no-await-in-loop + await waitFor(() => 
{ + expect(sinkAdd).toHaveBeenCalledWith(chunk); + }); + } + + req.end(); + + // Wait for the request to complete + await responsePromise; + + // Verify that only valid JSON objects were processed + expect(handleSpy).toHaveBeenCalledTimes(1); + expect(sinkAdd.mock.calls).toEqual([[{ a: 1 }], [{ b: 2 }], [{ c: 3 }]]); + }); + + test('throws error when first chunk processing fails (e.g., authentication)', async () => { + // Create a bundle for this test + await createVmBundle(TEST_NAME); + + const SERVER_BUNDLE_TIMESTAMP = String(BUNDLE_TIMESTAMP); + + // Create the HTTP request + const req = createHttpRequest(SERVER_BUNDLE_TIMESTAMP); + + // Set up promise to capture the response + const responsePromise = setupResponseHandler(req, true); + + // Write first object with invalid password (will cause authentication failure) + const initialObj = createInitialObject(SERVER_BUNDLE_TIMESTAMP, 'wrongPassword'); // Invalid password + req.write(`${JSON.stringify(initialObj)}\n`); + req.end(); + + // Wait for the response + const response = await responsePromise; + + // Verify that we get an authentication error (should be 400 or 401) + expect(response.statusCode).toBeGreaterThanOrEqual(400); + expect(response.statusCode).toBeLessThan(500); + + // The response should contain an authentication error message + const responseText = response.data?.toLowerCase(); + expect( + responseText?.includes('password') || + responseText?.includes('auth') || + responseText?.includes('unauthorized'), + ).toBe(true); + }); + + test('streaming response - client receives all streamed chunks in real-time', async () => { + const responseChunks = [ + 'Hello from stream', + 'Chunk 1', + 'Chunk 2', + 'Chunk 3', + 'Chunk 4', + 'Chunk 5', + 'Goodbye from stream', + ]; + + const { responseStream, sinkAdd, handleSpy, SERVER_BUNDLE_TIMESTAMP } = await createStreamingTestSetup(); + + // write the response chunks to the stream + let sentChunkIndex = 0; + const intervalId = setInterval(() => { + if 
(sentChunkIndex < responseChunks.length) { + responseStream.push(responseChunks[sentChunkIndex] || null); + sentChunkIndex += 1; + } else { + responseStream.push(null); + clearInterval(intervalId); + } + }, 10); + + // Create the HTTP request + const req = createHttpRequest(SERVER_BUNDLE_TIMESTAMP); + + // Set up promise to capture the streaming response + const { promise } = createStreamingResponsePromise(req); + + // Write first object (valid JSON) + const initialObj = createInitialObject(SERVER_BUNDLE_TIMESTAMP); + req.write(`${JSON.stringify(initialObj)}\n`); + + // Wait for the server to process the first object and set up the response + await waitFor(() => { + expect(handleSpy).toHaveBeenCalledTimes(1); + }); + + // Verify handleIncrementalRenderRequest was called + expect(handleSpy).toHaveBeenCalledTimes(1); + + // Send a few chunks to trigger processing + const chunksToSend = [ + { type: 'update', data: 'chunk1' }, + { type: 'update', data: 'chunk2' }, + { type: 'update', data: 'chunk3' }, + ]; + + await sendChunksAndWaitForProcessing(req, chunksToSend, async (chunk) => { + await waitFor(() => { + expect(sinkAdd).toHaveBeenCalledWith(chunk); + }); + }); + + // End the request + req.end(); + + // Wait for the request to complete and capture the streaming response + const response = await promise; + + // Verify the response status + expect(response.statusCode).toBe(200); + + // Verify that we received all the streamed chunks + expect(response.streamedData).toHaveLength(responseChunks.length); + + // Verify that each chunk was received in order + responseChunks.forEach((expectedChunk, index) => { + const receivedChunk = response.streamedData[index]; + expect(receivedChunk).toEqual(expectedChunk); + }); + + // Verify that all request chunks were processed + expect(sinkAdd).toHaveBeenCalledTimes(chunksToSend.length); + chunksToSend.forEach((chunk, index) => { + expect(sinkAdd).toHaveBeenNthCalledWith(index + 1, chunk); + }); + + // Verify that the mock was 
called correctly + expect(handleSpy).toHaveBeenCalledTimes(1); + }); + + test('echo server - processes each chunk and immediately streams it back', async () => { + const { responseStream, sinkAdd, handleSpy, SERVER_BUNDLE_TIMESTAMP } = await createStreamingTestSetup(); + + // Create the HTTP request + const req = createHttpRequest(SERVER_BUNDLE_TIMESTAMP); + + // Set up promise to capture the streaming response + const { promise, receivedChunks } = createStreamingResponsePromise(req); + + // Write first object (valid JSON) + const initialObj = createInitialObject(SERVER_BUNDLE_TIMESTAMP); + req.write(`${JSON.stringify(initialObj)}\n`); + + // Wait for the server to process the first object and set up the response + await waitFor(() => { + expect(handleSpy).toHaveBeenCalledTimes(1); + }); + + // Verify handleIncrementalRenderRequest was called + expect(handleSpy).toHaveBeenCalledTimes(1); + + // Send chunks one by one and verify immediate processing and echoing + const chunksToSend = [ + { type: 'update', data: 'chunk1' }, + { type: 'update', data: 'chunk2' }, + { type: 'update', data: 'chunk3' }, + { type: 'update', data: 'chunk4' }, + ]; + + // Process each chunk and immediately echo it back + for (let i = 0; i < chunksToSend.length; i += 1) { + const chunk = chunksToSend[i]; + + // Send the chunk + req.write(`${JSON.stringify(chunk)}\n`); + + // Wait for the chunk to be processed + // eslint-disable-next-line no-await-in-loop + await waitFor(() => { + expect(sinkAdd).toHaveBeenCalledWith(chunk); + }); + + // Immediately echo the chunk back through the stream + const echoResponse = `processed ${JSON.stringify(chunk)}`; + responseStream.push(echoResponse); + + // Wait for the echo response to be received by the client + // eslint-disable-next-line no-await-in-loop + await waitFor(() => { + expect(receivedChunks[i]).toEqual(echoResponse); + }); + + // Wait a moment to ensure the echo is sent + // eslint-disable-next-line no-await-in-loop + await new 
Promise((resolve) => {
+        setTimeout(resolve, 10);
+      });
+    }
+
+    // End the stream to signal no more data
+    responseStream.push(null);
+
+    // End the request
+    req.end();
+
+    // Wait for the request to complete and capture the streaming response
+    const response = await promise;
+
+    // Verify the response status
+    expect(response.statusCode).toBe(200);
+
+    // Verify that we received echo responses for each chunk
+    expect(response.streamedData).toHaveLength(chunksToSend.length);
+
+    // Verify that each chunk was echoed back correctly
+    chunksToSend.forEach((chunk, index) => {
+      const expectedEcho = `processed ${JSON.stringify(chunk)}`;
+      const receivedEcho = response.streamedData[index];
+      expect(receivedEcho).toEqual(expectedEcho);
+    });
+
+    // Verify that all request chunks were processed
+    expect(sinkAdd).toHaveBeenCalledTimes(chunksToSend.length);
+    chunksToSend.forEach((chunk, index) => {
+      expect(sinkAdd).toHaveBeenNthCalledWith(index + 1, chunk);
+    });
+
+    // Verify that the mock was called correctly
+    expect(handleSpy).toHaveBeenCalledTimes(1);
+  });
+
+  describe('incremental render update chunk functionality', () => {
+    test('basic incremental update - initial request gets value, update chunks set value', async () => {
+      await createVmBundle(TEST_NAME);
+      const SERVER_BUNDLE_TIMESTAMP = String(BUNDLE_TIMESTAMP);
+
+      // Create the HTTP request
+      const req = createHttpRequest(SERVER_BUNDLE_TIMESTAMP);
+
+      // Set up response handling
+      const responsePromise = setupResponseHandler(req, true);
+
+      // Send the initial object that gets the async value (should resolve after setAsyncValue is called)
+      const initialObject = {
+        ...createInitialObject(SERVER_BUNDLE_TIMESTAMP),
+        renderingRequest: 'ReactOnRails.getStreamValues()',
+      };
+      req.write(`${JSON.stringify(initialObject)}\n`);
+
+      // Send update chunks that set the async value
+      const updateChunk1 = {
+        bundleTimestamp: SERVER_BUNDLE_TIMESTAMP,
+        updateChunk: 'ReactOnRails.addStreamValue("first 
update");ReactOnRails.endStream();', + }; + req.write(`${JSON.stringify(updateChunk1)}\n`); + + // End the request + req.end(); + + // Wait for the response + const response = await responsePromise; + + // Verify the response + expect(response.statusCode).toBe(200); + expect(response.data).toBe('first update'); // Should resolve with the first setAsyncValue call + }); + + test('incremental updates work with multiple bundles using runOnOtherBundle', async () => { + await createVmBundle(TEST_NAME); + await createSecondaryVmBundle(TEST_NAME); + const SERVER_BUNDLE_TIMESTAMP = String(BUNDLE_TIMESTAMP); + const SECONDARY_BUNDLE_TIMESTAMP_STR = String(SECONDARY_BUNDLE_TIMESTAMP); + + // Create the HTTP request + const req = createHttpRequest(SERVER_BUNDLE_TIMESTAMP); + + // Set up response handling + const responsePromise = setupResponseHandler(req, true); + + // Send the initial object that gets values from both bundles + const initialObject = { + ...createInitialObject(SERVER_BUNDLE_TIMESTAMP), + renderingRequest: ` + runOnOtherBundle(${SECONDARY_BUNDLE_TIMESTAMP}, 'ReactOnRails.getAsyncValue()').then((secondaryValue) => ({ + mainBundleValue: ReactOnRails.getAsyncValue(), + secondaryBundleValue: JSON.parse(secondaryValue), + })); + `, + dependencyBundleTimestamps: [SECONDARY_BUNDLE_TIMESTAMP_STR], + }; + req.write(`${JSON.stringify(initialObject)}\n`); + + // Send update chunks to both bundles + const updateMainBundle = { + bundleTimestamp: SERVER_BUNDLE_TIMESTAMP, + updateChunk: 'ReactOnRails.setAsyncValue("main bundle updated")', + }; + req.write(`${JSON.stringify(updateMainBundle)}\n`); + + const updateSecondaryBundle = { + bundleTimestamp: SECONDARY_BUNDLE_TIMESTAMP_STR, + updateChunk: 'ReactOnRails.setAsyncValue("secondary bundle updated")', + }; + req.write(`${JSON.stringify(updateSecondaryBundle)}\n`); + + // End the request + req.end(); + + // Wait for the response + const response = await responsePromise; + + // Verify the response + 
expect(response.statusCode).toBe(200); + const responseData = JSON.parse(response.data || '{}') as { + mainBundleValue: unknown; + secondaryBundleValue: unknown; + }; + expect(responseData.mainBundleValue).toBe('main bundle updated'); + expect(responseData.secondaryBundleValue).toBe('secondary bundle updated'); + }); + + test('streaming functionality with incremental updates', async () => { + await createVmBundle(TEST_NAME); + const SERVER_BUNDLE_TIMESTAMP = String(BUNDLE_TIMESTAMP); + + // Create the HTTP request + const req = createHttpRequest(SERVER_BUNDLE_TIMESTAMP); + + // Set up response handling to capture streaming data + const streamedData: string[] = []; + const responsePromise = new Promise<{ statusCode: number }>((resolve, reject) => { + req.on('response', (res) => { + res.on('data', (chunk: string) => { + streamedData.push(chunk.toString()); + }); + res.on('end', () => { + resolve({ statusCode: res.statusCode || 0 }); + }); + res.on('error', reject); + }); + req.on('error', reject); + }); + + // Send the initial object that clears stream values and returns the stream + const initialObject = { + ...createInitialObject(SERVER_BUNDLE_TIMESTAMP), + renderingRequest: 'ReactOnRails.getStreamValues()', + }; + req.write(`${JSON.stringify(initialObject)}\n`); + + // Send update chunks that add stream values + const streamValues = ['stream1', 'stream2', 'stream3']; + for (const value of streamValues) { + const updateChunk = { + bundleTimestamp: SERVER_BUNDLE_TIMESTAMP, + updateChunk: `ReactOnRails.addStreamValue("${value}")`, + }; + req.write(`${JSON.stringify(updateChunk)}\n`); + } + + // No need to get stream values again since we're already streaming + + // End the request + req.end(); + + // Wait for the response + const response = await responsePromise; + + // Verify the response + expect(response.statusCode).toBe(200); + // Since we're returning a stream, the response should indicate streaming + expect(streamedData.length).toBeGreaterThan(0); + }); + + 
test('error handling in incremental render updates', async () => { + await createVmBundle(TEST_NAME); + const SERVER_BUNDLE_TIMESTAMP = String(BUNDLE_TIMESTAMP); + + // Create the HTTP request + const req = createHttpRequest(SERVER_BUNDLE_TIMESTAMP); + + // Set up response handling + const responsePromise = setupResponseHandler(req, true); + + // Send the initial object + const initialObject = { + ...createInitialObject(SERVER_BUNDLE_TIMESTAMP), + renderingRequest: 'ReactOnRails.getAsyncValue()', + }; + req.write(`${JSON.stringify(initialObject)}\n`); + + // Send a malformed update chunk (missing bundleTimestamp) + const malformedChunk = { + updateChunk: 'ReactOnRails.setAsyncValue("should not work")', + }; + req.write(`${JSON.stringify(malformedChunk)}\n`); + + // Send a valid update chunk after the malformed one + const validChunk = { + bundleTimestamp: SERVER_BUNDLE_TIMESTAMP, + updateChunk: 'ReactOnRails.setAsyncValue("valid update")', + }; + req.write(`${JSON.stringify(validChunk)}\n`); + + // Send a chunk with invalid JavaScript + const invalidJSChunk = { + bundleTimestamp: SERVER_BUNDLE_TIMESTAMP, + updateChunk: 'this is not valid javascript syntax !!!', + }; + req.write(`${JSON.stringify(invalidJSChunk)}\n`); + + // End the request + req.end(); + + // Wait for the response + const response = await responsePromise; + + // Verify the response - should still work despite errors + expect(response.statusCode).toBe(200); + expect(response.data).toBe('"valid update"'); // Should resolve with the valid update + }); + + test('update chunks with non-existent bundle timestamp', async () => { + await createVmBundle(TEST_NAME); + const SERVER_BUNDLE_TIMESTAMP = String(BUNDLE_TIMESTAMP); + const NON_EXISTENT_TIMESTAMP = '9999999999999'; + + // Create the HTTP request + const req = createHttpRequest(SERVER_BUNDLE_TIMESTAMP); + + // Set up response handling + const responsePromise = setupResponseHandler(req, true); + + // Send the initial object + const initialObject = { + 
...createInitialObject(SERVER_BUNDLE_TIMESTAMP), + renderingRequest: 'ReactOnRails.getAsyncValue()', + }; + req.write(`${JSON.stringify(initialObject)}\n`); + + // Send update chunk with non-existent bundle timestamp + const updateChunk = { + bundleTimestamp: NON_EXISTENT_TIMESTAMP, + updateChunk: 'ReactOnRails.setAsyncValue("should not work")', + }; + req.write(`${JSON.stringify(updateChunk)}\n`); + + // Send a valid update chunk + const validChunk = { + bundleTimestamp: SERVER_BUNDLE_TIMESTAMP, + updateChunk: 'ReactOnRails.setAsyncValue("valid update")', + }; + req.write(`${JSON.stringify(validChunk)}\n`); + + // End the request + req.end(); + + // Wait for the response + const response = await responsePromise; + + // Verify the response + expect(response.statusCode).toBe(200); + expect(response.data).toBe('"valid update"'); // Should resolve with the valid update + }); + + test('complex multi-bundle streaming scenario', async () => { + await createVmBundle(TEST_NAME); + await createSecondaryVmBundle(TEST_NAME); + const SERVER_BUNDLE_TIMESTAMP = String(BUNDLE_TIMESTAMP); + const SECONDARY_BUNDLE_TIMESTAMP_STR = String(SECONDARY_BUNDLE_TIMESTAMP); + + // Create the HTTP request + const req = createHttpRequest(SERVER_BUNDLE_TIMESTAMP); + + // Set up response handling + const responsePromise = setupResponseHandler(req, true); + + // Send the initial object that sets up both bundles for streaming + const initialObject = { + ...createInitialObject(SERVER_BUNDLE_TIMESTAMP), + renderingRequest: ` + ReactOnRails.clearStreamValues(); + runOnOtherBundle(${SECONDARY_BUNDLE_TIMESTAMP}, 'ReactOnRails.clearStreamValues()').then(() => ({ + mainCleared: true, + secondaryCleared: true, + })); + `, + dependencyBundleTimestamps: [SECONDARY_BUNDLE_TIMESTAMP_STR], + }; + req.write(`${JSON.stringify(initialObject)}\n`); + + // Send alternating updates to both bundles + const updates = [ + { bundleTimestamp: SERVER_BUNDLE_TIMESTAMP, updateChunk: 'ReactOnRails.addStreamValue("main1")' 
}, + { + bundleTimestamp: SECONDARY_BUNDLE_TIMESTAMP_STR, + updateChunk: 'ReactOnRails.addStreamValue("secondary1")', + }, + { bundleTimestamp: SERVER_BUNDLE_TIMESTAMP, updateChunk: 'ReactOnRails.addStreamValue("main2")' }, + { + bundleTimestamp: SECONDARY_BUNDLE_TIMESTAMP_STR, + updateChunk: 'ReactOnRails.addStreamValue("secondary2")', + }, + ]; + + for (const update of updates) { + req.write(`${JSON.stringify(update)}\n`); + } + + // Get final state from both bundles + const getFinalState = { + bundleTimestamp: SERVER_BUNDLE_TIMESTAMP, + updateChunk: ` + runOnOtherBundle(${SECONDARY_BUNDLE_TIMESTAMP}, 'ReactOnRails.getStreamValues()').then((secondaryValues) => ({ + mainValues: ReactOnRails.getStreamValues(), + secondaryValues: JSON.parse(secondaryValues), + })); + `, + }; + req.write(`${JSON.stringify(getFinalState)}\n`); + + // End the request + req.end(); + + // Wait for the response + const response = await responsePromise; + + // Verify the response + expect(response.statusCode).toBe(200); + const responseData = JSON.parse(response.data || '{}') as { + mainCleared: unknown; + secondaryCleared: unknown; + }; + expect(responseData.mainCleared).toBe(true); + expect(responseData.secondaryCleared).toBe(true); + }); + }); +}); diff --git a/react_on_rails_pro/packages/node-renderer/tests/serverRenderRSCReactComponent.test.js b/react_on_rails_pro/packages/node-renderer/tests/serverRenderRSCReactComponent.test.js index adf2114f23..b7893e25d2 100644 --- a/react_on_rails_pro/packages/node-renderer/tests/serverRenderRSCReactComponent.test.js +++ b/react_on_rails_pro/packages/node-renderer/tests/serverRenderRSCReactComponent.test.js @@ -1,7 +1,7 @@ import path from 'path'; import fs from 'fs'; import { Readable } from 'stream'; -import { buildVM, getVMContext, resetVM } from '../src/worker/vm'; +import { buildExecutionContext, resetVM } from '../src/worker/vm'; import { getConfig } from '../src/shared/configBuilder'; const SimpleWorkingComponent = () => 'hello'; @@ -62,9 
+62,8 @@ describe('serverRenderRSCReactComponent', () => { // The serverRenderRSCReactComponent function should only be called when the bundle is compiled with the `react-server` condition. // Therefore, we cannot call it directly in the test files. Instead, we run the RSC bundle through the VM and call the method from there. const getReactOnRailsRSCObject = async () => { - // Use the copied rsc-bundle.js file from temp directory - await buildVM(tempRscBundlePath); - const vmContext = getVMContext(tempRscBundlePath); + const executionContext = await buildExecutionContext([tempRscBundlePath], /* buildVmsIfNeeded */ true); + const vmContext = executionContext.getVMContext(tempRscBundlePath); const { ReactOnRails, React } = vmContext.context; function SuspensedComponentWithAsyncError() { diff --git a/react_on_rails_pro/packages/node-renderer/tests/vm.test.ts b/react_on_rails_pro/packages/node-renderer/tests/vm.test.ts index 051e5d4d92..e2559a233f 100644 --- a/react_on_rails_pro/packages/node-renderer/tests/vm.test.ts +++ b/react_on_rails_pro/packages/node-renderer/tests/vm.test.ts @@ -7,7 +7,7 @@ import { resetForTest, BUNDLE_TIMESTAMP, } from './helper'; -import { buildVM, hasVMContextForBundle, resetVM, runInVM, getVMContext } from '../src/worker/vm'; +import { buildExecutionContext, hasVMContextForBundle, resetVM } from '../src/worker/vm'; import { getConfig } from '../src/shared/configBuilder'; import { isErrorRenderResult } from '../src/shared/utils'; @@ -31,7 +31,10 @@ describe('buildVM and runInVM', () => { config.supportModules = false; await createUploadedBundleForTest(); - await buildVM(uploadedBundlePathForTest()); + const { runInVM } = await buildExecutionContext( + [uploadedBundlePathForTest()], + /* buildVmsIfNeeded */ true, + ); let result = await runInVM('typeof Buffer === "undefined"', uploadedBundlePathForTest()); expect(result).toBeTruthy(); @@ -45,7 +48,10 @@ describe('buildVM and runInVM', () => { config.supportModules = true; await 
createUploadedBundleForTest(); - await buildVM(uploadedBundlePathForTest()); + const { runInVM } = await buildExecutionContext( + [uploadedBundlePathForTest()], + /* buildVmsIfNeeded */ true, + ); let result = await runInVM('typeof Buffer !== "undefined"', uploadedBundlePathForTest()); expect(result).toBeTruthy(); @@ -58,7 +64,10 @@ describe('buildVM and runInVM', () => { describe('additionalContext', () => { test('not available if additionalContext not set', async () => { await createUploadedBundleForTest(); - await buildVM(uploadedBundlePathForTest()); + const { runInVM } = await buildExecutionContext( + [uploadedBundlePathForTest()], + /* buildVmsIfNeeded */ true, + ); const result = await runInVM('typeof testString === "undefined"', uploadedBundlePathForTest()); expect(result).toBeTruthy(); @@ -69,7 +78,10 @@ describe('buildVM and runInVM', () => { config.additionalContext = { testString: 'a string' }; await createUploadedBundleForTest(); - await buildVM(uploadedBundlePathForTest()); + const { runInVM } = await buildExecutionContext( + [uploadedBundlePathForTest()], + /* buildVmsIfNeeded */ true, + ); const result = await runInVM('typeof testString !== "undefined"', uploadedBundlePathForTest()); expect(result).toBeTruthy(); @@ -80,7 +92,10 @@ describe('buildVM and runInVM', () => { expect.assertions(14); await createUploadedBundleForTest(); - await buildVM(uploadedBundlePathForTest()); + const { runInVM } = await buildExecutionContext( + [uploadedBundlePathForTest()], + /* buildVmsIfNeeded */ true, + ); let result = await runInVM('ReactOnRails', uploadedBundlePathForTest()); expect(result).toEqual(JSON.stringify({ dummy: { html: 'Dummy Object' } })); @@ -128,7 +143,10 @@ describe('buildVM and runInVM', () => { test('VM security and captured exceptions', async () => { expect.assertions(1); await createUploadedBundleForTest(); - await buildVM(uploadedBundlePathForTest()); + const { runInVM } = await buildExecutionContext( + [uploadedBundlePathForTest()], + /* 
buildVmsIfNeeded */ true, + ); // Adopted form https://github.com/patriksimek/vm2/blob/master/test/tests.js: const result = await runInVM('process.exit()', uploadedBundlePathForTest()); expect( @@ -139,7 +157,10 @@ describe('buildVM and runInVM', () => { test('Captured exceptions for a long message', async () => { expect.assertions(4); await createUploadedBundleForTest(); - await buildVM(uploadedBundlePathForTest()); + const { runInVM } = await buildExecutionContext( + [uploadedBundlePathForTest()], + /* buildVmsIfNeeded */ true, + ); // Adopted form https://github.com/patriksimek/vm2/blob/master/test/tests.js: const code = `process.exit()${'\n// 1234567890123456789012345678901234567890'.repeat( 50, @@ -155,7 +176,10 @@ describe('buildVM and runInVM', () => { test('resetVM', async () => { expect.assertions(2); await createUploadedBundleForTest(); - await buildVM(uploadedBundlePathForTest()); + const { runInVM } = await buildExecutionContext( + [uploadedBundlePathForTest()], + /* buildVmsIfNeeded */ true, + ); const result = await runInVM('ReactOnRails', uploadedBundlePathForTest()); expect(result).toEqual(JSON.stringify({ dummy: { html: 'Dummy Object' } })); @@ -168,7 +192,10 @@ describe('buildVM and runInVM', () => { test('VM console history', async () => { expect.assertions(1); await createUploadedBundleForTest(); - await buildVM(uploadedBundlePathForTest()); + const { runInVM } = await buildExecutionContext( + [uploadedBundlePathForTest()], + /* buildVmsIfNeeded */ true, + ); const vmResult = await runInVM( 'console.log("Console message inside of VM") || console.history;', @@ -205,7 +232,7 @@ describe('buildVM and runInVM', () => { __dirname, './fixtures/projects/friendsandguests/1a7fe417/server-bundle.js', ); - await buildVM(serverBundlePath); + const { runInVM } = await buildExecutionContext([serverBundlePath], /* buildVmsIfNeeded */ true); // WelcomePage component: const welcomePageComponentRenderingRequest = readRenderingRequest( @@ -279,7 +306,7 @@ 
describe('buildVM and runInVM', () => { __dirname, './fixtures/projects/react-webpack-rails-tutorial/ec974491/server-bundle.js', ); - await buildVM(serverBundlePath); + const { runInVM } = await buildExecutionContext([serverBundlePath], /* buildVmsIfNeeded */ true); // NavigationBar component: const navigationBarComponentRenderingRequest = readRenderingRequest( @@ -322,7 +349,7 @@ describe('buildVM and runInVM', () => { __dirname, './fixtures/projects/bionicworkshop/fa6ccf6b/server-bundle.js', ); - await buildVM(serverBundlePath); + const { runInVM } = await buildExecutionContext([serverBundlePath], /* buildVmsIfNeeded */ true); // SignIn page with flash component: const signInPageWithFlashRenderingRequest = readRenderingRequest( @@ -379,7 +406,7 @@ describe('buildVM and runInVM', () => { __dirname, './fixtures/projects/spec-dummy/9fa89f7/server-bundle-web-target.js', ); - await buildVM(serverBundlePath); + const { runInVM } = await buildExecutionContext([serverBundlePath], /* buildVmsIfNeeded */ true); // WelcomePage component: const reduxAppComponentRenderingRequest = readRenderingRequest( @@ -417,11 +444,11 @@ describe('buildVM and runInVM', () => { config.stubTimers = false; config.replayServerAsyncOperationLogs = replayServerAsyncOperationLogs; - await buildVM(serverBundlePath); + return buildExecutionContext([serverBundlePath], /* buildVmsIfNeeded */ true); }; test('console logs in sync and async server operations', async () => { - await prepareVM(true); + const { runInVM } = await prepareVM(true); const consoleLogsInAsyncServerRequestResult = (await runInVM( consoleLogsInAsyncServerRequest, serverBundlePath, @@ -442,7 +469,7 @@ describe('buildVM and runInVM', () => { }); test('console logs are not leaked to other requests', async () => { - await prepareVM(true); + const { runInVM } = await prepareVM(true); const otherRequestId = '9f3b7e12-5a8d-4c6f-b1e3-2d7f8a6c9e0b'; const otherconsoleLogsInAsyncServerRequest = consoleLogsInAsyncServerRequest.replace( 
requestId, @@ -474,7 +501,7 @@ describe('buildVM and runInVM', () => { }); test('if replayServerAsyncOperationLogs is false, only sync console logs are replayed', async () => { - await prepareVM(false); + const { runInVM } = await prepareVM(false); const consoleLogsInAsyncServerRequestResult = await runInVM( consoleLogsInAsyncServerRequest, serverBundlePath, @@ -495,7 +522,7 @@ describe('buildVM and runInVM', () => { }); test('console logs are not leaked to other requests when replayServerAsyncOperationLogs is false', async () => { - await prepareVM(false); + const { runInVM } = await prepareVM(false); const otherRequestId = '9f3b7e12-5a8d-4c6f-b1e3-2d7f8a6c9e0b'; const otherconsoleLogsInAsyncServerRequest = consoleLogsInAsyncServerRequest.replace( requestId, @@ -531,7 +558,7 @@ describe('buildVM and runInVM', () => { test('calling multiple buildVM in parallel creates the same VM context', async () => { const buildAndGetVmContext = async () => { - await prepareVM(true); + const { getVMContext } = await prepareVM(true); return getVMContext(serverBundlePath); }; @@ -541,7 +568,7 @@ describe('buildVM and runInVM', () => { test('running runInVM before buildVM', async () => { resetVM(); - void prepareVM(true); + const { runInVM } = await prepareVM(true); // If the bundle is parsed, ReactOnRails object will be globally available and has the serverRenderReactComponent method const ReactOnRails = await runInVM( 'typeof ReactOnRails !== "undefined" && ReactOnRails && typeof ReactOnRails.serverRenderReactComponent', @@ -552,17 +579,22 @@ describe('buildVM and runInVM', () => { test("running multiple buildVM in parallel doesn't cause runInVM to return partial results", async () => { resetVM(); - void Promise.all([prepareVM(true), prepareVM(true), prepareVM(true), prepareVM(true)]); + const [{ runInVM: runInVM1 }, { runInVM: runInVM2 }, { runInVM: runInVM3 }] = await Promise.all([ + prepareVM(true), + prepareVM(true), + prepareVM(true), + prepareVM(true), + ]); // If the 
bundle is parsed, ReactOnRails object will be globally available and has the serverRenderReactComponent method - const runCodeInVM = () => + const runCodeInVM = (runInVM: typeof runInVM1) => runInVM( 'typeof ReactOnRails !== "undefined" && ReactOnRails && typeof ReactOnRails.serverRenderReactComponent', serverBundlePath, ); const [runCodeInVM1, runCodeInVM2, runCodeInVM3] = await Promise.all([ - runCodeInVM(), - runCodeInVM(), - runCodeInVM(), + runCodeInVM(runInVM1), + runCodeInVM(runInVM2), + runCodeInVM(runInVM3), ]); expect(runCodeInVM1).toBe('function'); expect(runCodeInVM2).toBe('function'); @@ -595,9 +627,9 @@ describe('buildVM and runInVM', () => { const bundle3 = path.resolve(__dirname, './fixtures/projects/bionicworkshop/fa6ccf6b/server-bundle.js'); // Build VMs up to and beyond the pool limit - await buildVM(bundle1); - await buildVM(bundle2); - await buildVM(bundle3); + await buildExecutionContext([bundle1], /* buildVmsIfNeeded */ true); + await buildExecutionContext([bundle2], /* buildVmsIfNeeded */ true); + await buildExecutionContext([bundle3], /* buildVmsIfNeeded */ true); // Only the two most recently used bundles should have contexts expect(hasVMContextForBundle(bundle1)).toBeFalsy(); @@ -614,10 +646,10 @@ describe('buildVM and runInVM', () => { __dirname, './fixtures/projects/spec-dummy/e5e10d1/server-bundle-node-target.js', ); - await buildVM(bundle1); - await buildVM(bundle2); - await buildVM(bundle2); - await buildVM(bundle2); + await buildExecutionContext([bundle1], /* buildVmsIfNeeded */ true); + await buildExecutionContext([bundle2], /* buildVmsIfNeeded */ true); + await buildExecutionContext([bundle2], /* buildVmsIfNeeded */ true); + await buildExecutionContext([bundle2], /* buildVmsIfNeeded */ true); expect(hasVMContextForBundle(bundle1)).toBeTruthy(); expect(hasVMContextForBundle(bundle2)).toBeTruthy(); @@ -635,8 +667,8 @@ describe('buildVM and runInVM', () => { const bundle3 = path.resolve(__dirname, 
'./fixtures/projects/bionicworkshop/fa6ccf6b/server-bundle.js'); // Create initial VMs - await buildVM(bundle1); - await buildVM(bundle2); + await buildExecutionContext([bundle1], /* buildVmsIfNeeded */ true); + await buildExecutionContext([bundle2], /* buildVmsIfNeeded */ true); // Wait a bit to ensure timestamp difference await new Promise((resolve) => { @@ -644,10 +676,10 @@ describe('buildVM and runInVM', () => { }); // Access bundle1 again to update its timestamp - await buildVM(bundle1); + await buildExecutionContext([bundle1], /* buildVmsIfNeeded */ true); // Add a new VM - should remove bundle2 as it's the oldest - await buildVM(bundle3); + await buildExecutionContext([bundle3], /* buildVmsIfNeeded */ true); // Bundle1 should still exist as it was accessed more recently expect(hasVMContextForBundle(bundle1)).toBeTruthy(); @@ -667,8 +699,8 @@ describe('buildVM and runInVM', () => { const bundle3 = path.resolve(__dirname, './fixtures/projects/bionicworkshop/fa6ccf6b/server-bundle.js'); // Create initial VMs - await buildVM(bundle1); - await buildVM(bundle2); + const { runInVM } = await buildExecutionContext([bundle1], /* buildVmsIfNeeded */ true); + await buildExecutionContext([bundle2], /* buildVmsIfNeeded */ true); // Wait a bit to ensure timestamp difference await new Promise((resolve) => { @@ -679,7 +711,7 @@ describe('buildVM and runInVM', () => { await runInVM('1 + 1', bundle1); // Add a new VM - should remove bundle2 as it's the oldest - await buildVM(bundle3); + await buildExecutionContext([bundle3], /* buildVmsIfNeeded */ true); // Bundle1 should still exist as it was used more recently expect(hasVMContextForBundle(bundle1)).toBeTruthy(); @@ -694,16 +726,16 @@ describe('buildVM and runInVM', () => { ); // Build VM first time - await buildVM(bundle); + const { runInVM } = await buildExecutionContext([bundle], /* buildVmsIfNeeded */ true); // Set a variable in the VM context await runInVM('global.testVar = "test value"', bundle); // Build VM second 
time - should reuse existing context - await buildVM(bundle); + const { runInVM: runInVM2 } = await buildExecutionContext([bundle], /* buildVmsIfNeeded */ true); // Variable should still exist if context was reused - const result = await runInVM('global.testVar', bundle); + const result = await runInVM2('global.testVar', bundle); expect(result).toBe('test value'); }); }); diff --git a/react_on_rails_pro/packages/node-renderer/tests/worker.test.ts b/react_on_rails_pro/packages/node-renderer/tests/worker.test.ts index 8f52ab1d0a..4d9cd05800 100644 --- a/react_on_rails_pro/packages/node-renderer/tests/worker.test.ts +++ b/react_on_rails_pro/packages/node-renderer/tests/worker.test.ts @@ -1,5 +1,6 @@ import formAutoContent from 'form-auto-content'; import fs from 'fs'; +import path from 'path'; import querystring from 'querystring'; import { createReadStream } from 'fs-extra'; import worker, { disableHttp2 } from '../src/worker'; @@ -29,6 +30,15 @@ const { protocolVersion } = packageJson; disableHttp2(); +// Helper to create worker with standard options +const createWorker = (options: Parameters[0] = {}) => + worker({ + bundlePath: bundlePathForTest(), + supportModules: true, + stubTimers: false, + ...options, + }); + describe('worker', () => { beforeEach(async () => { await resetForTest(testName); @@ -39,9 +49,7 @@ describe('worker', () => { }); test('POST /bundles/:bundleTimestamp/render/:renderRequestDigest when bundle is provided and did not yet exist', async () => { - const app = worker({ - bundlePath: bundlePathForTest(), - }); + const app = createWorker(); const form = formAutoContent({ gemVersion, @@ -66,9 +74,7 @@ describe('worker', () => { }); test('POST /bundles/:bundleTimestamp/render/:renderRequestDigest', async () => { - const app = worker({ - bundlePath: bundlePathForTest(), - }); + const app = createWorker(); const form = formAutoContent({ gemVersion, @@ -101,8 +107,7 @@ describe('worker', () => { async () => { await createVmBundleForTest(); - const app 
= worker({ - bundlePath: bundlePathForTest(), + const app = createWorker({ password: 'password', }); @@ -127,8 +132,7 @@ describe('worker', () => { async () => { await createVmBundleForTest(); - const app = worker({ - bundlePath: bundlePathForTest(), + const app = createWorker({ password: 'password', }); @@ -153,8 +157,7 @@ describe('worker', () => { async () => { await createVmBundleForTest(); - const app = worker({ - bundlePath: bundlePathForTest(), + const app = createWorker({ password: 'my_password', }); @@ -180,9 +183,7 @@ describe('worker', () => { async () => { await createVmBundleForTest(); - const app = worker({ - bundlePath: bundlePathForTest(), - }); + const app = createWorker(); const res = await app .inject() @@ -203,8 +204,7 @@ describe('worker', () => { const bundleHash = 'some-bundle-hash'; await createAsset(testName, bundleHash); - const app = worker({ - bundlePath: bundlePathForTest(), + const app = createWorker({ password: 'my_password', }); @@ -229,8 +229,7 @@ describe('worker', () => { const bundleHash = 'some-bundle-hash'; await createAsset(testName, bundleHash); - const app = worker({ - bundlePath: bundlePathForTest(), + const app = createWorker({ password: 'my_password', }); @@ -253,8 +252,7 @@ describe('worker', () => { test('post /asset-exists requires targetBundles (protocol version 2.0.0)', async () => { await createAsset(testName, String(BUNDLE_TIMESTAMP)); - const app = worker({ - bundlePath: bundlePathForTest(), + const app = createWorker({ password: 'my_password', }); @@ -275,8 +273,7 @@ describe('worker', () => { test('post /upload-assets', async () => { const bundleHash = 'some-bundle-hash'; - const app = worker({ - bundlePath: bundlePathForTest(), + const app = createWorker({ password: 'my_password', }); @@ -298,8 +295,7 @@ describe('worker', () => { const bundleHash = 'some-bundle-hash'; const bundleHashOther = 'some-other-bundle-hash'; - const app = worker({ - bundlePath: bundlePathForTest(), + const app = createWorker({ 
password: 'my_password', }); @@ -319,4 +315,614 @@ describe('worker', () => { expect(fs.existsSync(assetPath(testName, bundleHashOther))).toBe(true); expect(fs.existsSync(assetPathOther(testName, bundleHashOther))).toBe(true); }); + + test('post /upload-assets with bundles and assets', async () => { + const bundleHash = 'some-bundle-hash'; + const secondaryBundleHash = 'secondary-bundle-hash'; + + const app = createWorker({ + password: 'my_password', + }); + + const form = formAutoContent({ + gemVersion, + protocolVersion, + password: 'my_password', + targetBundles: [bundleHash, secondaryBundleHash], + [`bundle_${bundleHash}`]: createReadStream(getFixtureBundle()), + [`bundle_${secondaryBundleHash}`]: createReadStream(getFixtureSecondaryBundle()), + asset1: createReadStream(getFixtureAsset()), + asset2: createReadStream(getOtherFixtureAsset()), + }); + + const res = await app.inject().post(`/upload-assets`).payload(form.payload).headers(form.headers).end(); + expect(res.statusCode).toBe(200); + + // Verify assets are copied to both bundle directories + expect(fs.existsSync(assetPath(testName, bundleHash))).toBe(true); + expect(fs.existsSync(assetPathOther(testName, bundleHash))).toBe(true); + expect(fs.existsSync(assetPath(testName, secondaryBundleHash))).toBe(true); + expect(fs.existsSync(assetPathOther(testName, secondaryBundleHash))).toBe(true); + + // Verify bundles are placed in their correct directories + const bundle1Path = path.join(bundlePathForTest(), bundleHash, `${bundleHash}.js`); + const bundle2Path = path.join(bundlePathForTest(), secondaryBundleHash, `${secondaryBundleHash}.js`); + expect(fs.existsSync(bundle1Path)).toBe(true); + expect(fs.existsSync(bundle2Path)).toBe(true); + + // Verify the directory structure is correct + const bundle1Dir = path.join(bundlePathForTest(), bundleHash); + const bundle2Dir = path.join(bundlePathForTest(), secondaryBundleHash); + + // Each bundle directory should contain: 1 bundle file + 2 assets = 3 files total + 
const bundle1Files = fs.readdirSync(bundle1Dir); + const bundle2Files = fs.readdirSync(bundle2Dir); + + expect(bundle1Files).toHaveLength(3); // bundle file + 2 assets + expect(bundle2Files).toHaveLength(3); // bundle file + 2 assets + + // Verify the specific files exist in each directory + expect(bundle1Files).toContain(`${bundleHash}.js`); + expect(bundle1Files).toContain('loadable-stats.json'); + expect(bundle1Files).toContain('loadable-stats-other.json'); + + expect(bundle2Files).toContain(`${secondaryBundleHash}.js`); + expect(bundle2Files).toContain('loadable-stats.json'); + expect(bundle2Files).toContain('loadable-stats-other.json'); + }); + + test('post /upload-assets with only bundles (no assets)', async () => { + const bundleHash = 'bundle-only-hash'; + + const app = createWorker({ + password: 'my_password', + }); + + const form = formAutoContent({ + gemVersion, + protocolVersion, + password: 'my_password', + targetBundles: [bundleHash], + [`bundle_${bundleHash}`]: createReadStream(getFixtureBundle()), + }); + + const res = await app.inject().post(`/upload-assets`).payload(form.payload).headers(form.headers).end(); + expect(res.statusCode).toBe(200); + + // Verify bundle is placed in the correct directory + const bundleFilePath = path.join(bundlePathForTest(), bundleHash, `${bundleHash}.js`); + expect(fs.existsSync(bundleFilePath)).toBe(true); + + // Verify the directory structure is correct + const bundleDir = path.join(bundlePathForTest(), bundleHash); + const files = fs.readdirSync(bundleDir); + + // Should only contain the bundle file, no assets + expect(files).toHaveLength(1); + expect(files[0]).toBe(`${bundleHash}.js`); + + // Verify no asset files were accidentally copied + expect(files).not.toContain('loadable-stats.json'); + expect(files).not.toContain('loadable-stats-other.json'); + }); + + test('post /upload-assets with no assets and no bundles (empty request)', async () => { + const bundleHash = 'empty-request-hash'; + + const app = 
createWorker({ + password: 'my_password', + }); + + const form = formAutoContent({ + gemVersion, + protocolVersion, + password: 'my_password', + targetBundles: [bundleHash], + // No assets or bundles uploaded + }); + + const res = await app.inject().post(`/upload-assets`).payload(form.payload).headers(form.headers).end(); + expect(res.statusCode).toBe(200); + + // Verify bundle directory is created + const bundleDirectory = path.join(bundlePathForTest(), bundleHash); + expect(fs.existsSync(bundleDirectory)).toBe(true); + + // Verify no files were copied (since none were uploaded) + const files = fs.readdirSync(bundleDirectory); + expect(files).toHaveLength(0); + }); + + test('post /upload-assets with duplicate bundle hash silently skips overwrite and returns 200', async () => { + const bundleHash = 'duplicate-bundle-hash'; + + const app = createWorker({ + password: 'my_password', + }); + + // First upload with bundle + const form1 = formAutoContent({ + gemVersion, + protocolVersion, + password: 'my_password', + targetBundles: [bundleHash], + [`bundle_${bundleHash}`]: createReadStream(getFixtureBundle()), + }); + + const res1 = await app + .inject() + .post(`/upload-assets`) + .payload(form1.payload) + .headers(form1.headers) + .end(); + expect(res1.statusCode).toBe(200); + expect(res1.body).toBe(''); // Empty body on success + + // Verify first bundle was created correctly + const bundleDir = path.join(bundlePathForTest(), bundleHash); + expect(fs.existsSync(bundleDir)).toBe(true); + const bundleFilePath = path.join(bundleDir, `${bundleHash}.js`); + expect(fs.existsSync(bundleFilePath)).toBe(true); + + // Get file stats to verify it's the first bundle + const firstBundleStats = fs.statSync(bundleFilePath); + const firstBundleSize = firstBundleStats.size; + const firstBundleModTime = firstBundleStats.mtime.getTime(); + + // Second upload with the same bundle hash but different content + // This logs: "File exists when trying to overwrite bundle... 
Assuming bundle written by other thread" + // Then silently skips the overwrite operation and returns 200 success + const form2 = formAutoContent({ + gemVersion, + protocolVersion, + password: 'my_password', + targetBundles: [bundleHash], + [`bundle_${bundleHash}`]: createReadStream(getFixtureSecondaryBundle()), // Different content + }); + + const res2 = await app + .inject() + .post(`/upload-assets`) + .payload(form2.payload) + .headers(form2.headers) + .end(); + expect(res2.statusCode).toBe(200); // Still returns 200 success (no error) + expect(res2.body).toBe(''); // Empty body, no error message returned to client + + // Verify the bundle directory still exists + expect(fs.existsSync(bundleDir)).toBe(true); + + // Verify the bundle file still exists + expect(fs.existsSync(bundleFilePath)).toBe(true); + + // Verify the file was NOT overwritten (original bundle is preserved) + const secondBundleStats = fs.statSync(bundleFilePath); + const secondBundleSize = secondBundleStats.size; + const secondBundleModTime = secondBundleStats.mtime.getTime(); + + // The file size should be the same as the first upload (no overwrite occurred) + expect(secondBundleSize).toBe(firstBundleSize); + + // The modification time should be the same (file wasn't touched) + expect(secondBundleModTime).toBe(firstBundleModTime); + + // Verify the directory only contains one file (the original bundle) + const files = fs.readdirSync(bundleDir); + expect(files).toHaveLength(1); + expect(files[0]).toBe(`${bundleHash}.js`); + + // Verify the original content is preserved (1646 bytes from bundle.js, not 1689 from secondary-bundle.js) + expect(secondBundleSize).toBe(1646); // Size of getFixtureBundle(), not getFixtureSecondaryBundle() + }); + + test('post /upload-assets with bundles placed in their own hash directories, not targetBundles directories', async () => { + const bundleHash = 'actual-bundle-hash'; + const targetBundleHash = 'target-bundle-hash'; // Different from actual bundle hash + + 
const app = createWorker({ + password: 'my_password', + }); + + const form = formAutoContent({ + gemVersion, + protocolVersion, + password: 'my_password', + targetBundles: [targetBundleHash], // This should NOT affect where the bundle is placed + [`bundle_${bundleHash}`]: createReadStream(getFixtureBundle()), // Bundle with its own hash + }); + + const res = await app.inject().post(`/upload-assets`).payload(form.payload).headers(form.headers).end(); + expect(res.statusCode).toBe(200); + + // Verify the bundle was placed in its OWN hash directory, not the targetBundles directory + const actualBundleDir = path.join(bundlePathForTest(), bundleHash); + const targetBundleDir = path.join(bundlePathForTest(), targetBundleHash); + + // Bundle should exist in its own hash directory + expect(fs.existsSync(actualBundleDir)).toBe(true); + const bundleFilePath = path.join(actualBundleDir, `${bundleHash}.js`); + expect(fs.existsSync(bundleFilePath)).toBe(true); + + // Target bundle directory should also exist (created for assets) + expect(fs.existsSync(targetBundleDir)).toBe(true); + + // But the bundle file should NOT be in the target bundle directory + const targetBundleFilePath = path.join(targetBundleDir, `${bundleHash}.js`); + expect(fs.existsSync(targetBundleFilePath)).toBe(false); + + // Verify the bundle is in the correct location with correct name + const files = fs.readdirSync(actualBundleDir); + expect(files).toHaveLength(1); + expect(files[0]).toBe(`${bundleHash}.js`); + + // Verify the target bundle directory is empty (no assets uploaded) + const targetFiles = fs.readdirSync(targetBundleDir); + expect(targetFiles).toHaveLength(0); + }); + + // Incremental Render Endpoint Tests + describe('incremental render endpoint', () => { + // Helper functions to reduce code duplication + const createWorkerApp = (password = 'my_password') => + createWorker({ + password, + }); + + const uploadBundle = async ( + app: ReturnType, + bundleTimestamp = BUNDLE_TIMESTAMP, + password = 
'my_password', + ) => { + const uploadForm = formAutoContent({ + gemVersion, + protocolVersion, + password, + targetBundles: [String(bundleTimestamp)], + [`bundle_${bundleTimestamp}`]: createReadStream(getFixtureBundle()), + }); + + const uploadRes = await app + .inject() + .post('/upload-assets') + .payload(uploadForm.payload) + .headers(uploadForm.headers) + .end(); + + expect(uploadRes.statusCode).toBe(200); + return uploadRes; + }; + + const uploadMultipleBundles = async ( + app: ReturnType, + bundleTimestamps: number[], + password = 'my_password', + ) => { + const uploadForm = formAutoContent({ + gemVersion, + protocolVersion, + password, + targetBundles: bundleTimestamps.map(String), + [`bundle_${bundleTimestamps[0]}`]: createReadStream(getFixtureBundle()), + [`bundle_${bundleTimestamps[1]}`]: createReadStream(getFixtureSecondaryBundle()), + }); + + const uploadRes = await app + .inject() + .post('/upload-assets') + .payload(uploadForm.payload) + .headers(uploadForm.headers) + .end(); + + expect(uploadRes.statusCode).toBe(200); + return uploadRes; + }; + + const createNDJSONPayload = (data: Record) => `${JSON.stringify(data)}\n`; + + const callIncrementalRender = async ( + app: ReturnType, + bundleTimestamp: number, + renderRequestDigest: string, + payload: Record, + expectedStatus = 200, + ) => { + const res = await app + .inject() + .post(`/bundles/${bundleTimestamp}/incremental-render/${renderRequestDigest}`) + .payload(createNDJSONPayload(payload)) + .headers({ + 'Content-Type': 'application/x-ndjson', + }) + .end(); + + expect(res.statusCode).toBe(expectedStatus); + return res; + }; + + test('renders successfully when bundle and assets are pre-uploaded', async () => { + const app = createWorkerApp(); + await uploadBundle(app); + + const payload = { + gemVersion, + protocolVersion, + password: 'my_password', + renderingRequest: 'ReactOnRails.dummy', + dependencyBundleTimestamps: [String(BUNDLE_TIMESTAMP)], + }; + + const res = await callIncrementalRender( 
+ app, + BUNDLE_TIMESTAMP, + 'd41d8cd98f00b204e9800998ecf8427e', + payload, + ); + + expect(res.headers['cache-control']).toBe('public, max-age=31536000'); + expect(res.payload).toBe('{"html":"Dummy Object"}'); + }); + + test('renders successfully with multiple dependency bundles', async () => { + const app = createWorkerApp(); + await uploadMultipleBundles(app, [BUNDLE_TIMESTAMP, SECONDARY_BUNDLE_TIMESTAMP]); + + // Test that we can render from the main bundle and call code from the secondary bundle + const payload = { + gemVersion, + protocolVersion, + password: 'my_password', + renderingRequest: ` + runOnOtherBundle(${SECONDARY_BUNDLE_TIMESTAMP}, 'ReactOnRails.dummy').then((secondaryBundleResult) => ({ + mainBundleResult: ReactOnRails.dummy, + secondaryBundleResult: JSON.parse(secondaryBundleResult), + })); + `, + dependencyBundleTimestamps: [String(BUNDLE_TIMESTAMP), String(SECONDARY_BUNDLE_TIMESTAMP)], + }; + + const res = await callIncrementalRender( + app, + BUNDLE_TIMESTAMP, + 'd41d8cd98f00b204e9800998ecf8427e', + payload, + ); + + expect(res.headers['cache-control']).toBe('public, max-age=31536000'); + expect(res.payload).toBe( + '{"mainBundleResult":{"html":"Dummy Object"},"secondaryBundleResult":{"html":"Dummy Object from secondary bundle"}}', + ); + }); + + test('fails when bundle is not pre-uploaded', async () => { + const app = createWorkerApp(); + + const payload = { + gemVersion, + protocolVersion, + password: 'my_password', + renderingRequest: 'ReactOnRails.dummy', + dependencyBundleTimestamps: [String(BUNDLE_TIMESTAMP)], + }; + + const res = await callIncrementalRender( + app, + BUNDLE_TIMESTAMP, + 'd41d8cd98f00b204e9800998ecf8427e', + payload, + 410, + ); + + expect(res.payload).toContain('No bundle uploaded'); + }); + + test('fails with invalid JSON in first chunk', async () => { + const app = createWorkerApp(); + await uploadBundle(app); + + const res = await app + .inject() + 
.post(`/bundles/${BUNDLE_TIMESTAMP}/incremental-render/d41d8cd98f00b204e9800998ecf8427e`) + .payload('invalid json\n') + .headers({ + 'Content-Type': 'application/x-ndjson', + }) + .end(); + + expect(res.statusCode).toBe(400); + expect(res.payload).toContain('Invalid JSON chunk'); + }); + + test('fails with missing required fields in first chunk', async () => { + const app = createWorkerApp(); + await uploadBundle(app); + + const incompletePayload = { + gemVersion, + protocolVersion, + password: 'my_password', + // Missing renderingRequest + dependencyBundleTimestamps: [String(BUNDLE_TIMESTAMP)], + }; + + const res = await callIncrementalRender( + app, + BUNDLE_TIMESTAMP, + 'd41d8cd98f00b204e9800998ecf8427e', + incompletePayload, + 400, + ); + + expect(res.payload).toContain('INVALID NIL or NULL result for rendering'); + }); + + test('fails when password is missing', async () => { + const app = createWorkerApp(); + await uploadBundle(app); + + const payload = { + gemVersion, + protocolVersion, + renderingRequest: 'ReactOnRails.dummy', + dependencyBundleTimestamps: [String(BUNDLE_TIMESTAMP)], + }; + + const res = await callIncrementalRender( + app, + BUNDLE_TIMESTAMP, + 'd41d8cd98f00b204e9800998ecf8427e', + payload, + 401, + ); + + expect(res.payload).toBe('Wrong password'); + }); + + test('fails when password is wrong', async () => { + const app = createWorkerApp(); + await uploadBundle(app); + + const payload = { + gemVersion, + protocolVersion, + password: 'wrong_password', + renderingRequest: 'ReactOnRails.dummy', + dependencyBundleTimestamps: [String(BUNDLE_TIMESTAMP)], + }; + + const res = await callIncrementalRender( + app, + BUNDLE_TIMESTAMP, + 'd41d8cd98f00b204e9800998ecf8427e', + payload, + 401, + ); + + expect(res.payload).toBe('Wrong password'); + }); + + test('succeeds when password is required and correct password is provided', async () => { + const app = createWorkerApp(); + await uploadBundle(app); + + const payload = { + gemVersion, + 
protocolVersion, + password: 'my_password', + renderingRequest: 'ReactOnRails.dummy', + dependencyBundleTimestamps: [String(BUNDLE_TIMESTAMP)], + }; + + const res = await callIncrementalRender( + app, + BUNDLE_TIMESTAMP, + 'd41d8cd98f00b204e9800998ecf8427e', + payload, + ); + + expect(res.statusCode).toBe(200); + expect(res.headers['cache-control']).toBe('public, max-age=31536000'); + expect(res.payload).toBe('{"html":"Dummy Object"}'); + }); + + test('fails when protocol version is missing', async () => { + const app = createWorkerApp(); + + // Upload bundle first + const uploadForm = formAutoContent({ + gemVersion, + password: 'my_password', + targetBundles: [String(BUNDLE_TIMESTAMP)], + [`bundle_${BUNDLE_TIMESTAMP}`]: createReadStream(getFixtureBundle()), + }); + + const uploadRes = await app + .inject() + .post('/upload-assets') + .payload(uploadForm.payload) + .headers(uploadForm.headers) + .end(); + expect(uploadRes.statusCode).toBe(412); + + // Try incremental render without protocol version + const payload = { + gemVersion, + password: 'my_password', + renderingRequest: 'ReactOnRails.dummy', + dependencyBundleTimestamps: [String(BUNDLE_TIMESTAMP)], + }; + + const res = await callIncrementalRender( + app, + BUNDLE_TIMESTAMP, + 'd41d8cd98f00b204e9800998ecf8427e', + payload, + 412, + ); + + expect(res.payload).toContain('Unsupported renderer protocol version MISSING'); + }); + + test('succeeds when gem version is missing', async () => { + const app = createWorkerApp(); + await uploadBundle(app); + + const payload = { + protocolVersion, + password: 'my_password', + renderingRequest: 'ReactOnRails.dummy', + dependencyBundleTimestamps: [String(BUNDLE_TIMESTAMP)], + }; + + const res = await callIncrementalRender( + app, + BUNDLE_TIMESTAMP, + 'd41d8cd98f00b204e9800998ecf8427e', + payload, + ); + + expect(res.headers['cache-control']).toBe('public, max-age=31536000'); + expect(res.payload).toBe('{"html":"Dummy Object"}'); + }); + + // TODO: Implement incremental 
updates and update this test + test('handles multiple NDJSON chunks but only processes first one for now', async () => { + const app = createWorkerApp(); + await uploadBundle(app); + + // Send multiple NDJSON chunks (only first one should be processed for now) + const firstChunk = createNDJSONPayload({ + gemVersion, + protocolVersion, + password: 'my_password', + renderingRequest: 'ReactOnRails.dummy', + dependencyBundleTimestamps: [String(BUNDLE_TIMESTAMP)], + }); + + const secondChunk = createNDJSONPayload({ + update: 'data', + timestamp: Date.now(), + }); + + const thirdChunk = createNDJSONPayload({ + anotherUpdate: 'more data', + sequence: 2, + }); + + const multiChunkPayload = firstChunk + secondChunk + thirdChunk; + + const res = await app + .inject() + .post(`/bundles/${BUNDLE_TIMESTAMP}/incremental-render/d41d8cd98f00b204e9800998ecf8427e`) + .payload(multiChunkPayload) + .headers({ + 'Content-Type': 'application/x-ndjson', + }) + .end(); + + // Should succeed and only process the first chunk + expect(res.statusCode).toBe(200); + expect(res.headers['cache-control']).toBe('public, max-age=31536000'); + expect(res.payload).toBe('{"html":"Dummy Object"}'); + }); + }); });