10 changes: 10 additions & 0 deletions src/file.ts
@@ -2155,6 +2155,16 @@ class File extends ServiceObject<File, FileMetadata> {
return pipelineCallback(e);
}

// If this is a partial upload, we don't expect final metadata yet.
if (options.isPartialUpload) {
// Emit CRC32c for this completed chunk if hash validation is active.
if (hashCalculatingStream?.crc32c) {
writeStream.emit('crc32c', hashCalculatingStream.crc32c);
}
// Resolve the pipeline for this *partial chunk*.
return pipelineCallback();
}

// We want to make sure we've received the metadata from the server in order
// to properly validate the object's integrity. Depending on the type of upload,
// the stream could close before the response is returned.
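The hunk above short-circuits stream finalization for partial uploads: instead of waiting for final object metadata, it surfaces the chunk's CRC32C (when hash validation is active) and resolves the pipeline. A minimal sketch, not part of the PR, of how a caller might observe that event, assuming `isPartialUpload` is accepted by `createWriteStream` as the `options.isPartialUpload` check above suggests:

import {Storage} from '@google-cloud/storage';

const storage = new Storage();
const file = storage.bucket('my-bucket').file('my-object');

// Hypothetical usage; the option name is taken from the check in the hunk above.
const writeStream = file.createWriteStream({isPartialUpload: true});

writeStream.on('crc32c', (crc32c: string) => {
  // Emitted once this partial chunk's pipeline resolves (see hunk above).
  console.log(`CRC32C for this chunk: ${crc32c}`);
});

writeStream.end(Buffer.from('first part of the object'));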
13 changes: 11 additions & 2 deletions src/hash-stream-validator.ts
@@ -81,10 +81,19 @@ class HashStreamValidator extends Transform {
return this.#crc32cHash?.toString();
}

_flush(callback: (error?: Error | null | undefined) => void) {
if (this.#md5Hash) {
/**
* Return the calculated MD5 value, if available.
*/
get md5Digest(): string | undefined {
if (this.#md5Hash && !this.#md5Digest) {
this.#md5Digest = this.#md5Hash.digest('base64');
}
return this.#md5Digest;
}

_flush(callback: (error?: Error | null | undefined) => void) {
// Triggers the getter logic to finalize and cache the MD5 digest
this.md5Digest;

if (this.updateHashesOnly) {
callback();
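The new `md5Digest` getter mirrors the existing `crc32c` getter: it finalizes the MD5 hash on first read and caches the result, and `_flush` now reads it so the cached value is ready by the time the stream finishes. A minimal in-repo sketch, not part of the PR, of using the validator in `updateHashesOnly` mode to collect both digests:

import {createReadStream} from 'fs';
import {HashStreamValidator} from './hash-stream-validator.js';

function hashesForFile(path: string): Promise<{crc32c?: string; md5?: string}> {
  return new Promise((resolve, reject) => {
    const validator = new HashStreamValidator({
      crc32c: true,
      md5: true,
      updateHashesOnly: true, // compute digests only; skip expected-value checks
    });

    validator.resume(); // drain the pass-through output; only the hashes matter here
    validator.on('error', reject);
    validator.on('finish', () => {
      // _flush has run, so both getters return finalized values.
      resolve({crc32c: validator.crc32c, md5: validator.md5Digest});
    });

    createReadStream(path).on('error', reject).pipe(validator);
  });
}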
149 changes: 144 additions & 5 deletions src/resumable-upload.ts
@@ -36,10 +36,11 @@ import {
getUserAgentString,
} from './util.js';
import {GCCL_GCS_CMD_KEY} from './nodejs-common/util.js';
import {FileMetadata} from './file.js';
import {FileExceptionMessages, FileMetadata, RequestError} from './file.js';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import {getPackageJSON} from './package-json-helper.cjs';
import {HashStreamValidator} from './hash-stream-validator.js';

const NOT_FOUND_STATUS_CODE = 404;
const RESUMABLE_INCOMPLETE_STATUS_CODE = 308;
@@ -149,6 +150,19 @@ export interface UploadConfig extends Pick<WritableOptions, 'highWaterMark'> {
*/
isPartialUpload?: boolean;

/**
* A pre-computed, Base64-encoded CRC32C checksum to send in the final request
* instead of one calculated client-side.
*/
clientCrc32c?: string;
/**
* A pre-computed, Base64-encoded MD5 hash to send in the final request
* instead of one calculated client-side.
*/
clientMd5Hash?: string;
/**
* Enables CRC32C calculation on the client side.
* The calculated hash will be sent in the final PUT request if `clientCrc32c` is not provided.
*/
crc32c?: boolean;
/**
* Enables MD5 calculation on the client side.
* The calculated hash will be sent in the final PUT request if `clientMd5Hash` is not provided.
*/
md5?: boolean;

/**
* A customer-supplied encryption key. See
* https://cloud.google.com/storage/docs/encryption#customer-supplied.
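A sketch, not from the PR, of how the options above combine per the constructor wiring further down: `crc32c: true` / `md5: true` enable streaming calculation, while `clientCrc32c` / `clientMd5Hash` take precedence and disable it.

import {createReadStream} from 'fs';
import {upload, UploadConfig} from './resumable-upload.js';

// retryOptions is required by UploadConfig; assume it is configured elsewhere.
declare const retryOptions: UploadConfig['retryOptions'];

// Option 1: let the stream calculate the CRC32C while uploading.
createReadStream('data.bin').pipe(
  upload({bucket: 'my-bucket', file: 'data.bin', crc32c: true, retryOptions})
);

// Option 2: supply a pre-computed Base64 CRC32C; no client-side calculation
// happens because `calculateCrc32c` is false when clientCrc32c is set.
createReadStream('data.bin').pipe(
  upload({
    bucket: 'my-bucket',
    file: 'data.bin',
    clientCrc32c: 'yZRlqg==', // hypothetical value for data.bin
    retryOptions,
  })
);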
@@ -334,6 +348,11 @@ export class Upload extends Writable {
*/
private writeBuffers: Buffer[] = [];
private numChunksReadInRequest = 0;

#hashValidator?: HashStreamValidator;
#clientCrc32c?: string;
#clientMd5Hash?: string;

/**
* An array of buffers used for caching the most recent upload chunk.
* We should not assume that the server received all bytes sent in the request.
@@ -428,6 +447,20 @@
this.retryOptions = cfg.retryOptions;
this.isPartialUpload = cfg.isPartialUpload ?? false;

this.#clientCrc32c = cfg.clientCrc32c;
this.#clientMd5Hash = cfg.clientMd5Hash;

const calculateCrc32c = !cfg.clientCrc32c && cfg.crc32c;
const calculateMd5 = !cfg.clientMd5Hash && cfg.md5;

if (calculateCrc32c || calculateMd5) {
this.#hashValidator = new HashStreamValidator({
crc32c: calculateCrc32c,
md5: calculateMd5,
updateHashesOnly: true,
});
}

if (cfg.key) {
if (typeof cfg.key === 'string') {
const base64Key = Buffer.from(cfg.key).toString('base64');
@@ -518,9 +551,19 @@
// Backwards-compatible event
this.emit('writing');

this.writeBuffers.push(
typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk
);
const bufferChunk =
typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk;

if (this.#hashValidator) {
try {
this.#hashValidator.write(bufferChunk);
} catch (e) {
this.destroy(e as Error);
return;
}
}

this.writeBuffers.push(bufferChunk);

this.once('readFromChunkBuffer', readCallback);

@@ -537,6 +580,61 @@
this.localWriteCacheByteLength += buf.byteLength;
}

/**
* Compares the client's calculated or provided hash against the server's
* returned hash for a specific checksum type. Destroys the stream on mismatch.
* @param clientHash The client's calculated or provided hash (Base64).
* @param serverHash The hash returned by the server (Base64).
* @param hashType The type of hash ('CRC32C' or 'MD5').
* @returns `true` if a mismatch was found and the stream was destroyed, `false` otherwise.
*/
#validateChecksum(
clientHash: string | undefined,
serverHash: string | undefined,
hashType: 'CRC32C' | 'MD5'
): boolean {
// Only validate if both client and server hashes are present.
if (clientHash && serverHash) {
if (clientHash !== serverHash) {
const detailMessage = `${hashType} checksum mismatch. Client calculated: ${clientHash}, Server returned: ${serverHash}`;
const detailError = new Error(detailMessage);
const error = new RequestError(FileExceptionMessages.UPLOAD_MISMATCH);
error.code = 'FILE_NO_UPLOAD';
error.errors = [detailError];

this.destroy(error);
return true;
}
}
return false;
}

/**
* Builds and applies the X-Goog-Hash header to the request options
* using either calculated hashes from #hashValidator or pre-calculated
* client-side hashes. This should only be called on the final request.
*
* @param headers The headers object to modify.
*/
#applyChecksumHeaders(headers: GaxiosOptions['headers']) {
const checksums: string[] = [];

if (this.#hashValidator?.crc32cEnabled) {
checksums.push(`crc32c=${this.#hashValidator.crc32c!}`);
} else if (this.#clientCrc32c) {
checksums.push(`crc32c=${this.#clientCrc32c}`);
}

if (this.#hashValidator?.md5Enabled) {
checksums.push(`md5=${this.#hashValidator.md5Digest!}`);
} else if (this.#clientMd5Hash) {
checksums.push(`md5=${this.#clientMd5Hash}`);
}

if (checksums.length > 0) {
headers!['X-Goog-Hash'] = checksums.join(',');
}
}

/**
* Prepends the local buffer to write buffer and resets it.
*
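On the wire, the header these helpers build is `X-Goog-Hash: crc32c=<base64>[,md5=<base64>]`. When the comparison in `#validateChecksum` fails, the stream is destroyed with a `RequestError` whose `code` is `'FILE_NO_UPLOAD'` and whose `errors` array carries the mismatch detail, so a caller might handle it as in this sketch (not part of the PR):

import {createReadStream} from 'fs';
import {upload, UploadConfig} from './resumable-upload.js';
import {RequestError} from './file.js';

declare const retryOptions: UploadConfig['retryOptions']; // assumed configured elsewhere

const uploadStream = upload({
  bucket: 'my-bucket',
  file: 'data.bin',
  md5: true,
  retryOptions,
});

uploadStream.on('error', (err: Error) => {
  const requestErr = err as RequestError;
  if (requestErr.code === 'FILE_NO_UPLOAD') {
    // e.g. "MD5 checksum mismatch. Client calculated: ..., Server returned: ..."
    console.error('Checksum mismatch:', requestErr.errors?.[0]?.message);
  }
});

createReadStream('data.bin').pipe(uploadStream);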
@@ -929,6 +1027,10 @@ export class Upload extends Writable {
// unshifting data back into the queue. This way we will know if this is the last request or not.
const isLastChunkOfUpload = !(await this.waitForNextChunk());

if (isLastChunkOfUpload && this.#hashValidator) {
this.#hashValidator.end();
}

// Important: put the data back in the queue for the actual upload
this.prependLocalBufferToUpstream();

@@ -951,8 +1053,18 @@
headers['Content-Length'] = bytesToUpload;
headers['Content-Range'] =
`bytes ${this.offset}-${endingByte}/${totalObjectSize}`;

// Apply the X-Goog-Hash header ONLY on the final chunk (the request that finalizes the object)
if (isLastChunkOfUpload) {
this.#applyChecksumHeaders(headers);
}
} else {
headers['Content-Range'] = `bytes ${this.offset}-*/${this.contentLength}`;

if (this.#hashValidator) {
this.#hashValidator.end();
}
this.#applyChecksumHeaders(headers);
}

const reqOpts: GaxiosOptions = {
@@ -1046,7 +1158,29 @@
}

this.destroy(err);
} else {
} else if (this.isSuccessfulResponse(resp.status)) {
const serverCrc32c = resp.data.crc32c;
const serverMd5 = resp.data.md5Hash;

if (this.#hashValidator) {
this.#hashValidator.end();
}

const clientCrc32cToValidate =
this.#hashValidator?.crc32c || this.#clientCrc32c;
const clientMd5HashToValidate =
this.#hashValidator?.md5Digest || this.#clientMd5Hash;
if (
this.#validateChecksum(
clientCrc32cToValidate,
serverCrc32c,
'CRC32C'
) ||
this.#validateChecksum(clientMd5HashToValidate, serverMd5, 'MD5')
) {
return;
}

// no need to keep the cache
this.#resetLocalBuffersCache();

@@ -1058,6 +1192,11 @@
// Allow the object (Upload) to continue naturally so the user's
// "finish" event fires.
this.emit('uploadFinished');
} else {
// Reached when the response is not a 2xx success, typically a 308 when
// shouldContinueUploadInAnotherRequest is true and a partial upload's chunk
// has been accepted. Emitting 'uploadFinished' lets the caller continue the
// upload in a later request.
this.emit('uploadFinished');
}
}

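Taken together, the response handler now distinguishes three outcomes: an error destroys the stream, a 2xx response triggers the client/server checksum comparison before `uploadFinished` is emitted, and anything else (notably a 308 for a partial upload) still emits `uploadFinished` so the object can be continued in a later request. A sketch, not part of the PR, of that last, partial-upload case:

import {createReadStream} from 'fs';
import {upload, UploadConfig} from './resumable-upload.js';

declare const retryOptions: UploadConfig['retryOptions']; // assumed configured elsewhere

// Send the first part of an object. The server answers 308 (incomplete), which
// is not a 2xx success, so the handler above falls through to the final else
// branch and still emits 'uploadFinished' for this partial request.
const firstPart = upload({
  bucket: 'my-bucket',
  file: 'data.bin',
  isPartialUpload: true,
  retryOptions,
});

firstPart.on('uploadFinished', () => {
  // The chunk was accepted; the caller can resume the rest of the object in a
  // later request against the same resumable session (details elided here).
});

createReadStream('data.part1').pipe(firstPart);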
63 changes: 63 additions & 0 deletions system-test/kitchen.ts
@@ -35,6 +35,7 @@ import {
RETRYABLE_ERR_FN_DEFAULT,
Storage,
} from '../src/storage.js';
import {CRC32C} from '../src/crc32c.js';

const bucketName = process.env.BUCKET_NAME || 'gcs-resumable-upload-test';

@@ -305,4 +306,66 @@
);
assert.equal(results.size, FILE_SIZE);
});

const KNOWN_CRC32C_OF_ZEROS = 'rthIWA==';
describe('Validation of Client Checksums Against Server Response', () => {
let crc32c: string;

before(async () => {
crc32c = (await CRC32C.fromFile(filePath)).toString();
});
it('should upload successfully when crc32c calculation is enabled', done => {
let uploadSucceeded = false;

fs.createReadStream(filePath)
.on('error', done)
.pipe(
upload({
bucket: bucketName,
file: filePath,
crc32c: true,
clientCrc32c: crc32c,
retryOptions: retryOptions,
})
)
.on('error', err => {
console.log(err);
done(
new Error(
`Upload failed unexpectedly on success path: ${err.message}`
)
);
})
.on('response', resp => {
uploadSucceeded = resp.status === 200;
})
.on('finish', () => {
assert.strictEqual(uploadSucceeded, true);
done();
});
});

it('should destroy the stream on a checksum mismatch (client-provided hash mismatch)', done => {
const EXPECTED_ERROR_MESSAGE_PART = `Provided CRC32C "${KNOWN_CRC32C_OF_ZEROS}" doesn't match calculated CRC32C`;

fs.createReadStream(filePath)
.on('error', done)
.pipe(
upload({
bucket: bucketName,
file: filePath,
clientCrc32c: KNOWN_CRC32C_OF_ZEROS,
crc32c: true,
retryOptions: retryOptions,
})
)
.on('error', (err: Error) => {
assert.ok(
err.message.includes(EXPECTED_ERROR_MESSAGE_PART),
`Expected error message part "${EXPECTED_ERROR_MESSAGE_PART}" not found in: ${err.message}`
);
done();
});
});
});
});