diff --git a/common/changes/@microsoft/rush/copilot-stream-cache-entry-for-http-plugin_2026-04-05-03-56.json b/common/changes/@microsoft/rush/copilot-stream-cache-entry-for-http-plugin_2026-04-05-03-56.json new file mode 100644 index 00000000000..fa3539c5131 --- /dev/null +++ b/common/changes/@microsoft/rush/copilot-stream-cache-entry-for-http-plugin_2026-04-05-03-56.json @@ -0,0 +1,11 @@ +{ + "changes": [ + { + "comment": "Add optional file-based transfer APIs (`tryDownloadCacheEntryToFileAsync`, `tryUploadCacheEntryFromFileAsync`) to `ICloudBuildCacheProvider`, allowing cache plugins to transfer cache entries directly to and from files on disk without buffering entire contents in memory. Implement in `@rushstack/rush-http-build-cache-plugin`, `@rushstack/rush-amazon-s3-build-cache-plugin`, and `@rushstack/rush-azure-storage-build-cache-plugin`. Gated behind the `useDirectFileTransfersForBuildCache` experiment.", + "type": "none", + "packageName": "@microsoft/rush" + } + ], + "packageName": "@microsoft/rush", + "email": "198982749+Copilot@users.noreply.github.com" +} \ No newline at end of file diff --git a/common/reviews/api/rush-amazon-s3-build-cache-plugin.api.md b/common/reviews/api/rush-amazon-s3-build-cache-plugin.api.md index e0d5fe032ad..05745fddbb8 100644 --- a/common/reviews/api/rush-amazon-s3-build-cache-plugin.api.md +++ b/common/reviews/api/rush-amazon-s3-build-cache-plugin.api.md @@ -15,6 +15,7 @@ import { WebClient } from '@rushstack/rush-sdk/lib/utilities/WebClient'; // @public export class AmazonS3Client { constructor(credentials: IAmazonS3Credentials | undefined, options: IAmazonS3BuildCacheProviderOptionsAdvanced, webClient: WebClient, terminal: ITerminal); + downloadObjectToFileAsync(objectName: string, localFilePath: string): Promise; // (undocumented) getObjectAsync(objectName: string): Promise; // (undocumented) @@ -25,6 +26,7 @@ export class AmazonS3Client { static tryDeserializeCredentials(credentialString: string | undefined): IAmazonS3Credentials | undefined; // (undocumented) uploadObjectAsync(objectName: string, objectBuffer: Buffer): Promise; + uploadObjectFromFileAsync(objectName: string, localFilePath: string): Promise; // (undocumented) static UriEncode(input: string): string; } diff --git a/common/reviews/api/rush-lib.api.md b/common/reviews/api/rush-lib.api.md index 24c5a6606ac..6affccf1a5e 100644 --- a/common/reviews/api/rush-lib.api.md +++ b/common/reviews/api/rush-lib.api.md @@ -312,7 +312,7 @@ export class ExperimentsConfiguration { // @beta export class FileSystemBuildCacheProvider { constructor(options: IFileSystemBuildCacheProviderOptions); - getCacheEntryPath(cacheId: string): string; + readonly getCacheEntryPath: (cacheId: string) => string; tryGetCacheEntryPathByIdAsync(terminal: ITerminal, cacheId: string): Promise; trySetCacheEntryBufferAsync(terminal: ITerminal, cacheId: string, entryBuffer: Buffer): Promise; } @@ -345,10 +345,12 @@ export interface ICloudBuildCacheProvider { deleteCachedCredentialsAsync(terminal: ITerminal): Promise; // (undocumented) readonly isCacheWriteAllowed: boolean; + tryDownloadCacheEntryToFileAsync?(terminal: ITerminal, cacheId: string, localFilePath: string): Promise; // (undocumented) tryGetCacheEntryBufferByIdAsync(terminal: ITerminal, cacheId: string): Promise; // (undocumented) trySetCacheEntryBufferAsync(terminal: ITerminal, cacheId: string, entryBuffer: Buffer): Promise; + tryUploadCacheEntryFromFileAsync?(terminal: ITerminal, cacheId: string, localFilePath: string): Promise; // (undocumented) updateCachedCredentialAsync(terminal: ITerminal, credential: string): Promise; // (undocumented) @@ -482,6 +484,7 @@ export interface IExperimentsJson { printEventHooksOutputToConsole?: boolean; rushAlerts?: boolean; strictChangefileValidation?: boolean; + useDirectFileTransfersForBuildCache?: boolean; useIPCScriptsInWatchMode?: boolean; usePnpmFrozenLockfileForRushInstall?: boolean; usePnpmLockfileOnlyThenFrozenLockfileForRushUpdate?: boolean; @@ -595,6 +598,7 @@ export interface _IOperationBuildCacheOptions { buildCacheConfiguration: BuildCacheConfiguration; excludeAppleDoubleFiles: boolean; terminal: ITerminal; + useDirectFileTransfersForBuildCache: boolean; } // @alpha diff --git a/libraries/rush-lib/src/api/ExperimentsConfiguration.ts b/libraries/rush-lib/src/api/ExperimentsConfiguration.ts index 3d2c9416dc6..1b739878a73 100644 --- a/libraries/rush-lib/src/api/ExperimentsConfiguration.ts +++ b/libraries/rush-lib/src/api/ExperimentsConfiguration.ts @@ -144,6 +144,14 @@ export interface IExperimentsJson { * policy's main project. */ strictChangefileValidation?: boolean; + + /** + * If true, the build cache will use file-based APIs to transfer cache entries to and from cloud + * storage. This avoids loading the entire cache entry into memory, which can prevent out-of-memory + * errors for large build outputs. The cloud cache provider plugin must implement the optional + * file-based methods for this to take effect; otherwise it falls back to the buffer-based approach. + */ + useDirectFileTransfersForBuildCache?: boolean; } const _EXPERIMENTS_JSON_SCHEMA: JsonSchema = JsonSchema.fromLoadedObject(schemaJson); diff --git a/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts b/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts index fd41ff7ce7c..90b435d9095 100644 --- a/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts +++ b/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts @@ -350,6 +350,11 @@ export class PhasedScriptAction extends BaseScriptAction i public async runAsync(): Promise { const stopwatch: Stopwatch = Stopwatch.start(); + const { + defaultSubspace, + subspacesFeatureEnabled, + pnpmOptions: { useWorkspaces } + } = this.rushConfiguration; if (this._alwaysInstall || this._installParameter?.value) { await measureAsyncFn(`${PERF_PREFIX}:install`, async () => { const { doBasicInstallAsync } = await import( @@ -373,7 +378,7 @@ export class PhasedScriptAction extends BaseScriptAction i afterInstallAsync: (subspace: Subspace) => this.rushSession.hooks.afterInstall.promise(this, subspace, variant), // Eventually we may want to allow a subspace to be selected here - subspace: this.rushConfiguration.defaultSubspace + subspace: defaultSubspace }); }); } @@ -382,14 +387,12 @@ export class PhasedScriptAction extends BaseScriptAction i await measureAsyncFn(`${PERF_PREFIX}:checkInstallFlag`, async () => { // TODO: Replace with last-install.flag when "rush link" and "rush unlink" are removed const lastLinkFlag: FlagFile = new FlagFile( - this.rushConfiguration.defaultSubspace.getSubspaceTempFolderPath(), + defaultSubspace.getSubspaceTempFolderPath(), RushConstants.lastLinkFlagFilename, {} ); // Only check for a valid link flag when subspaces is not enabled - if (!(await lastLinkFlag.isValidAsync()) && !this.rushConfiguration.subspacesFeatureEnabled) { - const useWorkspaces: boolean = - this.rushConfiguration.pnpmOptions && this.rushConfiguration.pnpmOptions.useWorkspaces; + if (!(await lastLinkFlag.isValidAsync()) && !subspacesFeatureEnabled) { if (useWorkspaces) { throw new Error('Link flag invalid.\nDid you run "rush install" or "rush update"?'); } else { @@ -513,18 +516,27 @@ export class PhasedScriptAction extends BaseScriptAction i ).IPCOperationRunnerPlugin().apply(this.hooks); } + const { + experimentsConfiguration: { + configuration: { + buildCacheWithAllowWarningsInSuccessfulBuild = false, + buildSkipWithAllowWarningsInSuccessfulBuild, + omitAppleDoubleFilesFromBuildCache: excludeAppleDoubleFiles = false, + useDirectFileTransfersForBuildCache = false, + usePnpmSyncForInjectedDependencies + } + }, + isPnpm + } = this.rushConfiguration; if (buildCacheConfiguration?.buildCacheEnabled) { terminal.writeVerboseLine(`Incremental strategy: cache restoration`); new CacheableOperationPlugin({ - allowWarningsInSuccessfulBuild: - !!this.rushConfiguration.experimentsConfiguration.configuration - .buildCacheWithAllowWarningsInSuccessfulBuild, + allowWarningsInSuccessfulBuild: buildCacheWithAllowWarningsInSuccessfulBuild, buildCacheConfiguration, cobuildConfiguration, terminal, - excludeAppleDoubleFiles: - !!this.rushConfiguration.experimentsConfiguration.configuration - .omitAppleDoubleFilesFromBuildCache + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache }).apply(this.hooks); if (this._debugBuildCacheIdsParameter.value) { @@ -534,9 +546,7 @@ export class PhasedScriptAction extends BaseScriptAction i terminal.writeVerboseLine(`Incremental strategy: output preservation`); // Explicitly disabling the build cache also disables legacy skip detection. new LegacySkipPlugin({ - allowWarningsInSuccessfulBuild: - this.rushConfiguration.experimentsConfiguration.configuration - .buildSkipWithAllowWarningsInSuccessfulBuild, + allowWarningsInSuccessfulBuild: buildSkipWithAllowWarningsInSuccessfulBuild, terminal, changedProjectsOnly, isIncrementalBuildAllowed: this._isIncrementalBuildAllowed @@ -551,12 +561,12 @@ export class PhasedScriptAction extends BaseScriptAction i if (!buildCacheConfiguration?.buildCacheEnabled) { throw new Error('You must have build cache enabled to use this option.'); } + const { BuildPlanPlugin } = await import('../../logic/operations/BuildPlanPlugin'); new BuildPlanPlugin(terminal).apply(this.hooks); } - const { configuration: experiments } = this.rushConfiguration.experimentsConfiguration; - if (this.rushConfiguration?.isPnpm && experiments?.usePnpmSyncForInjectedDependencies) { + if (isPnpm && usePnpmSyncForInjectedDependencies) { const { PnpmSyncCopyOperationPlugin } = await import( '../../logic/operations/PnpmSyncCopyOperationPlugin' ); diff --git a/libraries/rush-lib/src/logic/buildCache/FileSystemBuildCacheProvider.ts b/libraries/rush-lib/src/logic/buildCache/FileSystemBuildCacheProvider.ts index 8fdd54bf444..96af3eebb27 100644 --- a/libraries/rush-lib/src/logic/buildCache/FileSystemBuildCacheProvider.ts +++ b/libraries/rush-lib/src/logic/buildCache/FileSystemBuildCacheProvider.ts @@ -1,8 +1,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license. // See LICENSE in the project root for license information. -import * as path from 'node:path'; - import { FileSystem } from '@rushstack/node-core-library'; import type { ITerminal } from '@rushstack/terminal'; @@ -32,19 +30,17 @@ const DEFAULT_BUILD_CACHE_FOLDER_NAME: string = 'build-cache'; * @beta */ export class FileSystemBuildCacheProvider { - private readonly _cacheFolderPath: string; - - public constructor(options: IFileSystemBuildCacheProviderOptions) { - this._cacheFolderPath = - options.rushUserConfiguration.buildCacheFolder || - path.join(options.rushConfiguration.commonTempFolder, DEFAULT_BUILD_CACHE_FOLDER_NAME); - } - /** * Returns the absolute disk path for the specified cache id. */ - public getCacheEntryPath(cacheId: string): string { - return path.join(this._cacheFolderPath, cacheId); + public readonly getCacheEntryPath: (cacheId: string) => string; + + public constructor(options: IFileSystemBuildCacheProviderOptions) { + const { + rushConfiguration: { commonTempFolder }, + rushUserConfiguration: { buildCacheFolder = `${commonTempFolder}/${DEFAULT_BUILD_CACHE_FOLDER_NAME}` } + } = options; + this.getCacheEntryPath = (cacheId: string) => `${buildCacheFolder}/${cacheId}`; } /** @@ -55,7 +51,8 @@ export class FileSystemBuildCacheProvider { cacheId: string ): Promise { const cacheEntryFilePath: string = this.getCacheEntryPath(cacheId); - if (await FileSystem.existsAsync(cacheEntryFilePath)) { + const cacheEntryExists: boolean = await FileSystem.existsAsync(cacheEntryFilePath); + if (cacheEntryExists) { return cacheEntryFilePath; } else { return undefined; diff --git a/libraries/rush-lib/src/logic/buildCache/ICloudBuildCacheProvider.ts b/libraries/rush-lib/src/logic/buildCache/ICloudBuildCacheProvider.ts index f55a0870ad8..a10e1019282 100644 --- a/libraries/rush-lib/src/logic/buildCache/ICloudBuildCacheProvider.ts +++ b/libraries/rush-lib/src/logic/buildCache/ICloudBuildCacheProvider.ts @@ -11,6 +11,35 @@ export interface ICloudBuildCacheProvider { tryGetCacheEntryBufferByIdAsync(terminal: ITerminal, cacheId: string): Promise; trySetCacheEntryBufferAsync(terminal: ITerminal, cacheId: string, entryBuffer: Buffer): Promise; + + /** + * If implemented, the build cache will prefer to use this method over + * {@link ICloudBuildCacheProvider.tryGetCacheEntryBufferByIdAsync} to avoid loading the entire + * cache entry into memory, if possible. The implementation should download the cache entry and write it + * to the specified local file path. + * + * @returns `true` if the cache entry was found and written to the file, `false` if it was + * not found. Throws on errors. + */ + tryDownloadCacheEntryToFileAsync?( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise; + /** + * If implemented, the build cache will prefer to use this method over + * {@link ICloudBuildCacheProvider.trySetCacheEntryBufferAsync} to avoid loading the entire + * cache entry into memory, if possible. The implementation should read the cache entry from + * the specified local file path and upload it. + * + * @returns `true` if the cache entry was written to the cache, otherwise `false`. + */ + tryUploadCacheEntryFromFileAsync?( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise; + updateCachedCredentialAsync(terminal: ITerminal, credential: string): Promise; updateCachedCredentialInteractiveAsync(terminal: ITerminal): Promise; deleteCachedCredentialsAsync(terminal: ITerminal): Promise; diff --git a/libraries/rush-lib/src/logic/buildCache/OperationBuildCache.ts b/libraries/rush-lib/src/logic/buildCache/OperationBuildCache.ts index 0abf76c221b..417e05d67f6 100644 --- a/libraries/rush-lib/src/logic/buildCache/OperationBuildCache.ts +++ b/libraries/rush-lib/src/logic/buildCache/OperationBuildCache.ts @@ -32,6 +32,11 @@ export interface IOperationBuildCacheOptions { * and a companion file exists in the same directory. */ excludeAppleDoubleFiles: boolean; + /** + * If true, use file-based APIs (when available) to transfer cache entries to and from the + * cloud provider, avoiding buffering the entire entry in memory. + */ + useDirectFileTransfersForBuildCache: boolean; } /** @@ -75,6 +80,7 @@ export class OperationBuildCache { private readonly _projectOutputFolderNames: ReadonlyArray; private readonly _cacheId: string | undefined; private readonly _excludeAppleDoubleFiles: boolean; + private readonly _useDirectFileTransfersForBuildCache: boolean; private constructor(cacheId: string | undefined, options: IProjectBuildCacheOptions) { const { @@ -86,7 +92,8 @@ export class OperationBuildCache { }, project, projectOutputFolderNames, - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache } = options; this._project = project; this._localBuildCacheProvider = localCacheProvider; @@ -96,6 +103,7 @@ export class OperationBuildCache { this._projectOutputFolderNames = projectOutputFolderNames || []; this._cacheId = cacheId; this._excludeAppleDoubleFiles = excludeAppleDoubleFiles && process.platform === 'darwin'; + this._useDirectFileTransfersForBuildCache = useDirectFileTransfersForBuildCache; } private static _tryGetTarUtility(terminal: ITerminal): Promise { @@ -119,7 +127,12 @@ export class OperationBuildCache { executionResult: IOperationExecutionResult, options: IOperationBuildCacheOptions ): OperationBuildCache { - const { buildCacheConfiguration, terminal, excludeAppleDoubleFiles } = options; + const { + buildCacheConfiguration, + terminal, + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache + } = options; const outputFolders: string[] = [...(executionResult.operation.settings?.outputFolderNames ?? [])]; if (executionResult.metadataFolderPath) { outputFolders.push(executionResult.metadataFolderPath); @@ -132,7 +145,8 @@ export class OperationBuildCache { phaseName: executionResult.operation.associatedPhase.name, projectOutputFolderNames: outputFolders, operationStateHash: executionResult.getStateHash(), - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache }; const cacheId: string | undefined = OperationBuildCache._getCacheId(buildCacheOptions); return new OperationBuildCache(cacheId, buildCacheOptions); @@ -152,32 +166,66 @@ export class OperationBuildCache { let localCacheEntryPath: string | undefined = await this._localBuildCacheProvider.tryGetCacheEntryPathByIdAsync(terminal, cacheId); - let cacheEntryBuffer: Buffer | undefined; + let cloudCacheHit: boolean = false; let updateLocalCacheSuccess: boolean | undefined; if (!localCacheEntryPath && this._cloudBuildCacheProvider) { terminal.writeVerboseLine( 'This project was not found in the local build cache. Querying the cloud build cache.' ); - cacheEntryBuffer = await this._cloudBuildCacheProvider.tryGetCacheEntryBufferByIdAsync( - terminal, - cacheId - ); - if (cacheEntryBuffer) { + if ( + this._useDirectFileTransfersForBuildCache && + this._cloudBuildCacheProvider.tryDownloadCacheEntryToFileAsync + ) { + // Use file-based path to avoid loading the entire cache entry into memory. + // The provider downloads directly to the local cache file. + const targetPath: string = this._localBuildCacheProvider.getCacheEntryPath(cacheId); try { - localCacheEntryPath = await this._localBuildCacheProvider.trySetCacheEntryBufferAsync( + cloudCacheHit = await this._cloudBuildCacheProvider.tryDownloadCacheEntryToFileAsync( terminal, cacheId, - cacheEntryBuffer + targetPath ); - updateLocalCacheSuccess = true; + if (cloudCacheHit) { + localCacheEntryPath = targetPath; + updateLocalCacheSuccess = true; + } } catch (e) { + terminal.writeVerboseLine(`Failed to download cache entry to local cache: ${e}`); updateLocalCacheSuccess = false; } + + if (!cloudCacheHit) { + // Clean up any partial file left by the failed or missed download so it isn't + // mistaken for a valid cache entry on the next build. Providers may catch errors + // internally and return false instead of throwing, leaving a partially written file. + try { + await FileSystem.deleteFileAsync(targetPath); + } catch { + // Ignore cleanup errors (file may not have been created) + } + } + } else { + const cacheEntryBuffer: Buffer | undefined = + await this._cloudBuildCacheProvider.tryGetCacheEntryBufferByIdAsync(terminal, cacheId); + if (cacheEntryBuffer) { + cloudCacheHit = true; + try { + localCacheEntryPath = await this._localBuildCacheProvider.trySetCacheEntryBufferAsync( + terminal, + cacheId, + cacheEntryBuffer + ); + updateLocalCacheSuccess = true; + } catch (e) { + terminal.writeVerboseLine(`Failed to update local cache: ${e}`); + updateLocalCacheSuccess = false; + } + } } } - if (!localCacheEntryPath && !cacheEntryBuffer) { + if (!localCacheEntryPath && !cloudCacheHit) { terminal.writeVerboseLine('This project was not found in the build cache.'); return false; } @@ -300,8 +348,6 @@ export class OperationBuildCache { return false; } - let cacheEntryBuffer: Buffer | undefined; - let setCloudCacheEntryPromise: Promise | undefined; // Note that "writeAllowed" settings (whether in config or environment) always apply to @@ -309,17 +355,29 @@ export class OperationBuildCache { // write to the local build cache. if (this._cloudBuildCacheProvider?.isCacheWriteAllowed) { - if (localCacheEntryPath) { - cacheEntryBuffer = await FileSystem.readFileToBufferAsync(localCacheEntryPath); - } else { + if (!localCacheEntryPath) { throw new InternalError('Expected the local cache entry path to be set.'); } - setCloudCacheEntryPromise = this._cloudBuildCacheProvider?.trySetCacheEntryBufferAsync( - terminal, - cacheId, - cacheEntryBuffer - ); + if ( + this._useDirectFileTransfersForBuildCache && + this._cloudBuildCacheProvider.tryUploadCacheEntryFromFileAsync + ) { + // Use file-based upload to avoid loading the entire cache entry into memory. + // The provider reads from the local cache file directly. + setCloudCacheEntryPromise = this._cloudBuildCacheProvider.tryUploadCacheEntryFromFileAsync( + terminal, + cacheId, + localCacheEntryPath + ); + } else { + const cacheEntryBuffer: Buffer = await FileSystem.readFileToBufferAsync(localCacheEntryPath); + setCloudCacheEntryPromise = this._cloudBuildCacheProvider.trySetCacheEntryBufferAsync( + terminal, + cacheId, + cacheEntryBuffer + ); + } } const updateCloudCacheSuccess: boolean | undefined = (await setCloudCacheEntryPromise) ?? true; diff --git a/libraries/rush-lib/src/logic/buildCache/test/OperationBuildCache.test.ts b/libraries/rush-lib/src/logic/buildCache/test/OperationBuildCache.test.ts index 08a35a85428..c4f10daa36a 100644 --- a/libraries/rush-lib/src/logic/buildCache/test/OperationBuildCache.test.ts +++ b/libraries/rush-lib/src/logic/buildCache/test/OperationBuildCache.test.ts @@ -59,7 +59,8 @@ describe(OperationBuildCache.name, () => { operationStateHash: '1926f30e8ed24cb47be89aea39e7efd70fcda075', terminal, phaseName: 'build', - excludeAppleDoubleFiles: !!options.excludeAppleDoubleFiles + excludeAppleDoubleFiles: !!options.excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache: false }); return subject; diff --git a/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts b/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts index 90af287b9dd..5e7468fd8d6 100644 --- a/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts +++ b/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts @@ -77,6 +77,7 @@ export interface ICacheableOperationPluginOptions { cobuildConfiguration: CobuildConfiguration | undefined; terminal: ITerminal; excludeAppleDoubleFiles: boolean; + useDirectFileTransfersForBuildCache: boolean; } interface ITryGetOperationBuildCacheOptionsBase { @@ -84,6 +85,7 @@ interface ITryGetOperationBuildCacheOptionsBase { buildCacheConfiguration: BuildCacheConfiguration | undefined; terminal: ITerminal; excludeAppleDoubleFiles: boolean; + useDirectFileTransfersForBuildCache: boolean; record: TRecord; } @@ -108,7 +110,8 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { allowWarningsInSuccessfulBuild, buildCacheConfiguration, cobuildConfiguration, - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache } = this._options; hooks.beforeExecuteOperations.tap( @@ -272,7 +275,8 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { buildCacheConfiguration, terminal: buildCacheTerminal, record, - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache }); // Try to acquire the cobuild lock @@ -291,7 +295,8 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { buildCacheContext, record, terminal: buildCacheTerminal, - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache }); if (operationBuildCache) { buildCacheTerminal.writeVerboseLine( @@ -585,7 +590,14 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { private _tryGetOperationBuildCache( options: ITryGetOperationBuildCacheOptions ): OperationBuildCache | undefined { - const { buildCacheConfiguration, buildCacheContext, terminal, record, excludeAppleDoubleFiles } = options; + const { + buildCacheConfiguration, + buildCacheContext, + terminal, + record, + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache + } = options; if (!buildCacheContext.operationBuildCache) { const { cacheDisabledReason } = buildCacheContext; if (cacheDisabledReason && !record.operation.settings?.allowCobuildWithoutCache) { @@ -601,7 +613,8 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { buildCacheContext.operationBuildCache = OperationBuildCache.forOperation(record, { buildCacheConfiguration, terminal, - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache }); } @@ -618,7 +631,8 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { cobuildConfiguration, record, terminal, - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache } = options; if (!buildCacheConfiguration?.buildCacheEnabled) { @@ -649,7 +663,8 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { terminal, operationStateHash, phaseName: associatedPhase.name, - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache }); buildCacheContext.operationBuildCache = operationBuildCache; diff --git a/libraries/rush-lib/src/schemas/experiments.schema.json b/libraries/rush-lib/src/schemas/experiments.schema.json index dd508fcc1db..7df4221e26c 100644 --- a/libraries/rush-lib/src/schemas/experiments.schema.json +++ b/libraries/rush-lib/src/schemas/experiments.schema.json @@ -89,6 +89,10 @@ "strictChangefileValidation": { "description": "If true, `rush change --verify` will report errors if change files reference projects that do not exist in the Rush configuration, or if change files target a project that belongs to a lockstepped version policy but is not the policy's main project.", "type": "boolean" + }, + "useDirectFileTransfersForBuildCache": { + "description": "If true, the build cache will use file-based APIs to transfer cache entries to and from cloud storage. This avoids loading the entire cache entry into memory, which can prevent out-of-memory errors for large build outputs. The cloud cache provider plugin must implement the optional file-based methods for this to take effect; otherwise it falls back to the buffer-based approach.", + "type": "boolean" } }, "additionalProperties": false diff --git a/libraries/rush-lib/src/utilities/WebClient.ts b/libraries/rush-lib/src/utilities/WebClient.ts index bdd16823332..6b4e0e4140d 100644 --- a/libraries/rush-lib/src/utilities/WebClient.ts +++ b/libraries/rush-lib/src/utilities/WebClient.ts @@ -3,27 +3,44 @@ import * as os from 'node:os'; import * as process from 'node:process'; -import { request as httpRequest, type IncomingMessage, type Agent as HttpAgent } from 'node:http'; +import type { Readable } from 'node:stream'; +import { + request as httpRequest, + type IncomingMessage, + type ClientRequest, + type Agent as HttpAgent +} from 'node:http'; import { request as httpsRequest, type RequestOptions } from 'node:https'; import { Import, LegacyAdapters } from '@rushstack/node-core-library'; const createHttpsProxyAgent: typeof import('https-proxy-agent') = Import.lazy('https-proxy-agent', require); -/** - * For use with {@link WebClient}. - */ -export interface IWebClientResponse { +export interface IWebClientResponseBase { ok: boolean; status: number; statusText?: string; redirected: boolean; headers: Record; +} + +/** + * A response from {@link WebClient.fetchAsync}. + */ +export interface IWebClientResponse extends IWebClientResponseBase { getTextAsync: () => Promise; getJsonAsync: () => Promise; getBufferAsync: () => Promise; } +/** + * A response from {@link WebClient.fetchStreamAsync} that provides the response body as a + * readable stream, avoiding buffering the entire response in memory. + */ +export interface IWebClientStreamResponse extends IWebClientResponseBase { + stream: Readable; +} + /** * For use with {@link WebClient}. */ @@ -49,7 +66,7 @@ export interface IGetFetchOptions extends IWebFetchOptionsBase { */ export interface IFetchOptionsWithBody extends IWebFetchOptionsBase { verb: 'PUT' | 'POST' | 'PATCH'; - body?: Buffer; + body?: Buffer | Readable; } /** @@ -78,140 +95,350 @@ const ACCEPT_HEADER_NAME: 'accept' = 'accept'; const USER_AGENT_HEADER_NAME: 'user-agent' = 'user-agent'; const CONTENT_ENCODING_HEADER_NAME: 'content-encoding' = 'content-encoding'; -const makeRequestAsync: FetchFn = async ( +/** + * Parses the Content-Encoding header into an array of encoding names, + * or returns `undefined` if decoding should be skipped. + */ +function _getContentEncodings( + headers: Record, + noDecode: boolean | undefined +): string[] | undefined { + if (!noDecode) { + const encodings: string | string[] | undefined = headers[CONTENT_ENCODING_HEADER_NAME]; + if (encodings) { + return Array.isArray(encodings) ? encodings : encodings.split(','); + } + } +} + +type StreamFetchFn = ( url: string, options: IRequestOptions, - redirected: boolean = false -) => { - const { body, redirect, noDecode } = options; + isRedirect?: boolean +) => Promise; - return await new Promise( - (resolve: (result: IWebClientResponse) => void, reject: (error: Error) => void) => { +/** + * Shared HTTP request core used by both buffer-based and streaming request functions. + * Handles URL parsing, protocol selection, redirect following, body sending, and error handling. + * The `handleResponse` callback is responsible for processing the response and calling + * `resolve`/`reject` to complete the outer promise. + */ +function _makeRawRequestAsync( + url: string, + options: IRequestOptions, + redirected: boolean, + handleResponse: ( + response: IncomingMessage, + redirected: boolean, + resolve: (result: TResponse | PromiseLike) => void, + reject: (error: Error) => void + ) => void, + requestFnAsync: (url: string, options: IRequestOptions, isRedirect?: boolean) => Promise +): Promise { + const { body, redirect } = options; + + return new Promise( + (resolve: (result: TResponse | PromiseLike) => void, reject: (error: Error) => void) => { const parsedUrl: URL = typeof url === 'string' ? new URL(url) : url; const requestFunction: typeof httpRequest | typeof httpsRequest = parsedUrl.protocol === 'https:' ? httpsRequest : httpRequest; - requestFunction(url, options, (response: IncomingMessage) => { - const responseBuffers: (Buffer | Uint8Array)[] = []; - response.on('data', (chunk: string | Buffer | Uint8Array) => { - responseBuffers.push(Buffer.from(chunk)); - }); - response.on('end', () => { - // Handle retries by calling the method recursively with the redirect URL - const statusCode: number | undefined = response.statusCode; - if (statusCode === 301 || statusCode === 302) { - switch (redirect) { - case 'follow': { - const redirectUrl: string | string[] | undefined = response.headers.location; - if (redirectUrl) { - makeRequestAsync(redirectUrl, options, true).then(resolve).catch(reject); - } else { - reject( - new Error(`Received status code ${response.statusCode} with no location header: ${url}`) - ); - } - - break; + const req: ClientRequest = requestFunction(url, options, (response: IncomingMessage) => { + const { + statusCode, + headers: { location: redirectUrl } + } = response; + if (statusCode === 301 || statusCode === 302) { + switch (redirect) { + case 'follow': { + // Drain the redirect response since we're discarding it + response.resume(); + if (redirectUrl) { + requestFnAsync(redirectUrl, options, true).then(resolve).catch(reject); + } else { + reject(new Error(`Received status code ${statusCode} with no location header: ${url}`)); } - case 'error': - reject(new Error(`Received status code ${response.statusCode}: ${url}`)); - return; + return; } + + case 'error': + response.resume(); + reject(new Error(`Received status code ${statusCode}: ${url}`)); + return; } + } - const responseData: Buffer = Buffer.concat(responseBuffers); - const status: number = response.statusCode || 0; - const statusText: string | undefined = response.statusMessage; - const headers: Record = response.headers; - - let bodyString: string | undefined; - let bodyJson: unknown | undefined; - let decodedBuffer: Buffer | undefined; - const result: IWebClientResponse = { - ok: status >= 200 && status < 300, - status, - statusText, - redirected, - headers, - getTextAsync: async () => { - if (bodyString === undefined) { - const buffer: Buffer = await result.getBufferAsync(); - // eslint-disable-next-line require-atomic-updates - bodyString = buffer.toString(); - } + handleResponse(response, redirected, resolve, reject); + }).on('error', (error: Error) => { + reject(error); + }); + + const isStream: boolean = !!body && typeof (body as Readable).pipe === 'function'; + if (isStream) { + (body as Readable).on('error', reject); + (body as Readable).pipe(req); + } else { + req.end(body as Buffer | undefined); + } + } + ); +} - return bodyString; - }, - getJsonAsync: async () => { - if (bodyJson === undefined) { - const text: string = await result.getTextAsync(); - // eslint-disable-next-line require-atomic-updates - bodyJson = JSON.parse(text); - } +const makeRequestAsync: FetchFn = async ( + url: string, + options: IRequestOptions, + redirected: boolean = false +) => { + const { noDecode } = options; + + return _makeRawRequestAsync( + url, + options, + redirected, + ( + response: IncomingMessage, + wasRedirected: boolean, + resolve: (result: IWebClientResponse | PromiseLike) => void + ): void => { + const responseBuffers: (Buffer | Uint8Array)[] = []; + response.on('data', (chunk: string | Buffer | Uint8Array) => { + responseBuffers.push(Buffer.from(chunk)); + }); + response.on('end', () => { + const { statusCode: status = 0, statusMessage: statusText, headers } = response; + const responseData: Buffer = Buffer.concat(responseBuffers); + + let bodyString: string | undefined; + let bodyJson: unknown | undefined; + let decodedBuffer: Buffer | undefined; + const result: IWebClientResponse = { + ok: status >= 200 && status < 300, + status, + statusText, + redirected: wasRedirected, + headers, + getTextAsync: async () => { + if (bodyString === undefined) { + const buffer: Buffer = await result.getBufferAsync(); + // eslint-disable-next-line require-atomic-updates + bodyString = buffer.toString(); + } - return bodyJson as TJson; - }, - getBufferAsync: async () => { - // Determine if the buffer is compressed and decode it if necessary - if (decodedBuffer === undefined) { - let encodings: string | string[] | undefined = headers[CONTENT_ENCODING_HEADER_NAME]; - if (!noDecode && encodings !== undefined) { - const zlib: typeof import('zlib') = await import('node:zlib'); - if (!Array.isArray(encodings)) { - encodings = encodings.split(','); - } + return bodyString; + }, + getJsonAsync: async () => { + if (bodyJson === undefined) { + const text: string = await result.getTextAsync(); + // eslint-disable-next-line require-atomic-updates + bodyJson = JSON.parse(text); + } - let buffer: Buffer = responseData; - for (const encoding of encodings) { - let decompressFn: (buffer: Buffer, callback: import('zlib').CompressCallback) => void; - switch (encoding.trim()) { - case DEFLATE_ENCODING: { - decompressFn = zlib.inflate.bind(zlib); - break; - } - case GZIP_ENCODING: { - decompressFn = zlib.gunzip.bind(zlib); - break; - } - case BROTLI_ENCODING: { - decompressFn = zlib.brotliDecompress.bind(zlib); - break; - } - default: { - throw new Error(`Unsupported content-encoding: ${encodings}`); - } + return bodyJson as TJson; + }, + getBufferAsync: async () => { + // Determine if the buffer is compressed and decode it if necessary + if (decodedBuffer === undefined) { + const contentEncodings: string[] | undefined = _getContentEncodings(headers, noDecode); + if (contentEncodings) { + const zlib: typeof import('zlib') = await import('node:zlib'); + + let buffer: Buffer = responseData; + for (const encoding of contentEncodings) { + let decompressFn: (buffer: Buffer, callback: import('zlib').CompressCallback) => void; + switch (encoding.trim()) { + case DEFLATE_ENCODING: { + decompressFn = zlib.inflate.bind(zlib); + break; + } + case GZIP_ENCODING: { + decompressFn = zlib.gunzip.bind(zlib); + break; + } + case BROTLI_ENCODING: { + decompressFn = zlib.brotliDecompress.bind(zlib); + break; + } + default: { + throw new Error(`Unsupported content-encoding: ${encoding.trim()}`); } - - buffer = await LegacyAdapters.convertCallbackToPromise(decompressFn, buffer); } - // eslint-disable-next-line require-atomic-updates - decodedBuffer = buffer; - } else { - decodedBuffer = responseData; + buffer = await LegacyAdapters.convertCallbackToPromise(decompressFn, buffer); } + + // eslint-disable-next-line require-atomic-updates + decodedBuffer = buffer; + } else { + decodedBuffer = responseData; } + } + + return decodedBuffer; + } + }; + resolve(result); + }); + }, + makeRequestAsync + ); +}; - return decodedBuffer; +const makeStreamRequestAsync: StreamFetchFn = async ( + url: string, + options: IRequestOptions, + redirected: boolean = false +) => { + const { noDecode } = options; + + return _makeRawRequestAsync( + url, + options, + redirected, + ( + response: IncomingMessage, + wasRedirected: boolean, + resolve: (result: IWebClientStreamResponse | PromiseLike) => void + ): void => { + const { statusCode: status = 0, statusMessage: statusText, headers } = response; + + const buildResult = (stream: Readable): IWebClientStreamResponse => ({ + ok: status >= 200 && status < 300, + status, + statusText, + redirected: wasRedirected, + headers, + stream + }); + + // Handle Content-Encoding decompression for streaming responses, + // matching the buffer-based path's behavior in getBufferAsync() + const contentEncodings: string[] | undefined = _getContentEncodings(headers, noDecode); + + if (contentEncodings) { + // Resolve with a promise so we can lazily import zlib (same pattern as buffer path) + resolve( + (async () => { + const zlib: typeof import('zlib') = await import('node:zlib'); + + let resultStream: Readable = response; + for (const encoding of contentEncodings) { + switch (encoding.trim()) { + case DEFLATE_ENCODING: { + resultStream = resultStream.pipe(zlib.createInflate()); + break; + } + case GZIP_ENCODING: { + resultStream = resultStream.pipe(zlib.createGunzip()); + break; + } + case BROTLI_ENCODING: { + resultStream = resultStream.pipe(zlib.createBrotliDecompress()); + break; + } + default: { + throw new Error(`Unsupported content-encoding: ${encoding.trim()}`); + } + } } - }; - resolve(result); - }); - }) - .on('error', (error: Error) => { - reject(error); - }) - .end(body); - } + + return buildResult(resultStream); + })() + ); + } else { + resolve(buildResult(response)); + } + }, + makeStreamRequestAsync ); }; +// Module-level mutable state for mock injection. These must NOT be private members +// of WebClient because rush-sdk re-exports WebClient as a separate type declaration, +// and TypeScript's structural typing treats private members nominally, causing type +// incompatibility between the rush-lib and rush-sdk versions. +let _requestFnAsync: FetchFn = makeRequestAsync; +let _streamRequestFnAsync: StreamFetchFn = makeStreamRequestAsync; + +function _mergeHeaders(target: Record, source: Record): void { + for (const [name, value] of Object.entries(source)) { + target[name] = value; + } +} + +/** + * Builds the low-level IRequestOptions from WebClient instance state and caller-provided options. + * This is a module-level function (not a private method) to avoid the rush-sdk type mismatch. + */ +function buildRequestOptions( + webClient: WebClient, + options?: IGetFetchOptions | IFetchOptionsWithBody +): IRequestOptions { + const { + headers: optionsHeaders, + timeoutMs = 15 * 1000, + verb, + redirect, + body, + noDecode + } = (options as IFetchOptionsWithBody | undefined) ?? {}; + + const headers: Record = {}; + + const { standardHeaders, userAgent, accept, proxy } = webClient; + + _mergeHeaders(headers, standardHeaders); + + if (optionsHeaders) { + _mergeHeaders(headers, optionsHeaders); + } + + if (userAgent) { + headers[USER_AGENT_HEADER_NAME] = userAgent; + } + + if (accept) { + headers[ACCEPT_HEADER_NAME] = accept; + } + + let proxyUrl: string = ''; + + switch (proxy) { + case WebClientProxy.Detect: + if (process.env.HTTPS_PROXY) { + proxyUrl = process.env.HTTPS_PROXY; + } else if (process.env.HTTP_PROXY) { + proxyUrl = process.env.HTTP_PROXY; + } + break; + + case WebClientProxy.Fiddler: + // For debugging, disable cert validation + // eslint-disable-next-line + process.env['NODE_TLS_REJECT_UNAUTHORIZED'] = '0'; + proxyUrl = 'http://localhost:8888/'; + break; + } + + let agent: HttpAgent | undefined = undefined; + if (proxyUrl) { + agent = createHttpsProxyAgent(proxyUrl); + } + + return { + method: verb, + headers, + agent, + timeout: timeoutMs, + redirect, + body, + noDecode + }; +} + /** * A helper for issuing HTTP requests. */ export class WebClient { - private static _requestFn: FetchFn = makeRequestAsync; - public readonly standardHeaders: Record = {}; public accept: string | undefined = '*/*'; @@ -220,17 +447,23 @@ export class WebClient { public proxy: WebClientProxy = WebClientProxy.Detect; public static mockRequestFn(fn: FetchFn): void { - WebClient._requestFn = fn; + _requestFnAsync = fn; } public static resetMockRequestFn(): void { - WebClient._requestFn = makeRequestAsync; + _requestFnAsync = makeRequestAsync; + } + + public static mockStreamRequestFn(fn: StreamFetchFn): void { + _streamRequestFnAsync = fn; + } + + public static resetMockStreamRequestFn(): void { + _streamRequestFnAsync = makeStreamRequestAsync; } public static mergeHeaders(target: Record, source: Record): void { - for (const [name, value] of Object.entries(source)) { - target[name] = value; - } + _mergeHeaders(target, source); } public addBasicAuthHeader(userName: string, password: string): void { @@ -242,65 +475,19 @@ export class WebClient { url: string, options?: IGetFetchOptions | IFetchOptionsWithBody ): Promise { - const { - headers: optionsHeaders, - timeoutMs = 15 * 1000, - verb, - redirect, - body, - noDecode - } = (options as IFetchOptionsWithBody | undefined) ?? {}; - - const headers: Record = {}; - - WebClient.mergeHeaders(headers, this.standardHeaders); - - if (optionsHeaders) { - WebClient.mergeHeaders(headers, optionsHeaders); - } - - if (this.userAgent) { - headers[USER_AGENT_HEADER_NAME] = this.userAgent; - } - - if (this.accept) { - headers[ACCEPT_HEADER_NAME] = this.accept; - } - - let proxyUrl: string = ''; - - switch (this.proxy) { - case WebClientProxy.Detect: - if (process.env.HTTPS_PROXY) { - proxyUrl = process.env.HTTPS_PROXY; - } else if (process.env.HTTP_PROXY) { - proxyUrl = process.env.HTTP_PROXY; - } - break; - - case WebClientProxy.Fiddler: - // For debugging, disable cert validation - // eslint-disable-next-line - process.env['NODE_TLS_REJECT_UNAUTHORIZED'] = '0'; - proxyUrl = 'http://localhost:8888/'; - break; - } - - let agent: HttpAgent | undefined = undefined; - if (proxyUrl) { - agent = createHttpsProxyAgent(proxyUrl); - } + const requestInit: IRequestOptions = buildRequestOptions(this, options); + return await _requestFnAsync(url, requestInit); + } - const requestInit: IRequestOptions = { - method: verb, - headers, - agent, - timeout: timeoutMs, - redirect, - body, - noDecode - }; - - return await WebClient._requestFn(url, requestInit); + /** + * Makes an HTTP request that resolves as soon as headers are received, providing the + * response body as a readable stream. This avoids buffering the entire response in memory. + */ + public async fetchStreamAsync( + url: string, + options?: IGetFetchOptions | IFetchOptionsWithBody + ): Promise { + const requestInit: IRequestOptions = buildRequestOptions(this, options); + return await _streamRequestFnAsync(url, requestInit); } } diff --git a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3BuildCacheProvider.ts b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3BuildCacheProvider.ts index ea8d7b903c0..1410294476b 100644 --- a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3BuildCacheProvider.ts +++ b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3BuildCacheProvider.ts @@ -153,7 +153,7 @@ export class AmazonS3BuildCacheProvider implements ICloudBuildCacheProvider { ): Promise { try { const client: AmazonS3Client = await this._getS3ClientAsync(terminal); - return await client.getObjectAsync(this._s3Prefix ? `${this._s3Prefix}/${cacheId}` : cacheId); + return await client.getObjectAsync(this._getObjectName(cacheId)); } catch (e) { terminal.writeWarningLine(`Error getting cache entry from S3: ${e}`); return undefined; @@ -165,16 +165,46 @@ export class AmazonS3BuildCacheProvider implements ICloudBuildCacheProvider { cacheId: string, objectBuffer: Buffer ): Promise { - if (!this.isCacheWriteAllowed) { - terminal.writeErrorLine('Writing to S3 cache is not allowed in the current configuration.'); + if (!this._validateWriteAllowed(terminal, cacheId)) { return false; } - terminal.writeDebugLine('Uploading object with cacheId: ', cacheId); + try { + const client: AmazonS3Client = await this._getS3ClientAsync(terminal); + await client.uploadObjectAsync(this._getObjectName(cacheId), objectBuffer); + return true; + } catch (e) { + terminal.writeWarningLine(`Error uploading cache entry to S3: ${e}`); + return false; + } + } + public async tryDownloadCacheEntryToFileAsync( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise { try { const client: AmazonS3Client = await this._getS3ClientAsync(terminal); - await client.uploadObjectAsync(this._s3Prefix ? `${this._s3Prefix}/${cacheId}` : cacheId, objectBuffer); + return await client.downloadObjectToFileAsync(this._getObjectName(cacheId), localFilePath); + } catch (e) { + terminal.writeWarningLine(`Error downloading cache entry from S3: ${e}`); + return false; + } + } + + public async tryUploadCacheEntryFromFileAsync( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise { + if (!this._validateWriteAllowed(terminal, cacheId)) { + return false; + } + + try { + const client: AmazonS3Client = await this._getS3ClientAsync(terminal); + await client.uploadObjectFromFileAsync(this._getObjectName(cacheId), localFilePath); return true; } catch (e) { terminal.writeWarningLine(`Error uploading cache entry to S3: ${e}`); @@ -182,6 +212,20 @@ export class AmazonS3BuildCacheProvider implements ICloudBuildCacheProvider { } } + private _getObjectName(cacheId: string): string { + return this._s3Prefix ? `${this._s3Prefix}/${cacheId}` : cacheId; + } + + private _validateWriteAllowed(terminal: ITerminal, cacheId: string): boolean { + if (!this.isCacheWriteAllowed) { + terminal.writeErrorLine('Writing to S3 cache is not allowed in the current configuration.'); + return false; + } + + terminal.writeDebugLine('Uploading object with cacheId: ', cacheId); + return true; + } + public async updateCachedCredentialAsync(terminal: ITerminal, credential: string): Promise { await CredentialCache.usingAsync( { diff --git a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3Client.ts b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3Client.ts index 890aeba8593..6ad20a823ec 100644 --- a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3Client.ts +++ b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3Client.ts @@ -2,13 +2,22 @@ // See LICENSE in the project root for license information. import * as crypto from 'node:crypto'; +import type { Readable } from 'node:stream'; +import { pipeline } from 'node:stream/promises'; -import { Async } from '@rushstack/node-core-library'; +import { + Async, + FileSystem, + type FileSystemReadStream, + type FileSystemWriteStream +} from '@rushstack/node-core-library'; import { Colorize, type ITerminal } from '@rushstack/terminal'; import { type IGetFetchOptions, type IFetchOptionsWithBody, type IWebClientResponse, + type IWebClientResponseBase, + type IWebClientStreamResponse, type WebClient, AUTHORIZATION_HEADER_NAME } from '@rushstack/rush-sdk/lib/utilities/WebClient'; @@ -16,7 +25,8 @@ import { import type { IAmazonS3BuildCacheProviderOptionsAdvanced } from './AmazonS3BuildCacheProvider'; import { type IAmazonS3Credentials, fromRushEnv } from './AmazonS3Credentials'; -const CONTENT_HASH_HEADER_NAME: 'x-amz-content-sha256' = 'x-amz-content-sha256'; +const HASH_ALGORITHM: 'sha256' = 'sha256'; +const CONTENT_HASH_HEADER_NAME: `x-amz-content-${typeof HASH_ALGORITHM}` = `x-amz-content-${HASH_ALGORITHM}`; const DATE_HEADER_NAME: 'x-amz-date' = 'x-amz-date'; const HOST_HEADER_NAME: 'host' = 'host'; const SECURITY_TOKEN_HEADER_NAME: 'x-amz-security-token' = 'x-amz-security-token'; @@ -60,6 +70,18 @@ const storageRetryOptions: IStorageRetryOptions = { retryPolicyType: StorageRetryPolicyType.EXPONENTIAL }; +/** + * Computes the SHA-256 hash of a file on disk using streaming reads. + */ +async function _hashFileAsync(filePath: string): Promise { + return await new Promise((resolve, reject) => { + const hash: crypto.Hash = crypto.createHash(HASH_ALGORITHM); + const stream: FileSystemReadStream = FileSystem.createReadStream(filePath); + stream.on('data', (chunk: string | Buffer) => hash.update(chunk)); + stream.on('end', () => resolve(hash.digest('hex'))); + stream.on('error', reject); + }); +} /** * A helper for reading and updating objects on Amazon S3 * @@ -119,42 +141,8 @@ export class AmazonS3Client { public async getObjectAsync(objectName: string): Promise { this._writeDebugLine('Reading object from S3'); return await this._sendCacheRequestWithRetriesAsync(async () => { - const response: IWebClientResponse = await this._makeRequestAsync('GET', objectName); - if (response.ok) { - return { - hasNetworkError: false, - response: await response.getBufferAsync() - }; - } else if (response.status === 404) { - return { - hasNetworkError: false, - response: undefined - }; - } else if ( - (response.status === 400 || response.status === 401 || response.status === 403) && - !this._credentials - ) { - // unauthorized due to not providing credentials, - // silence error for better DX when e.g. running locally without credentials - this._writeWarningLine( - `No credentials found and received a ${response.status}`, - ' response code from the cloud storage.', - ' Maybe run rush update-cloud-credentials', - ' or set the RUSH_BUILD_CACHE_CREDENTIAL env' - ); - return { - hasNetworkError: false, - response: undefined - }; - } else if (response.status === 400 || response.status === 401 || response.status === 403) { - throw await this._getS3ErrorAsync(response); - } else { - const error: Error = await this._getS3ErrorAsync(response); - return { - hasNetworkError: true, - error - }; - } + const response: IWebClientResponse = await this._makeSignedRequestAsync('GET', objectName); + return this._handleGetResponseAsync(response, async () => await response.getBufferAsync()); }); } @@ -164,7 +152,11 @@ export class AmazonS3Client { } await this._sendCacheRequestWithRetriesAsync(async () => { - const response: IWebClientResponse = await this._makeRequestAsync('PUT', objectName, objectBuffer); + const response: IWebClientResponse = await this._makeSignedRequestAsync( + 'PUT', + objectName, + objectBuffer + ); if (!response.ok) { return { hasNetworkError: true, @@ -178,6 +170,66 @@ export class AmazonS3Client { }); } + /** + * Downloads an S3 object directly to a local file path, using streaming to avoid + * buffering the entire object in memory. Retries on transient network errors. + * + * @returns `true` if the object was found and written to the file, `false` if not found. + */ + public async downloadObjectToFileAsync(objectName: string, localFilePath: string): Promise { + this._writeDebugLine('Downloading object from S3 to file'); + const result: boolean | undefined = await this._sendCacheRequestWithRetriesAsync(async () => { + const response: IWebClientStreamResponse = await this._makeSignedRequestAsync( + 'GET', + objectName, + undefined, + true + ); + return this._handleGetResponseAsync( + response, + async () => { + const writeStream: FileSystemWriteStream = await FileSystem.createWriteStreamAsync(localFilePath, { + ensureFolderExists: true + }); + await pipeline(response.stream, writeStream); + return true; + }, + () => response.stream.resume() + ); + }); + + return result ?? false; + } + + /** + * Uploads a local file to S3 using streaming, with the file's SHA-256 hash included in + * the AWS Signature V4 request for payload integrity verification. Does not retry + * because the stream is consumed after the first attempt. + */ + public async uploadObjectFromFileAsync(objectName: string, localFilePath: string): Promise { + if (!this._credentials) { + throw new Error('Credentials are required to upload objects to S3.'); + } + + // Compute SHA-256 hash of the file before uploading so we can sign the payload + const contentHash: string = await _hashFileAsync(localFilePath); + const entryStream: FileSystemReadStream = FileSystem.createReadStream(localFilePath); + + // Streaming uploads cannot be retried because the stream is consumed after the first attempt. + const response: IWebClientStreamResponse = await this._makeSignedRequestAsync( + 'PUT', + objectName, + entryStream as Readable, + true, + contentHash + ); + if (!response.ok) { + response.stream.resume(); + throw new Error(`Amazon S3 responded with status code ${response.status} (${response.statusText})`); + } + response.stream.resume(); + } + private _writeDebugLine(...messageParts: string[]): void { // if the terminal has been closed then don't bother sending a debug message try { @@ -196,13 +248,102 @@ export class AmazonS3Client { } } - private async _makeRequestAsync( + /** + * Shared response handling for GET requests (both buffer and stream). + * The `getSuccessResult` callback extracts the response payload (Buffer or stream-to-file result). + * The optional `cleanup` callback drains stream responses on non-success paths. + */ + private async _handleGetResponseAsync( + response: IWebClientResponseBase, + getSuccessResult: () => T | Promise, + cleanup?: () => void + ): Promise> { + const { ok, status, statusText } = response; + if (ok) { + return { + hasNetworkError: false, + response: await getSuccessResult() + }; + } else if (status === 404) { + cleanup?.(); + return { + hasNetworkError: false, + response: undefined + }; + } else if ((status === 400 || status === 401 || status === 403) && !this._credentials) { + cleanup?.(); + // unauthorized due to not providing credentials, + // silence error for better DX when e.g. running locally without credentials + this._writeWarningLine( + `No credentials found and received a ${status}`, + ' response code from the cloud storage.', + ' Maybe run rush update-cloud-credentials', + ' or set the RUSH_BUILD_CACHE_CREDENTIAL env' + ); + return { + hasNetworkError: false, + response: undefined + }; + } else if (status === 400 || status === 401 || status === 403) { + cleanup?.(); + throw new Error(`Amazon S3 responded with status code ${status} (${statusText})`); + } else { + cleanup?.(); + return { + hasNetworkError: true, + error: new Error(`Amazon S3 responded with status code ${status} (${statusText})`) + }; + } + } + + private async _makeSignedRequestAsync( verb: 'GET' | 'PUT', objectName: string, body?: Buffer - ): Promise { + ): Promise; + private async _makeSignedRequestAsync( + verb: 'GET' | 'PUT', + objectName: string, + body: Readable | undefined, + stream: true, + contentHash?: string + ): Promise; + private async _makeSignedRequestAsync( + verb: 'GET' | 'PUT', + objectName: string, + body?: Buffer | Readable, + stream?: boolean, + contentHash?: string + ): Promise { + // Use the provided content hash if available (e.g. pre-computed from a file on disk), + // otherwise compute from the buffer body, or use the empty hash for GET requests. + const bodyHash: string = contentHash ?? this._getBufferSha256(Buffer.isBuffer(body) ? body : undefined); + const { url, headers } = this._buildSignedRequest(verb, objectName, bodyHash); + + const webFetchOptions: IGetFetchOptions | IFetchOptionsWithBody = { + verb, + headers + }; + if (verb === 'PUT' && body) { + (webFetchOptions as IFetchOptionsWithBody).body = body; + } + + if (stream) { + return await this._webClient.fetchStreamAsync(url, webFetchOptions); + } else { + return await this._webClient.fetchAsync(url, webFetchOptions); + } + } + + /** + * Builds an AWS Signature V4 signed request, returning the URL and signed headers. + */ + private _buildSignedRequest( + verb: 'GET' | 'PUT', + objectName: string, + bodyHash: string + ): { url: string; headers: Record } { const isoDateString: IIsoDateString = this._getIsoDateString(); - const bodyHash: string = this._getSha256(body); const headers: Record = {}; headers[DATE_HEADER_NAME] = isoDateString.dateTime; headers[CONTENT_HASH_HEADER_NAME] = bodyHash; @@ -266,7 +407,7 @@ export class AmazonS3Client { signedHeaderNamesString, bodyHash ].join('\n'); - const canonicalRequestHash: string = this._getSha256(canonicalRequest); + const canonicalRequestHash: string = this._getBufferSha256(canonicalRequest); const scope: string = `${isoDateString.date}/${this._s3Region}/s3/aws4_request`; // The string to sign looks like this: @@ -299,14 +440,6 @@ export class AmazonS3Client { } } - const webFetchOptions: IGetFetchOptions | IFetchOptionsWithBody = { - verb, - headers - }; - if (verb === 'PUT') { - (webFetchOptions as IFetchOptionsWithBody).body = body; - } - const url: string = `${this._s3Endpoint}${canonicalUri}`; this._writeDebugLine(Colorize.bold(Colorize.underline('Sending request to S3'))); @@ -316,9 +449,7 @@ export class AmazonS3Client { this._writeDebugLine(Colorize.cyan(`\t${name}: ${value}`)); } - const response: IWebClientResponse = await this._webClient.fetchAsync(url, webFetchOptions); - - return response; + return { url, headers }; } public _getSha256Hmac(key: string | Buffer, data: string): Buffer; @@ -333,9 +464,9 @@ export class AmazonS3Client { } } - private _getSha256(data?: string | Buffer): string { + private _getBufferSha256(data?: string | Buffer): string { if (data) { - const hash: crypto.Hash = crypto.createHash('sha256'); + const hash: crypto.Hash = crypto.createHash(HASH_ALGORITHM); hash.update(data); return hash.digest('hex'); } else { @@ -452,7 +583,7 @@ export class AmazonS3Client { } delay = Math.min(maxRetryDelayInMs, delay); - log(`Will retry request in ${delay}s...`); + log(`Will retry request in ${delay}ms...`); await Async.sleepAsync(delay); const retryResponse: RetryableRequestResponse = await sendRequest(); diff --git a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/AmazonS3Client.test.ts b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/AmazonS3Client.test.ts index 07b5eddfc73..095d3284846 100644 --- a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/AmazonS3Client.test.ts +++ b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/AmazonS3Client.test.ts @@ -5,7 +5,15 @@ jest.mock('@rushstack/rush-sdk/lib/utilities/WebClient', () => { return jest.requireActual('@microsoft/rush-lib/lib/utilities/WebClient'); }); +jest.mock('node:stream/promises', () => ({ + pipeline: jest.fn().mockResolvedValue(undefined) +})); + +import { Readable } from 'node:stream'; +import { pipeline } from 'node:stream/promises'; + import { ConsoleTerminalProvider, Terminal } from '@rushstack/terminal'; +import { FileSystem } from '@rushstack/node-core-library'; import { WebClient } from '@rushstack/rush-sdk/lib/utilities/WebClient'; import type { IAmazonS3BuildCacheProviderOptionsAdvanced } from '../AmazonS3BuildCacheProvider'; @@ -634,4 +642,239 @@ describe(AmazonS3Client.name, () => { ); }); }); + + describe('File-based requests', () => { + let realDate: typeof Date; + let realSetTimeout: typeof setTimeout; + beforeEach(() => { + // mock date + realDate = global.Date; + global.Date = MockedDate as typeof Date; + + // mock setTimeout + realSetTimeout = global.setTimeout; + global.setTimeout = ((callback: () => void, time: number) => { + return realSetTimeout(callback, 1); + }).bind(global) as typeof global.setTimeout; + + jest.spyOn(FileSystem, 'ensureFolderAsync').mockResolvedValue(); + jest + .spyOn(FileSystem, 'createWriteStreamAsync') + .mockResolvedValue({} as unknown as Awaited>); + // Return a Readable that immediately ends, so _hashFileAsync completes with the null hash + jest.spyOn(FileSystem, 'createReadStream').mockReturnValue( + new Readable({ + read() { + this.push(null); + } + }) as unknown as ReturnType + ); + }); + + afterEach(() => { + jest.restoreAllMocks(); + global.Date = realDate; + global.setTimeout = realSetTimeout.bind(global); + }); + + describe('Downloading an object to file', () => { + async function makeFileGetRequestAsync( + credentials: IAmazonS3Credentials | undefined, + options: IAmazonS3BuildCacheProviderOptionsAdvanced, + objectName: string, + status: number, + statusText?: string + ): Promise<{ result: boolean; spy: jest.SpyInstance }> { + const mockStream = new Readable({ read() {} }); + + const spy: jest.SpyInstance = jest.spyOn(WebClient.prototype, 'fetchStreamAsync').mockReturnValue( + Promise.resolve({ + stream: mockStream, + headers: {}, + status, + statusText, + ok: status >= 200 && status < 300, + redirected: false + }) + ); + + const s3Client: AmazonS3Client = new AmazonS3Client(credentials, options, webClient, terminal); + const result = await s3Client.downloadObjectToFileAsync(objectName, '/tmp/cache-entry'); + return { result, spy }; + } + + it('Can download an object to file', async () => { + const { result, spy } = await makeFileGetRequestAsync( + { + accessKeyId: 'accessKeyId', + secretAccessKey: 'secretAccessKey', + sessionToken: undefined + }, + DUMMY_OPTIONS, + 'abc123', + 200 + ); + expect(result).toBe(true); + expect(spy).toHaveBeenCalledTimes(1); + const [url, options] = spy.mock.calls[0]; + expect(url).toBe('http://localhost:9000/abc123'); + expect(options.verb).toBe('GET'); + expect(options.headers['x-amz-content-sha256']).toMatch(/^[0-9a-f]{64}$/); + expect(options.headers['x-amz-date']).toBe('20200418T123242Z'); + // eslint-disable-next-line dot-notation + expect(options.headers['Authorization']).toContain('AWS4-HMAC-SHA256'); + expect(pipeline).toHaveBeenCalled(); + spy.mockRestore(); + }); + + it('Returns false for a 404 (missing) object', async () => { + const { result, spy } = await makeFileGetRequestAsync( + { + accessKeyId: 'accessKeyId', + secretAccessKey: 'secretAccessKey', + sessionToken: undefined + }, + DUMMY_OPTIONS, + 'abc123', + 404, + 'Not Found' + ); + expect(result).toBe(false); + expect(spy).toHaveBeenCalledTimes(1); + expect(pipeline).not.toHaveBeenCalled(); + spy.mockRestore(); + }); + + it('Retries on transient server errors', async () => { + let callCount: number = 0; + const spy: jest.SpyInstance = jest + .spyOn(WebClient.prototype, 'fetchStreamAsync') + .mockImplementation(async () => { + callCount++; + const mockStream = new Readable({ read() {} }); + if (callCount < 3) { + return { + stream: mockStream, + headers: {}, + status: 500, + statusText: 'InternalServerError', + ok: false, + redirected: false + }; + } + return { + stream: mockStream, + headers: {}, + status: 200, + statusText: 'OK', + ok: true, + redirected: false + }; + }); + + const s3Client: AmazonS3Client = new AmazonS3Client( + { + accessKeyId: 'accessKeyId', + secretAccessKey: 'secretAccessKey', + sessionToken: undefined + }, + DUMMY_OPTIONS, + webClient, + terminal + ); + + const result = await s3Client.downloadObjectToFileAsync('abc123', '/tmp/cache-entry'); + expect(result).toBe(true); + // First two attempts fail with 500, third succeeds + expect(spy).toHaveBeenCalledTimes(3); + spy.mockRestore(); + }); + }); + + describe('Uploading an object from file', () => { + it('Throws an error if credentials are not provided', async () => { + const s3Client: AmazonS3Client = new AmazonS3Client( + undefined, + { s3Endpoint: 'http://foo.bar.baz', ...DUMMY_OPTIONS_WITHOUT_ENDPOINT }, + webClient, + terminal + ); + + await expect(s3Client.uploadObjectFromFileAsync('temp', '/tmp/cache-entry')).rejects.toThrow( + 'Credentials are required to upload objects to S3.' + ); + }); + + it('Uploads from file with signed payload hash', async () => { + const responseStream = new Readable({ read() {} }); + + const spy: jest.SpyInstance = jest.spyOn(WebClient.prototype, 'fetchStreamAsync').mockReturnValue( + Promise.resolve({ + stream: responseStream, + headers: {}, + status: 200, + statusText: 'OK', + ok: true, + redirected: false + }) + ); + + const s3Client: AmazonS3Client = new AmazonS3Client( + { + accessKeyId: 'accessKeyId', + secretAccessKey: 'secretAccessKey', + sessionToken: undefined + }, + DUMMY_OPTIONS, + webClient, + terminal + ); + + await s3Client.uploadObjectFromFileAsync('abc123', '/tmp/cache-entry'); + + expect(spy).toHaveBeenCalledTimes(1); + const [url, options] = spy.mock.calls[0]; + expect(url).toBe('http://localhost:9000/abc123'); + expect(options.verb).toBe('PUT'); + // Verify the content hash is a real SHA-256 hex string, NOT UNSIGNED-PAYLOAD + expect(options.headers['x-amz-content-sha256']).toMatch(/^[0-9a-f]{64}$/); + expect(options.headers['x-amz-date']).toBe('20200418T123242Z'); + // eslint-disable-next-line dot-notation + expect(options.headers['Authorization']).toContain('AWS4-HMAC-SHA256'); + spy.mockRestore(); + }); + + it('Does not retry on failure (stream consumed)', async () => { + const responseStream = new Readable({ read() {} }); + + const spy: jest.SpyInstance = jest.spyOn(WebClient.prototype, 'fetchStreamAsync').mockReturnValue( + Promise.resolve({ + stream: responseStream, + headers: {}, + status: 500, + statusText: 'InternalServerError', + ok: false, + redirected: false + }) + ); + + const s3Client: AmazonS3Client = new AmazonS3Client( + { + accessKeyId: 'accessKeyId', + secretAccessKey: 'secretAccessKey', + sessionToken: undefined + }, + DUMMY_OPTIONS, + webClient, + terminal + ); + + await expect(s3Client.uploadObjectFromFileAsync('abc123', '/tmp/cache-entry')).rejects.toThrow('500'); + + // Only 1 call - no retry for file-based uploads + expect(spy).toHaveBeenCalledTimes(1); + spy.mockRestore(); + }); + }); + }); }); diff --git a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/__snapshots__/AmazonS3Client.test.ts.snap b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/__snapshots__/AmazonS3Client.test.ts.snap index f519e7b7a61..0d162539a10 100644 --- a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/__snapshots__/AmazonS3Client.test.ts.snap +++ b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/__snapshots__/AmazonS3Client.test.ts.snap @@ -548,3 +548,4 @@ exports[`AmazonS3Client Rejects invalid S3 endpoint values 9`] = `"Invalid S3 en exports[`AmazonS3Client Rejects invalid S3 endpoint values 10`] = `"Invalid S3 endpoint. Some part of the hostname contains invalid characters or is too long"`; exports[`AmazonS3Client Rejects invalid S3 endpoint values 11`] = `"Invalid S3 endpoint. Some part of the hostname contains invalid characters or is too long"`; + diff --git a/rush-plugins/rush-azure-storage-build-cache-plugin/src/AzureStorageBuildCacheProvider.ts b/rush-plugins/rush-azure-storage-build-cache-plugin/src/AzureStorageBuildCacheProvider.ts index 146ee2e8f00..cfc91b25ffa 100644 --- a/rush-plugins/rush-azure-storage-build-cache-plugin/src/AzureStorageBuildCacheProvider.ts +++ b/rush-plugins/rush-azure-storage-build-cache-plugin/src/AzureStorageBuildCacheProvider.ts @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license. // See LICENSE in the project root for license information. +import * as path from 'node:path'; + import { type BlobClient, BlobServiceClient, @@ -9,6 +11,7 @@ import { } from '@azure/storage-blob'; import { AzureAuthorityHosts } from '@azure/identity'; +import { FileSystem } from '@rushstack/node-core-library'; import type { ITerminal } from '@rushstack/terminal'; import { type ICloudBuildCacheProvider, @@ -75,66 +78,83 @@ export class AzureStorageBuildCacheProvider terminal: ITerminal, cacheId: string ): Promise { + return await this._tryGetBlobDataAsync(terminal, cacheId, async (blobClient: BlobClient) => { + return await blobClient.downloadToBuffer(); + }); + } + + public async trySetCacheEntryBufferAsync( + terminal: ITerminal, + cacheId: string, + entryBuffer: Buffer + ): Promise { + return await this._trySetBlobDataAsync(terminal, cacheId, async (blockBlobClient: BlockBlobClient) => { + await blockBlobClient.upload(entryBuffer, entryBuffer.length); + }); + } + + public async tryDownloadCacheEntryToFileAsync( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise { + const result: boolean | undefined = await this._tryGetBlobDataAsync( + terminal, + cacheId, + async (blobClient: BlobClient) => { + // TODO: Determine if this is necessary, or if the Azure Storage SDK handles this internally. + await FileSystem.ensureFolderAsync(path.dirname(localFilePath)); + await blobClient.downloadToFile(localFilePath); + return true; + } + ); + + return result ?? false; + } + + public async tryUploadCacheEntryFromFileAsync( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise { + return await this._trySetBlobDataAsync(terminal, cacheId, async (blockBlobClient: BlockBlobClient) => { + await blockBlobClient.uploadFile(localFilePath); + }); + } + + /** + * Shared logic for both buffer-based and file-based GET operations. + * Checks if the blob exists, retrieves data via the provided callback, and handles errors. + */ + private async _tryGetBlobDataAsync( + terminal: ITerminal, + cacheId: string, + getBlobDataAsync: (blobClient: BlobClient) => Promise + ): Promise { const blobClient: BlobClient = await this._getBlobClientForCacheIdAsync(cacheId, terminal); try { const blobExists: boolean = await blobClient.exists(); if (blobExists) { - return await blobClient.downloadToBuffer(); + return await getBlobDataAsync(blobClient); } else { return undefined; } } catch (err) { - const e: IBlobError = err as IBlobError; - const errorMessage: string = - 'Error getting cache entry from Azure Storage: ' + - [e.name, e.message, e.response?.status, e.response?.parsedHeaders?.errorCode] - .filter((piece: string | undefined) => piece) - .join(' '); - - if (e.response?.parsedHeaders?.errorCode === 'PublicAccessNotPermitted') { - // This error means we tried to read the cache with no credentials, but credentials are required. - // We'll assume that the configuration of the cache is correct and the user has to take action. - terminal.writeWarningLine( - `${errorMessage}\n\n` + - `You need to configure Azure Storage SAS credentials to access the build cache.\n` + - `Update the credentials by running "rush ${RushConstants.updateCloudCredentialsCommandName}", \n` + - `or provide a SAS in the ` + - `${EnvironmentVariableNames.RUSH_BUILD_CACHE_CREDENTIAL} environment variable.` - ); - } else if (e.response?.parsedHeaders?.errorCode === 'AuthenticationFailed') { - // This error means the user's credentials are incorrect, but not expired normally. They might have - // gotten corrupted somehow, or revoked manually in Azure Portal. - terminal.writeWarningLine( - `${errorMessage}\n\n` + - `Your Azure Storage SAS credentials are not valid.\n` + - `Update the credentials by running "rush ${RushConstants.updateCloudCredentialsCommandName}", \n` + - `or provide a SAS in the ` + - `${EnvironmentVariableNames.RUSH_BUILD_CACHE_CREDENTIAL} environment variable.` - ); - } else if (e.response?.parsedHeaders?.errorCode === 'AuthorizationPermissionMismatch') { - // This error is not solvable by the user, so we'll assume it is a configuration error, and revert - // to providing likely next steps on configuration. (Hopefully this error is rare for a regular - // developer, more likely this error will appear while someone is configuring the cache for the - // first time.) - terminal.writeWarningLine( - `${errorMessage}\n\n` + - `Your Azure Storage SAS credentials are valid, but do not have permission to read the build cache.\n` + - `Make sure you have added the role 'Storage Blob Data Reader' to the appropriate user(s) or group(s)\n` + - `on your storage account in the Azure Portal.` - ); - } else { - // We don't know what went wrong, hopefully we'll print something useful. - terminal.writeWarningLine(errorMessage); - } + this._logBlobError(terminal, err, 'Error getting cache entry from Azure Storage: '); return undefined; } } - public async trySetCacheEntryBufferAsync( + /** + * Shared logic for both buffer-based and file-based SET operations. + * Checks write permission, whether the blob already exists, uploads via the provided callback, + * and handles 409 conflict errors. + */ + private async _trySetBlobDataAsync( terminal: ITerminal, cacheId: string, - entryStream: Buffer + uploadAsync: (blockBlobClient: BlockBlobClient) => Promise ): Promise { if (!this.isCacheWriteAllowed) { terminal.writeErrorLine( @@ -170,7 +190,7 @@ export class AzureStorageBuildCacheProvider return true; } else { try { - await blockBlobClient.upload(entryStream, entryStream.length); + await uploadAsync(blockBlobClient); return true; } catch (e) { if ((e as IBlobError).statusCode === 409 /* conflict */) { @@ -196,6 +216,42 @@ export class AzureStorageBuildCacheProvider return client.getBlobClient(blobName); } + private _logBlobError(terminal: ITerminal, err: unknown, prefix: string): void { + const e: IBlobError = err as IBlobError; + const errorMessage: string = + prefix + + [e.name, e.message, e.response?.status, e.response?.parsedHeaders?.errorCode] + .filter((piece: string | undefined) => piece) + .join(' '); + + if (e.response?.parsedHeaders?.errorCode === 'PublicAccessNotPermitted') { + terminal.writeWarningLine( + `${errorMessage}\n\n` + + `You need to configure Azure Storage SAS credentials to access the build cache.\n` + + `Update the credentials by running "rush ${RushConstants.updateCloudCredentialsCommandName}", \n` + + `or provide a SAS in the ` + + `${EnvironmentVariableNames.RUSH_BUILD_CACHE_CREDENTIAL} environment variable.` + ); + } else if (e.response?.parsedHeaders?.errorCode === 'AuthenticationFailed') { + terminal.writeWarningLine( + `${errorMessage}\n\n` + + `Your Azure Storage SAS credentials are not valid.\n` + + `Update the credentials by running "rush ${RushConstants.updateCloudCredentialsCommandName}", \n` + + `or provide a SAS in the ` + + `${EnvironmentVariableNames.RUSH_BUILD_CACHE_CREDENTIAL} environment variable.` + ); + } else if (e.response?.parsedHeaders?.errorCode === 'AuthorizationPermissionMismatch') { + terminal.writeWarningLine( + `${errorMessage}\n\n` + + `Your Azure Storage SAS credentials are valid, but do not have permission to read the build cache.\n` + + `Make sure you have added the role 'Storage Blob Data Reader' to the appropriate user(s) or group(s)\n` + + `on your storage account in the Azure Portal.` + ); + } else { + terminal.writeWarningLine(errorMessage); + } + } + private async _getContainerClientAsync(terminal: ITerminal): Promise { if (!this._containerClient) { let sasString: string | undefined = this._environmentCredential; diff --git a/rush-plugins/rush-bridge-cache-plugin/src/BridgeCachePlugin.ts b/rush-plugins/rush-bridge-cache-plugin/src/BridgeCachePlugin.ts index f0cd91e4d52..edfcb890b97 100644 --- a/rush-plugins/rush-bridge-cache-plugin/src/BridgeCachePlugin.ts +++ b/rush-plugins/rush-bridge-cache-plugin/src/BridgeCachePlugin.ts @@ -86,7 +86,10 @@ export class BridgeCachePlugin implements IRushPlugin { buildCacheConfiguration, rushConfiguration: { experimentsConfiguration: { - configuration: { omitAppleDoubleFilesFromBuildCache } + configuration: { + omitAppleDoubleFilesFromBuildCache: excludeAppleDoubleFiles = false, + useDirectFileTransfersForBuildCache = false + } } } } = context; @@ -119,7 +122,8 @@ export class BridgeCachePlugin implements IRushPlugin { { buildCacheConfiguration, terminal, - excludeAppleDoubleFiles: !!omitAppleDoubleFilesFromBuildCache + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache } ); diff --git a/rush-plugins/rush-http-build-cache-plugin/src/HttpBuildCacheProvider.ts b/rush-plugins/rush-http-build-cache-plugin/src/HttpBuildCacheProvider.ts index 0ff34473e7e..5a0dc3f29ae 100644 --- a/rush-plugins/rush-http-build-cache-plugin/src/HttpBuildCacheProvider.ts +++ b/rush-plugins/rush-http-build-cache-plugin/src/HttpBuildCacheProvider.ts @@ -2,16 +2,30 @@ // See LICENSE in the project root for license information. import type { SpawnSyncReturns } from 'node:child_process'; +import type { Readable } from 'node:stream'; +import { pipeline } from 'node:stream/promises'; import { type ICredentialCacheEntry, CredentialCache } from '@rushstack/credential-cache'; -import { Executable, Async } from '@rushstack/node-core-library'; +import { + Executable, + Async, + FileSystem, + type FileSystemWriteStream, + type FileSystemReadStream +} from '@rushstack/node-core-library'; import type { ITerminal } from '@rushstack/terminal'; import { type ICloudBuildCacheProvider, type RushSession, EnvironmentConfiguration } from '@rushstack/rush-sdk'; -import { WebClient, type IWebClientResponse } from '@rushstack/rush-sdk/lib/utilities/WebClient'; +import { + WebClient, + type IGetFetchOptions, + type IFetchOptionsWithBody, + type IWebClientResponse, + type IWebClientStreamResponse +} from '@rushstack/rush-sdk/lib/utilities/WebClient'; enum CredentialsOptions { Optional, @@ -66,6 +80,7 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { private readonly _cacheKeyPrefix: string; private readonly _tokenHandler: IHttpBuildCacheTokenHandler | undefined; private readonly _minHttpRetryDelayMs: number; + private readonly _webClient: WebClient = new WebClient(); private __credentialCacheId: string | undefined; public get isCacheWriteAllowed(): boolean { @@ -104,7 +119,7 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { ): Promise { try { const result: boolean | Buffer = await this._makeHttpRequestAsync({ - terminal: terminal, + terminal, relUrl: `${this._cacheKeyPrefix}${cacheId}`, method: 'GET', body: undefined, @@ -125,16 +140,13 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { cacheId: string, objectBuffer: Buffer ): Promise { - if (!this.isCacheWriteAllowed) { - terminal.writeErrorLine('Writing to cache is not allowed in the current configuration.'); + if (!this._validateWriteAllowed(terminal, cacheId)) { return false; } - terminal.writeDebugLine('Uploading object with cacheId: ', cacheId); - try { const result: boolean | Buffer = await this._makeHttpRequestAsync({ - terminal: terminal, + terminal, relUrl: `${this._cacheKeyPrefix}${cacheId}`, method: this._uploadMethod, body: objectBuffer, @@ -150,6 +162,73 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { } } + public async tryDownloadCacheEntryToFileAsync( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise { + try { + const result: IWebClientStreamResponse | false = await this._makeHttpStreamRequestAsync({ + terminal, + relUrl: `${this._cacheKeyPrefix}${cacheId}`, + method: 'GET', + body: undefined, + warningText: 'Could not get cache entry', + maxAttempts: MAX_HTTP_CACHE_ATTEMPTS + }); + + if (result === false) { + return false; + } + + const writeStream: FileSystemWriteStream = await FileSystem.createWriteStreamAsync(localFilePath, { + ensureFolderExists: true + }); + await pipeline(result.stream, writeStream); + return true; + } catch (e) { + terminal.writeWarningLine(`Error getting cache entry: ${e}`); + return false; + } + } + + public async tryUploadCacheEntryFromFileAsync( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise { + if (!this._validateWriteAllowed(terminal, cacheId)) { + return false; + } + + try { + const entryStream: FileSystemReadStream = FileSystem.createReadStream(localFilePath); + const result: IWebClientStreamResponse | false = await this._makeHttpStreamRequestAsync({ + terminal, + relUrl: `${this._cacheKeyPrefix}${cacheId}`, + method: this._uploadMethod, + body: entryStream, + warningText: 'Could not write cache entry', + // maxAttempts is 1 because the file read stream is consumed after the first attempt + // and cannot be replayed. Downloads use MAX_HTTP_CACHE_ATTEMPTS since each retry + // issues a fresh GET with no request body. + maxAttempts: 1, + // Pre-resolve credentials for stream uploads because the credential fallback path + // in _makeHttpCoreRequestAsync cannot replay a consumed stream body. + credentialOptions: CredentialsOptions.Required + }); + + if (result !== false) { + // Drain the response body + result.stream.resume(); + } + return result !== false; + } catch (e) { + terminal.writeWarningLine(`Error uploading cache entry: ${e}`); + return false; + } + } + public async updateCachedCredentialAsync(terminal: ITerminal, credential: string): Promise { await CredentialCache.usingAsync( { @@ -157,7 +236,7 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { }, async (credentialsCache: CredentialCache) => { credentialsCache.setCacheEntry(this._credentialCacheId, { - credential: credential + credential }); await credentialsCache.saveIfModifiedAsync(); } @@ -224,6 +303,20 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { return this.__credentialCacheId; } + /** + * Common validation for write operations. Returns `true` if writing is allowed, + * `false` if it is not (and logs an error to the terminal). + */ + private _validateWriteAllowed(terminal: ITerminal, cacheId: string): boolean { + if (!this.isCacheWriteAllowed) { + terminal.writeErrorLine('Writing to cache is not allowed in the current configuration.'); + return false; + } + + terminal.writeDebugLine('Uploading object with cacheId: ', cacheId); + return true; + } + private async _makeHttpRequestAsync(options: { terminal: ITerminal; relUrl: string; @@ -234,7 +327,68 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { maxAttempts: number; credentialOptions?: CredentialsOptions; }): Promise { - const { terminal, relUrl, method, body, warningText, readBody, credentialOptions } = options; + const { readBody } = options; + // The stream: false flag ensures the response is an IWebClientResponse + const response: IWebClientResponse | false = (await this._makeHttpCoreRequestAsync({ + ...options, + stream: false + })) as IWebClientResponse | false; + + if (response === false) { + return false; + } + + const result: Buffer | boolean = readBody ? await response.getBufferAsync() : true; + options.terminal.writeDebugLine( + `[http-build-cache] actual response: ${response.status} ${new URL(options.relUrl, this._url).href} ${ + result === true ? 'true' : result.length + } bytes` + ); + + return result; + } + + private async _makeHttpStreamRequestAsync(options: { + terminal: ITerminal; + relUrl: string; + method: 'GET' | UploadMethod; + body: Readable | undefined; + warningText: string; + maxAttempts: number; + credentialOptions?: CredentialsOptions; + }): Promise { + // The stream: true flag ensures the response is an IWebClientStreamResponse + const response: IWebClientStreamResponse | false = (await this._makeHttpCoreRequestAsync({ + ...options, + stream: true + })) as IWebClientStreamResponse | false; + + if (response === false) { + return false; + } + + options.terminal.writeDebugLine( + `[http-build-cache] stream response: ${response.status} ${new URL(options.relUrl, this._url).href}` + ); + + return response; + } + + /** + * Shared request core for both buffer-based and streaming HTTP requests. + * Handles credentials resolution, header construction, retry logic, and failure reporting. + */ + private async _makeHttpCoreRequestAsync(options: { + terminal: ITerminal; + relUrl: string; + method: 'GET' | UploadMethod; + body: Buffer | Readable | undefined; + warningText: string; + maxAttempts: number; + credentialOptions?: CredentialsOptions; + stream: boolean; + }): Promise { + const { terminal, relUrl, method, body, warningText, credentialOptions, stream } = options; const safeCredentialOptions: CredentialsOptions = credentialOptions ?? CredentialsOptions.Optional; const credentials: string | undefined = await this._tryGetCredentialsAsync(safeCredentialOptions); const url: string = new URL(relUrl, this._url).href; @@ -250,20 +404,28 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { } } - const bodyLength: number | 'unknown' = (body as { length: number })?.length || 'unknown'; + const bodyLengthDesc: string = Buffer.isBuffer(body) ? `${body.length} bytes` : 'unknown length'; - terminal.writeDebugLine(`[http-build-cache] request: ${method} ${url} ${bodyLength} bytes`); + terminal.writeDebugLine(`[http-build-cache] request: ${method} ${url} ${bodyLengthDesc}`); - const webClient: WebClient = new WebClient(); - const response: IWebClientResponse = await webClient.fetchAsync(url, { + const fetchOptions: IGetFetchOptions | IFetchOptionsWithBody = { verb: method, - headers: headers, - body: body, + headers, + body, redirect: 'follow', - timeoutMs: 0 // Use the default timeout - }); + timeoutMs: 0 // Disable timeout for streaming transfers of large cache entries + }; + + const response: IWebClientResponse | IWebClientStreamResponse = stream + ? await this._webClient.fetchStreamAsync(url, fetchOptions) + : await this._webClient.fetchAsync(url, fetchOptions); if (!response.ok) { + // Drain the response body on stream responses so the connection can be reused + if ('stream' in response) { + response.stream.resume(); + } + const isNonCredentialResponse: boolean = response.status >= 500 && response.status < 600; if ( @@ -271,15 +433,20 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { typeof credentials !== 'string' && safeCredentialOptions === CredentialsOptions.Optional ) { - // If we don't already have credentials yet, and we got a response from the server - // that is a "normal" failure (4xx), then we assume that credentials are probably - // required. Re-attempt the request, requiring credentials this time. - // - // This counts as part of the "first attempt", so it is not included in the max attempts - return await this._makeHttpRequestAsync({ - ...options, - credentialOptions: CredentialsOptions.Required - }); + // Skip credential fallback for stream bodies since the stream has already been consumed + // by the first attempt and cannot be replayed. + const isStreamBody: boolean = !!body && typeof (body as Readable).pipe === 'function'; + if (!isStreamBody) { + // If we don't already have credentials yet, and we got a response from the server + // that is a "normal" failure (4xx), then we assume that credentials are probably + // required. Re-attempt the request, requiring credentials this time. + // + // This counts as part of the "first attempt", so it is not included in the max attempts + return await this._makeHttpCoreRequestAsync({ + ...options, + credentialOptions: CredentialsOptions.Required + }); + } } if (options.maxAttempts > 1) { @@ -291,22 +458,14 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { await Async.sleepAsync(retryDelay); - return await this._makeHttpRequestAsync({ ...options, maxAttempts: options.maxAttempts - 1 }); + return await this._makeHttpCoreRequestAsync({ ...options, maxAttempts: options.maxAttempts - 1 }); } this._reportFailure(terminal, method, response, false, warningText); return false; } - const result: Buffer | boolean = readBody ? await response.getBufferAsync() : true; - - terminal.writeDebugLine( - `[http-build-cache] actual response: ${response.status} ${url} ${ - result === true ? 'true' : result.length - } bytes` - ); - - return result; + return response; } private async _tryGetCredentialsAsync(options: CredentialsOptions.Required): Promise; @@ -363,7 +522,7 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { private _getFailureType( requestMethod: string, - response: IWebClientResponse, + response: IWebClientResponse | IWebClientStreamResponse, isRedirect: boolean ): FailureType { if (response.ok) { @@ -415,7 +574,7 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { private _reportFailure( terminal: ITerminal, requestMethod: string, - response: IWebClientResponse, + response: IWebClientResponse | IWebClientStreamResponse, isRedirect: boolean, message: string ): void { diff --git a/rush-plugins/rush-http-build-cache-plugin/src/test/HttpBuildCacheProvider.test.ts b/rush-plugins/rush-http-build-cache-plugin/src/test/HttpBuildCacheProvider.test.ts index 822eb9ae11e..6e5476ba910 100644 --- a/rush-plugins/rush-http-build-cache-plugin/src/test/HttpBuildCacheProvider.test.ts +++ b/rush-plugins/rush-http-build-cache-plugin/src/test/HttpBuildCacheProvider.test.ts @@ -5,7 +5,16 @@ jest.mock('@rushstack/rush-sdk/lib/utilities/WebClient', () => { return jest.requireActual('@microsoft/rush-lib/lib/utilities/WebClient'); }); +jest.mock('node:stream/promises', () => ({ + pipeline: jest.fn().mockResolvedValue(undefined) +})); + +import { Readable } from 'node:stream'; +import { pipeline } from 'node:stream/promises'; + import { type RushSession, EnvironmentConfiguration } from '@rushstack/rush-sdk'; +import { type ICredentialCacheEntry, CredentialCache } from '@rushstack/credential-cache'; +import { FileSystem } from '@rushstack/node-core-library'; import { StringBufferTerminalProvider, Terminal } from '@rushstack/terminal'; import { WebClient } from '@rushstack/rush-sdk/lib/utilities/WebClient'; @@ -24,24 +33,44 @@ const EXAMPLE_OPTIONS: IHttpBuildCacheProviderOptions = { minHttpRetryDelayMs: 1 }; +const WRITE_ALLOWED_OPTIONS: IHttpBuildCacheProviderOptions = { + ...EXAMPLE_OPTIONS, + isCacheWriteAllowed: true +}; + type FetchFnType = Parameters[0]; +type StreamFetchFnType = Parameters[0]; describe('HttpBuildCacheProvider', () => { let terminalBuffer: StringBufferTerminalProvider; let terminal!: Terminal; let fetchFn: jest.Mock; + let streamFetchFn: jest.Mock; beforeEach(() => { terminalBuffer = new StringBufferTerminalProvider(); terminal = new Terminal(terminalBuffer); fetchFn = jest.fn(); + streamFetchFn = jest.fn(); WebClient.mockRequestFn(fetchFn as unknown as FetchFnType); + WebClient.mockStreamRequestFn(streamFetchFn as unknown as StreamFetchFnType); + jest + .spyOn(FileSystem, 'createReadStream') + .mockReturnValue({ pipe: jest.fn() } as unknown as ReturnType); + jest + .spyOn(FileSystem, 'createWriteStreamAsync') + .mockResolvedValue({} as unknown as Awaited>); + jest.spyOn(FileSystem, 'ensureFolderAsync').mockResolvedValue(); }); afterEach(() => { WebClient.resetMockRequestFn(); + WebClient.resetMockStreamRequestFn(); + jest.restoreAllMocks(); }); + // ── Buffer-based GET ────────────────────────────────────────────────────── + describe('tryGetCacheEntryBufferByIdAsync', () => { it('prints warning if read credentials are not available', async () => { jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue(undefined); @@ -68,7 +97,7 @@ describe('HttpBuildCacheProvider', () => { ); expect(terminalBuffer.getAllOutputAsChunks({ asLines: true })).toMatchInlineSnapshot(` Array [ - "[ debug] [http-build-cache] request: GET https://buildcache.example.acme.com/some-key unknown bytes[n]", + "[ debug] [http-build-cache] request: GET https://buildcache.example.acme.com/some-key unknown length[n]", "[warning] Error getting cache entry: Error: Credentials for https://buildcache.example.acme.com/ have not been provided.[n]", "[warning] In CI, verify that RUSH_BUILD_CACHE_CREDENTIAL contains a valid Authorization header value.[n]", "[warning] [n]", @@ -98,7 +127,7 @@ Array [ }); jest.mocked(fetchFn).mockResolvedValueOnce({ status: 504, - statusText: 'BadGateway', + statusText: 'Gateway Timeout', ok: false }); @@ -131,12 +160,326 @@ Array [ ); expect(terminalBuffer.getAllOutputAsChunks({ asLines: true })).toMatchInlineSnapshot(` Array [ - "[ debug] [http-build-cache] request: GET https://buildcache.example.acme.com/some-key unknown bytes[n]", - "[ debug] [http-build-cache] request: GET https://buildcache.example.acme.com/some-key unknown bytes[n]", - "[ debug] [http-build-cache] request: GET https://buildcache.example.acme.com/some-key unknown bytes[n]", - "[warning] Could not get cache entry: HTTP 504: BadGateway[n]", + "[ debug] [http-build-cache] request: GET https://buildcache.example.acme.com/some-key unknown length[n]", + "[ debug] [http-build-cache] request: GET https://buildcache.example.acme.com/some-key unknown length[n]", + "[ debug] [http-build-cache] request: GET https://buildcache.example.acme.com/some-key unknown length[n]", + "[warning] Could not get cache entry: HTTP 504: Gateway Timeout[n]", ] `); }); + + it('returns a buffer on a successful response', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue('token123'); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(EXAMPLE_OPTIONS, session); + const expectedBuffer = Buffer.from('cache-contents'); + + jest.mocked(fetchFn).mockResolvedValue({ + status: 200, + statusText: 'OK', + ok: true, + redirected: false, + headers: {}, + getBufferAsync: () => Promise.resolve(expectedBuffer) + }); + + const result = await provider.tryGetCacheEntryBufferByIdAsync(terminal, 'some-key'); + expect(result).toEqual(expectedBuffer); + expect(fetchFn).toHaveBeenCalledTimes(1); + }); + }); + + // ── Buffer-based SET ────────────────────────────────────────────────────── + + describe('trySetCacheEntryBufferAsync', () => { + it('returns false when cache write is not allowed', async () => { + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(EXAMPLE_OPTIONS, session); // write not allowed + + const result = await provider.trySetCacheEntryBufferAsync(terminal, 'some-key', Buffer.from('data')); + + expect(result).toBe(false); + expect(fetchFn).not.toHaveBeenCalled(); + }); + + it('uploads a buffer successfully', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue('token123'); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(WRITE_ALLOWED_OPTIONS, session); + + jest.mocked(fetchFn).mockResolvedValue({ + status: 200, + statusText: 'OK', + ok: true, + redirected: false, + headers: {} + }); + + const result = await provider.trySetCacheEntryBufferAsync( + terminal, + 'some-key', + Buffer.from('cache-data') + ); + + expect(result).toBe(true); + expect(fetchFn).toHaveBeenCalledTimes(1); + expect(fetchFn).toHaveBeenCalledWith( + 'https://buildcache.example.acme.com/some-key', + expect.objectContaining({ + method: 'POST' + }) + ); + }); + + it('retries up to 3 times on server error', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue('token123'); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(WRITE_ALLOWED_OPTIONS, session); + + jest.mocked(fetchFn).mockResolvedValue({ + status: 500, + statusText: 'InternalServerError', + ok: false + }); + + const result = await provider.trySetCacheEntryBufferAsync(terminal, 'some-key', Buffer.from('data')); + + expect(result).toBe(false); + expect(fetchFn).toHaveBeenCalledTimes(3); + }); + }); + + // ── File-based GET ────────────────────────────────────────────────────── + + describe('tryDownloadCacheEntryToFileAsync', () => { + it('downloads to file on a successful response', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue('token123'); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(EXAMPLE_OPTIONS, session); + const mockStream = new Readable({ read() {} }); + + jest.mocked(streamFetchFn).mockResolvedValue({ + status: 200, + statusText: 'OK', + ok: true, + redirected: false, + headers: {}, + stream: mockStream + }); + + const result = await provider.tryDownloadCacheEntryToFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + expect(result).toBe(true); + expect(streamFetchFn).toHaveBeenCalledTimes(1); + expect(streamFetchFn).toHaveBeenCalledWith( + 'https://buildcache.example.acme.com/some-key', + expect.objectContaining({ + method: 'GET', + redirect: 'follow' + }) + ); + expect(pipeline).toHaveBeenCalledWith(mockStream, expect.anything()); + }); + + it('returns false on 404 cache miss', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue('token123'); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(EXAMPLE_OPTIONS, session); + const mockStream = new Readable({ read() {} }); + + jest.mocked(streamFetchFn).mockResolvedValue({ + status: 404, + statusText: 'Not Found', + ok: false, + stream: mockStream + }); + + const result = await provider.tryDownloadCacheEntryToFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + expect(result).toBe(false); + expect(pipeline).not.toHaveBeenCalled(); + }); + + it('returns false on credential failure', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue(undefined); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(EXAMPLE_OPTIONS, session); + const mockStream = new Readable({ read() {} }); + + jest.mocked(streamFetchFn).mockResolvedValue({ + status: 401, + statusText: 'Unauthorized', + ok: false, + stream: mockStream + }); + + const result = await provider.tryDownloadCacheEntryToFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + expect(result).toBe(false); + }); + + it('retries up to 3 times on server error', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue(undefined); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(EXAMPLE_OPTIONS, session); + const createMockStream = (): Readable => new Readable({ read() {} }); + + jest.mocked(streamFetchFn).mockResolvedValueOnce({ + status: 500, + statusText: 'InternalServiceError', + ok: false, + stream: createMockStream() + }); + jest.mocked(streamFetchFn).mockResolvedValueOnce({ + status: 503, + statusText: 'ServiceUnavailable', + ok: false, + stream: createMockStream() + }); + jest.mocked(streamFetchFn).mockResolvedValueOnce({ + status: 504, + statusText: 'Gateway Timeout', + ok: false, + stream: createMockStream() + }); + + const result = await provider.tryDownloadCacheEntryToFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + expect(result).toBe(false); + expect(streamFetchFn).toHaveBeenCalledTimes(3); + }); + }); + + // ── File-based SET ────────────────────────────────────────────────────── + + describe('tryUploadCacheEntryFromFileAsync', () => { + it('returns false when cache write is not allowed', async () => { + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(EXAMPLE_OPTIONS, session); // write not allowed + + const result = await provider.tryUploadCacheEntryFromFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + + expect(result).toBe(false); + expect(streamFetchFn).not.toHaveBeenCalled(); + }); + + it('uploads from file successfully', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue('token123'); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(WRITE_ALLOWED_OPTIONS, session); + const responseStream = new Readable({ read() {} }); + + jest.mocked(streamFetchFn).mockResolvedValue({ + status: 200, + statusText: 'OK', + ok: true, + redirected: false, + headers: {}, + stream: responseStream + }); + + const result = await provider.tryUploadCacheEntryFromFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + + expect(result).toBe(true); + expect(streamFetchFn).toHaveBeenCalledTimes(1); + expect(streamFetchFn).toHaveBeenCalledWith( + 'https://buildcache.example.acme.com/some-key', + expect.objectContaining({ + method: 'POST' + }) + ); + }); + + it('does not retry on failure (file stream already consumed)', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue('token123'); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(WRITE_ALLOWED_OPTIONS, session); + const responseStream = new Readable({ read() {} }); + + jest.mocked(streamFetchFn).mockResolvedValue({ + status: 500, + statusText: 'InternalServerError', + ok: false, + stream: responseStream + }); + + const result = await provider.tryUploadCacheEntryFromFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + + expect(result).toBe(false); + // maxAttempts is 1 for file-based uploads, so only 1 call + expect(streamFetchFn).toHaveBeenCalledTimes(1); + }); + + it('skips credential fallback for file-based uploads on 4xx', async () => { + // No credential in env for the first attempt + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue(undefined); + // But credentials ARE available in the credential cache — without the stream-body + // guard, the credential fallback would resolve these and make a second HTTP request + // with the already-consumed stream body. + jest + .spyOn(CredentialCache, 'usingAsync') + // eslint-disable-next-line @typescript-eslint/naming-convention + .mockImplementation(async (_options, fn) => { + await (fn as (cache: CredentialCache) => Promise)({ + tryGetCacheEntry: (): ICredentialCacheEntry => ({ credential: 'cached-token' }) + } as unknown as CredentialCache); + }); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(WRITE_ALLOWED_OPTIONS, session); + const responseStream = new Readable({ read() {} }); + + jest.mocked(streamFetchFn).mockResolvedValue({ + status: 401, + statusText: 'Unauthorized', + ok: false, + stream: responseStream + }); + + // Even though credentials are optional and we got a 4xx, the stream body + // should prevent the credential fallback retry since the stream is consumed + const result = await provider.tryUploadCacheEntryFromFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + + expect(result).toBe(false); + // Should only be called once (no credential fallback retry with consumed stream) + expect(streamFetchFn).toHaveBeenCalledTimes(1); + }); }); });