Skip to content
12 changes: 9 additions & 3 deletions handwritten/storage/src/bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4505,13 +4505,19 @@ class Bucket extends ServiceObject<Bucket, BucketMetadata> {
if (options.onUploadProgress) {
writable.on('progress', options.onUploadProgress);
}
fs.createReadStream(pathString)
.on('error', bail)
const readStream = fs.createReadStream(pathString);
readStream
.on('error', err => {
readStream.destroy();
writable.destroy();
bail(err);
})
.pipe(writable)
.on('error', err => {
readStream.destroy();
if (
this.storage.retryOptions.autoRetry &&
this.storage.retryOptions.retryableErrorFn!(err)
this.storage.retryOptions.retryableErrorFn!(err as ApiError)
) {
return reject(err);
} else {
Expand Down
46 changes: 46 additions & 0 deletions handwritten/storage/test/bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,18 @@ class FakeNotification {
}

let fsStatOverride: Function | null;
let fsCreateReadStreamOverride: Function | null;
const fakeFs = {
...fs,
stat: (filePath: string, callback: Function) => {
return (fsStatOverride || fs.stat)(filePath, callback);
},
createReadStream: (filePath: string, options?: any) => {
return (fsCreateReadStreamOverride || fs.createReadStream)(
filePath,
options
);
},
};

let pLimitOverride: Function | null;
Expand Down Expand Up @@ -234,6 +241,7 @@ describe('Bucket', () => {

beforeEach(() => {
fsStatOverride = null;
fsCreateReadStreamOverride = null;
pLimitOverride = null;
bucket = new Bucket(STORAGE, BUCKET_NAME);
});
Expand Down Expand Up @@ -3231,6 +3239,44 @@ describe('Bucket', () => {
});
});

it('should destroy the local read stream if write stream fails', done => {
const fakeFile = new FakeFile(bucket, 'file-name');
const options = {destination: fakeFile, resumable: false};
const originalCreateReadStream = fs.createReadStream;
let readStream: fs.ReadStream;
fsCreateReadStreamOverride = (path: string, opts: any) => {
readStream = originalCreateReadStream(path, opts);
return readStream;
};

fakeFile.createWriteStream = (options_: CreateWriteStreamOptions) => {
const ws = new stream.Writable({
write(chunk, encoding, callback) {
callback(new Error('write error'));
},
});
return ws;
};

const textfilepath = path.join(
getDirName(),
'../../../test/testdata/textfile.txt'
);

bucket.upload(textfilepath, options, (err: Error) => {
try {
assert.strictEqual(err.message, 'write error');
assert.ok(readStream);
assert.ok(readStream.destroyed);
done();
} catch (e) {
done(e);
} finally {
fsCreateReadStreamOverride = null;
}
});
});

it('should allow overriding content type', done => {
const fakeFile = new FakeFile(bucket, 'file-name');
const metadata = {contentType: 'made-up-content-type'};
Expand Down
Loading