|
| 1 | +const async = require('async'); |
| 2 | +const { errors, versioning } = require('arsenal'); |
| 3 | +const { PassThrough } = require('stream'); |
| 4 | + |
| 5 | +const { getObjectSSEConfiguration } = require('./apiUtils/bucket/bucketEncryption'); |
| 6 | +const collectCorsHeaders = require('../utilities/collectCorsHeaders'); |
| 7 | +const createAndStoreObject = require('./apiUtils/object/createAndStoreObject'); |
| 8 | +const { standardMetadataValidateBucketAndObj } = require('../metadata/metadataUtils'); |
| 9 | +const { pushMetric } = require('../utapi/utilities'); |
| 10 | +const { validateHeaders } = require('./apiUtils/object/objectLockHelpers'); |
| 11 | +const kms = require('../kms/wrapper'); |
| 12 | +const { config } = require('../Config'); |
| 13 | +const { setExpirationHeaders } = require('./apiUtils/object/expirationHeaders'); |
| 14 | +const monitoring = require('../utilities/metrics'); |
| 15 | + |
| 16 | +const writeContinue = require('../utilities/writeContinue'); |
| 17 | +const { overheadField } = require('../../constants'); |
| 18 | + |
| 19 | + |
| 20 | +const versionIdUtils = versioning.VersionID; |
| 21 | + |
| 22 | + |
/**
 * POST Object in the requested bucket. Steps include:
 * validating metadata for authorization, bucket and object existence etc.
 * store object data in datastore upon successful authorization
 * store object location returned by datastore and
 * object's (custom) headers in metadata
 * return the result in final callback
 *
 * @param {AuthInfo} authInfo - Instance of AuthInfo class with requester's info
 * @param {request} request - request object given by router; includes
 * normalized headers, the parsed multipart form data (`formData` with
 * `bucket` and `key`) and the uploaded file stream (`file`)
 * @param {object | undefined} streamingV4Params - if v4 auth,
 * object containing accessKey, signatureFromRequest, region, scopeDate,
 * timestamp, and credentialScope
 * (to be used for streaming v4 auth if applicable)
 * @param {object} log - the log request
 * @param {Function} callback - final callback to call with the result
 * @return {undefined}
 */
function objectPost(authInfo, request, streamingV4Params, log, callback) {
    const {
        headers,
        method,
    } = request;
    // Total byte length of the posted file, accumulated while draining the
    // multipart form-data file stream below.
    let parsedContentLength = 0;

    // The form-data file stream is re-wrapped in a PassThrough so that the
    // payload can be counted here and then handed to createAndStoreObject
    // as a plain readable stream.
    const passThroughStream = new PassThrough();

    // TODO CLDSRV-527 add acl header check
    // if (!aclUtils.checkGrantHeaderValidity(headers)) {
    //     log.trace('invalid acl header');
    //     monitoring.promMetrics('PUT', request.bucketName, 400,
    //         'putObject');
    //     return callback(errors.InvalidArgument);
    // }
    // TODO CLDSRV-527 add check for versionId
    // const queryContainsVersionId = checkQueryVersionId(query);
    // if (queryContainsVersionId instanceof Error) {
    //     return callback(queryContainsVersionId);
    // }
    const invalidSSEError = errors.InvalidArgument.customizeDescription(
        'The encryption method specified is not supported');
    const requestType = request.apiMethods || 'objectPost';

    // For POST object the bucket name and object key come from the
    // multipart form fields, not from the URL.
    const valParams = {
        authInfo,
        bucketName: request.formData.bucket,
        objectKey: request.formData.key,
        requestType,
        request,
    };

    const canonicalID = authInfo.getCanonicalID();

    // TODO CLDSRV-527 add check for non-printable characters?
    // if (hasNonPrintables(objectKey)) {
    //     return callback(errors.InvalidInput.customizeDescription(
    //         'object keys cannot contain non-printable characters',
    //     ));
    // }

    // TODO CLDSRV-527 add checksum header check
    // const checksumHeaderErr = validateChecksumHeaders(headers);
    // if (checksumHeaderErr) {
    //     return callback(checksumHeaderErr);
    // }

    log.trace('owner canonicalID to send to data', { canonicalID });

    return standardMetadataValidateBucketAndObj(valParams, request.actionImplicitDenies, log,
        (err, bucket, objMD) => {
            const responseHeaders = collectCorsHeaders(headers.origin,
                method, bucket);

            // NOTE(review): AccessDenied is deliberately let through here —
            // presumably because POST object authorization is granted via
            // the form policy rather than IAM alone. TODO confirm.
            if (err && !err.AccessDenied) {
                log.trace('error processing request', {
                    error: err,
                    method: 'metadataValidateBucketAndObj',
                });
                monitoring.promMetrics('POST', request.bucketName, err.code, 'postObject');
                return callback(err, responseHeaders);
            }
            if (bucket.hasDeletedFlag() && canonicalID !== bucket.getOwner()) {
                log.trace('deleted flag on bucket and request ' +
                    'from non-owner account');
                monitoring.promMetrics('POST', request.bucketName, 404, 'postObject');
                return callback(errors.NoSuchBucket);
            }

            return async.waterfall([
                // Drain the form-data file stream into the PassThrough,
                // counting bytes on the way, so the content length is known
                // before the object is stored.
                function countPOSTFileSize(next) {
                    // Guards against calling next() twice should both
                    // 'error' and 'end' fire.
                    let settled = false;
                    request.file.on('data', chunk => {
                        parsedContentLength += chunk.length;
                        passThroughStream.write(chunk);
                    });
                    // Without an 'error' listener a stream failure would
                    // never call next() (hanging the request) and the
                    // unhandled 'error' event would crash the process.
                    request.file.on('error', streamErr => {
                        log.trace('error streaming POST file data',
                            { error: streamErr });
                        if (!settled) {
                            settled = true;
                            next(errors.InternalError);
                        }
                    });
                    request.file.on('end', () => {
                        // parsedContentLength now holds the full file size;
                        // swap the exhausted form-data stream for the
                        // buffered PassThrough so downstream code can read
                        // the payload.
                        passThroughStream.end();
                        request.file = passThroughStream;
                        request.parsedContentLength = parsedContentLength;
                        if (!settled) {
                            settled = true;
                            next();
                        }
                    });
                    return undefined;
                },
                // TODO CLDSRV-527 add this back?
                // function handleTransientOrDeleteBuckets(next) {
                //     if (bucket.hasTransientFlag() || bucket.hasDeletedFlag()) {
                //         return cleanUpBucket(bucket, canonicalID, log, next);
                //     }
                //     return next();
                // },
                // Resolve the server-side-encryption configuration for the
                // object (request headers and/or bucket default).
                function getSSEConfig(next) {
                    return getObjectSSEConfiguration(headers, bucket, log,
                        (err, sseConfig) => {
                            if (err) {
                                log.error('error getting server side encryption config', { err });
                                return next(invalidSSEError);
                            }
                            return next(null, sseConfig);
                        }
                    );
                },
                // Build a cipher bundle from KMS when SSE applies.
                function createCipherBundle(serverSideEncryptionConfig, next) {
                    if (serverSideEncryptionConfig) {
                        return kms.createCipherBundle(
                            serverSideEncryptionConfig, log, next);
                    }
                    return next(null, null);
                },
                // Validate object-lock headers, then store data + metadata.
                function objectCreateAndStore(cipherBundle, next) {
                    const objectLockValidationError
                        = validateHeaders(bucket, headers, log);
                    if (objectLockValidationError) {
                        return next(objectLockValidationError);
                    }
                    writeContinue(request, request._response);
                    return createAndStoreObject(request.bucketName,
                        bucket, request.formData.key, objMD, authInfo, canonicalID, cipherBundle,
                        request, false, streamingV4Params, overheadField, log, next);
                },
            ], (err, storingResult) => {
                if (err) {
                    monitoring.promMetrics('POST', request.bucketName, err.code,
                        'postObject');
                    return callback(err, responseHeaders);
                }
                // ingestSize assumes that these custom headers indicate
                // an ingestion PUT which is a metadata only operation.
                // Since these headers can be modified client side, they
                // should be used with caution if needed for precise
                // metrics.
                // Parse before validating: Number.isNaN() on the raw header
                // string is always false (a string is never the NaN value),
                // so the previous check let garbage through as NaN.
                const mdOnlySize =
                    Number.parseInt(request.headers['x-amz-meta-size'], 10);
                const ingestSize = (request.headers['x-amz-meta-mdonly']
                    && !Number.isNaN(mdOnlySize)) ? mdOnlySize : null;
                const newByteLength = parsedContentLength;

                setExpirationHeaders(responseHeaders, {
                    lifecycleConfig: bucket.getLifecycleConfiguration(),
                    objectParams: {
                        // The object key lives in the form data for POST
                        // (request.key is never populated on this path).
                        key: request.formData.key,
                        date: storingResult.lastModified,
                        tags: storingResult.tags,
                    },
                });

                // Utapi expects null or a number for oldByteLength:
                // * null - new object
                // * 0 or > 0 - existing object with content-length 0 or > 0
                // objMD here is the master version that we would
                // have overwritten if there was an existing version or object
                //
                // TODO: Handle utapi metrics for null version overwrites.
                const oldByteLength = objMD && objMD['content-length']
                    !== undefined ? objMD['content-length'] : null;
                if (storingResult) {
                    // ETag's hex should always be enclosed in quotes
                    responseHeaders.ETag = `"${storingResult.contentMD5}"`;
                }
                const vcfg = bucket.getVersioningConfiguration();
                const isVersionedObj = vcfg && vcfg.Status === 'Enabled';
                if (isVersionedObj) {
                    if (storingResult && storingResult.versionId) {
                        responseHeaders['x-amz-version-id'] =
                            versionIdUtils.encode(storingResult.versionId,
                                config.versionIdEncodingType);
                    }
                }

                // Only pre-existing non-versioned objects get 0 all others use 1
                const numberOfObjects = !isVersionedObj && oldByteLength !== null ? 0 : 1;

                // only the bucket owner's metrics should be updated, regardless of
                // who the requester is
                pushMetric('postObject', log, {
                    authInfo,
                    canonicalID: bucket.getOwner(),
                    bucket: request.bucketName,
                    // Same consistency fix as above: the key comes from the
                    // multipart form data.
                    keys: [request.formData.key],
                    newByteLength,
                    oldByteLength: isVersionedObj ? null : oldByteLength,
                    versionId: isVersionedObj && storingResult ? storingResult.versionId : undefined,
                    location: bucket.getLocationConstraint(),
                    numberOfObjects,
                });
                monitoring.promMetrics('POST', request.bucketName, '204',
                    'postObject', newByteLength, oldByteLength, isVersionedObj,
                    null, ingestSize);
                return callback(null, responseHeaders);
            });
        });
}

module.exports = objectPost;
0 commit comments