diff --git a/creator-node/package-lock.json b/creator-node/package-lock.json index 2c78eb517de..8ee47e5a389 100644 --- a/creator-node/package-lock.json +++ b/creator-node/package-lock.json @@ -4068,6 +4068,16 @@ "resolved": "https://registry.npmjs.org/file-uri-to-path/-/file-uri-to-path-1.0.0.tgz", "integrity": "sha512-0Zt+s3L7Vf1biwWZ29aARiVYLx7iMGnEUl9x33fbB/j3jR81u/O2LbqK+Bm1CDSNDKVtJ/YjwY7TUd5SkeLQLw==" }, + "fill-keys": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/fill-keys/-/fill-keys-1.0.2.tgz", + "integrity": "sha1-mo+jb06K1jTjv2tPPIiCVRRS6yA=", + "dev": true, + "requires": { + "is-object": "~1.0.1", + "merge-descriptors": "~1.0.0" + } + }, "fill-range": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-4.0.0.tgz", @@ -7676,6 +7686,12 @@ "resolved": "https://registry.npmjs.org/mock-fs/-/mock-fs-4.12.0.tgz", "integrity": "sha512-/P/HtrlvBxY4o/PzXY9cCNBrdylDNxg7gnrv2sMNxj+UJ2m8jSpl0/A6fuJeNAWr99ZvGWH8XCbE0vmnM5KupQ==" }, + "module-not-found-error": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/module-not-found-error/-/module-not-found-error-1.0.1.tgz", + "integrity": "sha1-z4tP9PKWQGdNbN0CsOO8UjwrvcA=", + "dev": true + }, "moment": { "version": "2.24.0", "resolved": "https://registry.npmjs.org/moment/-/moment-2.24.0.tgz", @@ -9288,6 +9304,17 @@ "ipaddr.js": "1.9.0" } }, + "proxyquire": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/proxyquire/-/proxyquire-2.1.3.tgz", + "integrity": "sha512-BQWfCqYM+QINd+yawJz23tbBM40VIGXOdDw3X344KcclI/gtBbdWF6SlQ4nK/bYhF9d27KYug9WzljHC6B9Ysg==", + "dev": true, + "requires": { + "fill-keys": "^1.0.2", + "module-not-found-error": "^1.0.1", + "resolve": "^1.11.1" + } + }, "pseudomap": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/pseudomap/-/pseudomap-1.0.2.tgz", diff --git a/creator-node/package.json b/creator-node/package.json index 592cc90d6cf..98caa49146f 100644 --- a/creator-node/package.json +++ b/creator-node/package.json @@ -61,7 +61,8 @@ "sinon": "^7.0.0", "standard": "^12.0.1", "fs-extra": "^9.0.1", - "supertest": "^3.3.0" + "supertest": "^3.3.0", + "proxyquire": "^2.1.3" }, "//": { "dependenciesComments": { @@ -73,13 +74,8 @@ } }, "standard": { - "globals": [ - "assert", - "beforeEach", - "it", - "before", - "describe", - "afterEach" + "env": [ + "mocha" ] } } diff --git a/creator-node/scripts/run-tests.sh b/creator-node/scripts/run-tests.sh index 87f998dc7a6..fa03c13db71 100755 --- a/creator-node/scripts/run-tests.sh +++ b/creator-node/scripts/run-tests.sh @@ -28,12 +28,12 @@ tear_down () { run_unit_tests () { echo Running unit tests... - ./node_modules/mocha/bin/mocha --recursive 'src/**/*.test.js' + ./node_modules/mocha/bin/mocha --recursive 'src/**/*.test.js' --exit } run_integration_tests () { echo Running integration tests... - ./node_modules/mocha/bin/mocha --timeout 30000 --exit + ./node_modules/mocha/bin/mocha test/*.test.js --timeout 30000 --exit } if [ "$1" == "standalone_creator" ]; then diff --git a/creator-node/sequelize/migrations/20200911004845-allow-track-and-audiusUsers-appends.js b/creator-node/sequelize/migrations/20200911004845-allow-track-and-audiusUsers-appends.js new file mode 100644 index 00000000000..b92592e60a8 --- /dev/null +++ b/creator-node/sequelize/migrations/20200911004845-allow-track-and-audiusUsers-appends.js @@ -0,0 +1,74 @@ +'use strict' + +module.exports = { + up: async (queryInterface, Sequelize) => { + // Scope of the migration is: + // Remove trackUUID from Files and replace it with trackBlockchainId and migrate all existing values over + // Remove trackUUID field from Tracks table + // Remove UNIQUE constraint for blockchainId, trackUUID in Tracks table + // Add NOT NULL constraint for blockchainId in Tracks table + // Remove audiusUserUUID field from AudiusUsers table + // Remove UNIQUE constraint for audiusUserUUID in AudiusUsers table (no unique constraint on blockchainId) + // Add NOT NULL constraint for blockchainId in AudiusUsers table + + console.log('STARTING MIGRATION 20200911004845-allow-track-and-audiusUsers-appends') + await queryInterface.sequelize.query(` + BEGIN; + -- replace Files table in place with extra trackBlockchainId column and drops the trackUUID column + CREATE TABLE "Files_new" ( like "Files" ); + ALTER TABLE "Files_new" ADD COLUMN "trackBlockchainId" INTEGER; + INSERT INTO "Files_new" ("multihash", "sourceFile", "storagePath", "createdAt", "updatedAt", "fileUUID", "cnodeUserUUID", "trackUUID", "type", "fileName", "dirMultihash", "trackBlockchainId") SELECT f."multihash", f."sourceFile", f."storagePath", f."createdAt", f."updatedAt", f."fileUUID", f."cnodeUserUUID", f."trackUUID", f."type", f."fileName", f."dirMultihash", t."blockchainId" FROM "Files" f LEFT OUTER JOIN "Tracks" t ON f."trackUUID" = t."trackUUID"; + ALTER TABLE "Files_new" DROP COLUMN "trackUUID"; + ALTER TABLE "Files" RENAME TO "Files_old"; + ALTER TABLE "Files_new" RENAME TO "Files"; + DROP TABLE "Files_old" CASCADE; + + -- add pkey + -- ALTER TABLE "Files" ADD CONSTRAINT "Files_fileUUID_key" UNIQUE ("fileUUID"); + ALTER TABLE "Files" ADD PRIMARY KEY ("fileUUID"); + + -- add back indexes + CREATE INDEX "Files_multihash_idx" ON public."Files" USING btree (multihash); + CREATE INDEX "Files_cnodeUserUUID_idx" ON public."Files" USING btree ("cnodeUserUUID"); + CREATE INDEX "Files_dir_multihash_idx" ON public."Files" USING btree ("dirMultihash"); + CREATE INDEX "Files_trackBlockchainId_idx" ON public."Files" USING btree ("trackBlockchainId"); + + -- add in the foreign key constraint from Files to other tables + -- No fkey from Files to Tracks because we don't have a unique constraint on trackUUID or blockchainId on Tracks so postgres would reject the fkey + ALTER TABLE "Files" ADD CONSTRAINT "Files_cnodeUserUUID_fkey" FOREIGN KEY ("cnodeUserUUID") REFERENCES "CNodeUsers" ("cnodeUserUUID") ON DELETE RESTRICT; + + -- remove the unique constraints from Tracks + ALTER TABLE "Tracks" DROP CONSTRAINT "Tracks_trackUUID_key"; + ALTER TABLE "Tracks" DROP CONSTRAINT "blockchainId_unique_idx"; + + -- add a not null constraint to Tracks blockchainId, just run a delete query to remove any outstanding Tracks without a blockchainId in case there are any + DELETE FROM "Tracks" WHERE "blockchainId" IS NULL; + ALTER TABLE "Tracks" ALTER COLUMN "blockchainId" SET NOT NULL; + + -- remove the trackUUID column from Tracks + ALTER TABLE "Tracks" DROP COLUMN "trackUUID"; + + -- remove the unique constraint from AudiusUsers + ALTER TABLE "AudiusUsers" DROP CONSTRAINT "AudiusUsers_audiusUserUUID_key"; + + -- add a not null constraint to AudiusUsers blockchainId, just run a delete query to remove any outstanding AudiusUsers without a blockchainId in case there are any + DELETE FROM "AudiusUsers" WHERE "blockchainId" IS NULL; + ALTER TABLE "AudiusUsers" ALTER COLUMN "blockchainId" SET NOT NULL; + + -- remove the audiusUserUUID field as the AudiusUsers pkey + ALTER TABLE "AudiusUsers" DROP CONSTRAINT "AudiusUsers_pkey"; + + -- remove the audiusUserUUID column from AudiusUsers + ALTER TABLE "AudiusUsers" DROP COLUMN "audiusUserUUID"; + + COMMIT; + `) + console.log('FINISHED MIGRATION 20200911004845-allow-track-and-audiusUsers-appends') + }, + + down: (queryInterface, Sequelize) => { + /* + The up migration destroys tons of information, if we need to revert the best option is to restore from a snapshot + */ + } +} diff --git a/creator-node/sequelize/migrations/20200918150546-add-clock.js b/creator-node/sequelize/migrations/20200918150546-add-clock.js new file mode 100644 index 00000000000..2c070184f8a --- /dev/null +++ b/creator-node/sequelize/migrations/20200918150546-add-clock.js @@ -0,0 +1,116 @@ +'use strict' + +/** + * Content Tables = AudiusUsers, Tracks, Files + * CNodeUsers Table considered a Reference Table only + */ + +module.exports = { + up: async (queryInterface, Sequelize) => { + console.log('STARTING MIGRATION 20200918150546-add-clock') + const transaction = await queryInterface.sequelize.transaction() + + // Add 'clock' column to all 4 tables + await addClockColumn(queryInterface, Sequelize, transaction) + + // Create Clock table + await createClockRecordsTable(queryInterface, Sequelize, transaction) + + // Add composite unique constraint on (blockchainId, clock) to AudiusUsers and Tracks + // Add composite unique constraint on (cnodeUserUUID, clock) to Files + await addCompositeUniqueConstraints(queryInterface, Sequelize, transaction) + + await transaction.commit() + console.log('FINISHED MIGRATION 20200918150546-add-clock') + }, + + down: async (queryInterface, Sequelize) => { } +} + +async function addClockColumn (queryInterface, Sequelize, transaction) { + await queryInterface.addColumn('CNodeUsers', 'clock', { + type: Sequelize.INTEGER, + unique: false, + allowNull: true + }, { transaction }) + await queryInterface.addColumn('AudiusUsers', 'clock', { + type: Sequelize.INTEGER, + unique: false, + allowNull: true + }, { transaction }) + await queryInterface.addColumn('Tracks', 'clock', { + type: Sequelize.INTEGER, + unique: false, + allowNull: true + }, { transaction }) + await queryInterface.addColumn('Files', 'clock', { + type: Sequelize.INTEGER, + unique: false, + allowNull: true + }, { transaction }) +} + +async function addCompositeUniqueConstraints (queryInterface, Sequelize, transaction) { + await queryInterface.addConstraint( + 'AudiusUsers', + { + type: 'UNIQUE', + fields: ['blockchainId', 'clock'], + name: 'AudiusUsers_unique_(blockchainId,clock)', + transaction + } + ) + await queryInterface.addConstraint( + 'Tracks', + { + type: 'UNIQUE', + fields: ['blockchainId', 'clock'], + name: 'Tracks_unique_(blockchainId,clock)', + transaction + } + ) + await queryInterface.addConstraint( + 'Files', + { + type: 'UNIQUE', + fields: ['cnodeUserUUID', 'clock'], + name: 'Files_unique_(cnodeUserUUID,clock)', + transaction + } + ) +} + +async function createClockRecordsTable (queryInterface, Sequelize, transaction) { + await queryInterface.createTable('ClockRecords', { + cnodeUserUUID: { + type: Sequelize.UUID, + primaryKey: true, // composite primary key (cnodeUserUUID, clock) + unique: false, + allowNull: false, + references: { + model: 'CNodeUsers', + key: 'cnodeUserUUID', + as: 'cnodeUserUUID' + }, + onDelete: 'RESTRICT' + }, + clock: { + type: Sequelize.INTEGER, + primaryKey: true, // composite primary key (cnodeUserUUID, clock) + unique: false, + allowNull: false + }, + sourceTable: { + type: Sequelize.ENUM('AudiusUser', 'Track', 'File'), + allowNull: false + }, + createdAt: { + allowNull: false, + type: Sequelize.DATE + }, + updatedAt: { + allowNull: false, + type: Sequelize.DATE + } + }, { transaction }) +} diff --git a/creator-node/src/config.js b/creator-node/src/config.js index a6f8f0e9d52..e974776f433 100644 --- a/creator-node/src/config.js +++ b/creator-node/src/config.js @@ -203,6 +203,12 @@ const config = convict({ env: 'hlsSegmentType', default: 'mpegts' }, + printSequelizeLogs: { + doc: 'If we should print logs from sequelize', + format: Boolean, + env: 'printSequelizeLogs', + default: true + }, // Transcoding settings transcodingMaxConcurrency: { diff --git a/creator-node/src/dbManager.js b/creator-node/src/dbManager.js new file mode 100644 index 00000000000..d96ce8b821c --- /dev/null +++ b/creator-node/src/dbManager.js @@ -0,0 +1,54 @@ +const models = require('./models') +const sequelize = models.sequelize + +class DBManager { + /** + * Given file insert query object and cnodeUserUUID, inserts new file record in DB + * and handles all required clock management. + * Steps: + * 1. increments cnodeUser clock value by 1 + * 2. insert new ClockRecord entry with new clock value + * 3. insert new Data Table (File, Track, AudiusUser) entry with queryObj and new clock value + * In steps 2 and 3, clock values are read as subquery to guarantee atomicity + */ + static async createNewDataRecord (queryObj, cnodeUserUUID, sequelizeTableInstance, transaction) { + // Increment CNodeUser.clock value by 1 + await models.CNodeUser.increment('clock', { + where: { cnodeUserUUID }, + by: 1, + transaction + }) + + const selectCNodeUserClockSubqueryLiteral = _getSelectCNodeUserClockSubqueryLiteral(cnodeUserUUID) + + // Add row in ClockRecords table using new CNodeUser.clock + await models.ClockRecord.create({ + cnodeUserUUID, + clock: selectCNodeUserClockSubqueryLiteral, + sourceTable: sequelizeTableInstance.name + }, { transaction }) + + // Add cnodeUserUUID + clock value to queryObj + queryObj.cnodeUserUUID = cnodeUserUUID + queryObj.clock = selectCNodeUserClockSubqueryLiteral + + // Create new Data table entry with queryObj using new CNodeUser.clock + const file = await sequelizeTableInstance.create(queryObj, { transaction }) + + return file.dataValues + } +} + +/** + * returns string literal `select "clock" from "CNodeUsers" where "cnodeUserUUID" = '${cnodeUserUUID}'` + * @dev source: https://stackoverflow.com/questions/36164694/sequelize-subquery-in-where-clause + */ +function _getSelectCNodeUserClockSubqueryLiteral (cnodeUserUUID) { + const subquery = sequelize.dialect.QueryGenerator.selectQuery('CNodeUsers', { + attributes: ['clock'], + where: { cnodeUserUUID } + }).slice(0, -1) // removes trailing ';' + return sequelize.literal(`(${subquery})`) +} + +module.exports = DBManager diff --git a/creator-node/src/fileManager.js b/creator-node/src/fileManager.js index 8105d82be75..00b0214222e 100644 --- a/creator-node/src/fileManager.js +++ b/creator-node/src/fileManager.js @@ -13,7 +13,6 @@ const writeFile = promisify(fs.writeFile) const mkdir = promisify(fs.mkdir) const config = require('./config') -const models = require('./models') const Utils = require('./utils') const MAX_AUDIO_FILE_SIZE = parseInt(config.get('maxAudioFileSizeBytes')) // Default = 250,000,000 bytes = 250MB @@ -23,12 +22,11 @@ const ALLOWED_UPLOAD_FILE_EXTENSIONS = config.get('allowedUploadFileExtensions') const AUDIO_MIME_TYPE_REGEX = /audio\/(.*)/ /** - * (1) Add file to IPFS; (2) save file to disk; - * (3) add file via IPFS; (4) save file ref to DB - * @dev - only call this function when file is not already stored to disk - * - if it is, then use saveFileToIPFSFromFS() + * Adds file to IPFS then saves file to disk under /multihash name + * + */ -async function saveFileFromBuffer (req, buffer, fileType) { +async function saveFileFromBufferToIPFSAndDisk (req, buffer) { // make sure user has authenticated before saving file if (!req.session.cnodeUserUUID) { throw new Error('User must be authenticated to save a file') @@ -36,32 +34,22 @@ async function saveFileFromBuffer (req, buffer, fileType) { const ipfs = req.app.get('ipfsAPI') + // Add to IPFS without pinning and retrieve multihash const multihash = (await ipfs.add(buffer, { pin: false }))[0].hash + // Write file to disk by multihash for future retrieval const dstPath = path.join(req.app.get('storagePath'), multihash) - await writeFile(dstPath, buffer) - // add reference to file to database - const file = (await models.File.findOrCreate({ where: { - cnodeUserUUID: req.session.cnodeUserUUID, - multihash: multihash, - sourceFile: req.fileName, - storagePath: dstPath, - type: fileType - } }))[0].dataValues - - req.logger.info('\nAdded file:', multihash, 'file id', file.fileUUID) - return { multihash: multihash, fileUUID: file.fileUUID } + return { multihash, dstPath } } /** - * Save file to IPFS given file path. - * - Add file to IPFS. - * - Re-save file to disk under multihash. - * - Save reference to file in DB. + * Given file path on disk, adds file to IPFS + re-saves under /multihash name + * + * @dev - only call this function when file is already stored to disk, else use saveFileFromBufferToIPFSAndDisk() */ -async function saveFileToIPFSFromFS (req, srcPath, fileType, sourceFile, transaction = null) { +async function saveFileToIPFSFromFS (req, srcPath) { // make sure user has authenticated before saving file if (!req.session.cnodeUserUUID) { throw new Error('User must be authenticated to save a file') @@ -69,35 +57,14 @@ async function saveFileToIPFSFromFS (req, srcPath, fileType, sourceFile, transac const ipfs = req.app.get('ipfsAPI') - req.logger.info(`beginning saveFileToIPFSFromFS for srcPath ${srcPath}`) - - let codeBlockTimeStart = Date.now() - + // Add to IPFS without pinning and retrieve multihash const multihash = (await ipfs.addFromFs(srcPath, { pin: false }))[0].hash - req.logger.info(`Time taken in saveFileToIpfsFromFS to add: ${Date.now() - codeBlockTimeStart}`) - codeBlockTimeStart = Date.now() - const dstPath = path.join(req.app.get('storagePath'), multihash) - // store segment file copy under multihash for easy future retrieval + // store file copy by multihash for future retrieval + const dstPath = path.join(req.app.get('storagePath'), multihash) fs.copyFileSync(srcPath, dstPath) - req.logger.info(`Time taken in saveFileToIpfsFromFS to copyFileSync: ${Date.now() - codeBlockTimeStart}`) - - // add reference to file to database - const queryObj = { where: { - cnodeUserUUID: req.session.cnodeUserUUID, - multihash: multihash, - sourceFile: sourceFile, - storagePath: dstPath, - type: fileType - } } - if (transaction) { - queryObj.transaction = transaction - } - const file = ((await models.File.findOrCreate(queryObj))[0].dataValues) - - req.logger.info(`Added file: ${multihash} for fileUUID ${file.fileUUID} from sourceFile ${sourceFile}`) - return { multihash: multihash, fileUUID: file.fileUUID } + return { multihash, dstPath } } /** @@ -401,7 +368,7 @@ function getFileExtension (fileName) { } module.exports = { - saveFileFromBuffer, + saveFileFromBufferToIPFSAndDisk, saveFileToIPFSFromFS, saveFileForMultihash, removeTrackFolder, diff --git a/creator-node/src/middlewares.js b/creator-node/src/middlewares.js index 1072d885988..16f9daa1e0c 100644 --- a/creator-node/src/middlewares.js +++ b/creator-node/src/middlewares.js @@ -99,7 +99,7 @@ async function triggerSecondarySyncs (req) { try { if (!req.session.nodeIsPrimary || !req.session.creatorNodeEndpoints || !Array.isArray(req.session.creatorNodeEndpoints)) return const [primary, ...secondaries] = req.session.creatorNodeEndpoints - + req.logger.error(`SIDTEST: primary ${primary} calling sync against: ${secondaries}`) await Promise.all(secondaries.map(async secondary => { if (!secondary || !_isFQDN(secondary)) return const axiosReq = { diff --git a/creator-node/src/models/audiususer.js b/creator-node/src/models/audiususer.js index 849312dcde5..26f1343945b 100644 --- a/creator-node/src/models/audiususer.js +++ b/creator-node/src/models/audiususer.js @@ -1,19 +1,18 @@ 'use strict' module.exports = (sequelize, DataTypes) => { const AudiusUser = sequelize.define('AudiusUser', { - audiusUserUUID: { + cnodeUserUUID: { type: DataTypes.UUID, - allowNull: false, - primaryKey: true, - defaultValue: DataTypes.UUIDV4 + primaryKey: true, // composite primary key (cnodeUserUUID, clock) + allowNull: false + }, + clock: { + type: DataTypes.INTEGER, + primaryKey: true, // composite primary key (cnodeUserUUID, clock) + allowNull: false }, blockchainId: { type: DataTypes.BIGINT, - unique: true, - allowNull: true - }, - cnodeUserUUID: { - type: DataTypes.UUID, allowNull: false }, metadataFileUUID: { @@ -32,7 +31,14 @@ module.exports = (sequelize, DataTypes) => { type: DataTypes.UUID, allowNull: true } - }, {}) + }, { + indexes: [ + { + unique: true, + fields: ['blockchainId', 'clock'] + } + ] + }) AudiusUser.associate = function (models) { AudiusUser.belongsTo(models.CNodeUser, { foreignKey: 'cnodeUserUUID', diff --git a/creator-node/src/models/cNodeUser.js b/creator-node/src/models/cNodeUser.js index 277f0c6a3bb..bcdf0ca1bb1 100644 --- a/creator-node/src/models/cNodeUser.js +++ b/creator-node/src/models/cNodeUser.js @@ -21,6 +21,10 @@ module.exports = (sequelize, DataTypes) => { type: DataTypes.INTEGER, allowNull: false, defaultValue: -1 + }, + clock: { + type: DataTypes.INTEGER, + allowNull: false } }, {}) diff --git a/creator-node/src/models/clockRecord.js b/creator-node/src/models/clockRecord.js new file mode 100644 index 00000000000..7d5ac6af84f --- /dev/null +++ b/creator-node/src/models/clockRecord.js @@ -0,0 +1,52 @@ +'use strict' +module.exports = (sequelize, DataTypes) => { + // TODO - why is this not externally accessible? + const SourceTableTypesObj = { + AudiusUser: 'AudiusUser', + Track: 'Track', + File: 'File' + } + + const ClockRecord = sequelize.define('ClockRecord', { + cnodeUserUUID: { + type: DataTypes.UUID, + primaryKey: true, // composite PK with clock + unique: false, + allowNull: false, + references: { + model: 'CNodeUsers', + key: 'cnodeUserUUID', + as: 'cnodeUserUUID' + }, + onDelete: 'RESTRICT' + }, + clock: { + type: DataTypes.INTEGER, + primaryKey: true, // composite PK with cnodeUserUUID + unique: false, + allowNull: false + }, + sourceTable: { + type: DataTypes.ENUM( + ...Object.values(SourceTableTypesObj) + ), + allowNull: false + } + }, {}) + + /** + * TODO - enforce composite foreign key (cnodeUserUUID, clock) on all SourceTables + * - https://stackoverflow.com/questions/9984022/postgres-fk-referencing-composite-pk + */ + ClockRecord.associate = (models) => { + ClockRecord.belongsTo(models.CNodeUser, { + foreignKey: 'cnodeUserUUID', + sourceKey: 'cnodeUserUUID', + onDelete: 'RESTRICT' + }) + } + + ClockRecord.SourceTableTypesObj = SourceTableTypesObj + + return ClockRecord +} diff --git a/creator-node/src/models/file.js b/creator-node/src/models/file.js index a7a5d2d02f9..3adf6172827 100644 --- a/creator-node/src/models/file.js +++ b/creator-node/src/models/file.js @@ -12,10 +12,10 @@ module.exports = (sequelize, DataTypes) => { type: DataTypes.UUID, allowNull: false }, - // only non-null for track files (as opposed to image/metadata files) - trackUUID: { - type: DataTypes.UUID, - allowNull: true // `true` as we use File entries for more than just uploaded tracks + // only non-null for track/copy320 files (as opposed to image/metadata files) + trackBlockchainId: { + type: DataTypes.INTEGER, + allowNull: true }, multihash: { type: DataTypes.TEXT, @@ -49,6 +49,10 @@ module.exports = (sequelize, DataTypes) => { // track and non types broken down below and attached to Track model isIn: [['track', 'metadata', 'image', 'dir', 'copy320']] } + }, + clock: { + type: DataTypes.INTEGER, + allowNull: false } }, { indexes: [ @@ -60,23 +64,30 @@ module.exports = (sequelize, DataTypes) => { }, { fields: ['dirMultihash'] + }, + { + fields: ['trackBlockchainId'] + }, + { + unique: true, + fields: ['cnodeUserUUID', 'clock'] } ] }) + /** + * @dev - there is intentionally no reference from File.trackBlockchainId to Track.blockchainId. This is to + * remove the two-way association between these models + */ File.associate = function (models) { File.belongsTo(models.CNodeUser, { foreignKey: 'cnodeUserUUID', sourceKey: 'cnodeUserUUID', onDelete: 'RESTRICT' }) - File.belongsTo(models.Track, { - foreignKey: 'trackUUID', - sourceKey: 'trackUUID', - onDelete: 'RESTRICT' - }) } + // TODO - why is this not externally accessible? File.TrackTypes = ['track', 'copy320'] File.NonTrackTypes = ['dir', 'image', 'metadata'] diff --git a/creator-node/src/models/index.js b/creator-node/src/models/index.js index b7b3c95d569..6cc6fd6e674 100644 --- a/creator-node/src/models/index.js +++ b/creator-node/src/models/index.js @@ -10,7 +10,7 @@ const basename = path.basename(__filename) const db = {} const sequelize = new Sequelize(globalConfig.get('dbUrl'), { - logging: true, + logging: globalConfig.get('printSequelizeLogs'), operatorsAliases: false, pool: { max: 100, diff --git a/creator-node/src/models/track.js b/creator-node/src/models/track.js index fd6edba477d..5670794961c 100644 --- a/creator-node/src/models/track.js +++ b/creator-node/src/models/track.js @@ -2,14 +2,18 @@ module.exports = (sequelize, DataTypes) => { const Track = sequelize.define('Track', { - trackUUID: { - type: DataTypes.UUID, - allowNull: false, - primaryKey: true, - defaultValue: DataTypes.UUIDV4 - }, cnodeUserUUID: { type: DataTypes.UUID, + primaryKey: true, // composite primary key (cnodeUserUUID, clock) + allowNull: false + }, + clock: { + type: DataTypes.INTEGER, + primaryKey: true, // composite primary key (cnodeUserUUID, clock) + allowNull: false + }, + blockchainId: { + type: DataTypes.BIGINT, allowNull: false }, metadataFileUUID: { @@ -20,16 +24,18 @@ module.exports = (sequelize, DataTypes) => { type: DataTypes.JSONB, allowNull: false }, - blockchainId: { - type: DataTypes.BIGINT, - allowNull: true, - unique: true - }, coverArtFileUUID: { type: DataTypes.UUID, allowNull: true } - }, {}) + }, { + indexes: [ + { + unique: true, + fields: ['blockchainId', 'clock'] + } + ] + }) Track.associate = function (models) { Track.belongsTo(models.CNodeUser, { @@ -37,12 +43,7 @@ module.exports = (sequelize, DataTypes) => { targetKey: 'cnodeUserUUID', onDelete: 'RESTRICT' }) - Track.belongsTo(models.File, { // belongsTo, or hasMany? - foreignKey: 'trackUUID', - targetKey: 'trackUUID', - onDelete: 'RESTRICT' - }) - Track.belongsTo(models.File, { // belongsTo, or hasOne + Track.belongsTo(models.File, { foreignKey: 'metadataFileUUID', targetKey: 'fileUUID', onDelete: 'RESTRICT' diff --git a/creator-node/src/routes/audiusUsers.js b/creator-node/src/routes/audiusUsers.js index a4c54d26626..4dce624d793 100644 --- a/creator-node/src/routes/audiusUsers.js +++ b/creator-node/src/routes/audiusUsers.js @@ -2,29 +2,54 @@ const { Buffer } = require('ipfs-http-client') const fs = require('fs') const models = require('../models') -const { saveFileFromBuffer } = require('../fileManager') +const { saveFileFromBufferToIPFSAndDisk } = require('../fileManager') const { handleResponse, successResponse, errorResponseBadRequest, errorResponseServerError } = require('../apiHelpers') const { getFileUUIDForImageCID } = require('../utils') const { authMiddleware, syncLockMiddleware, ensurePrimaryMiddleware, triggerSecondarySyncs } = require('../middlewares') +const DBManager = require('../dbManager') module.exports = function (app) { - /** Create AudiusUser from provided metadata, and make metadata available to network. */ + /** + * Create AudiusUser from provided metadata, and make metadata available to network + */ app.post('/audius_users/metadata', authMiddleware, syncLockMiddleware, handleResponse(async (req, res) => { // TODO - input validation const metadataJSON = req.body.metadata - const metadataBuffer = Buffer.from(JSON.stringify(metadataJSON)) - let multihash, fileUUID + const cnodeUserUUID = req.session.cnodeUserUUID + + // Save file from buffer to IPFS and disk + let multihash, dstPath + try { + const resp = await saveFileFromBufferToIPFSAndDisk(req, metadataBuffer) + multihash = resp.multihash + dstPath = resp.dstPath + } catch (e) { + return errorResponseServerError(`saveFileFromBufferToIPFSAndDisk op failed: ${e}`) + } + // Record metadata file entry in DB + const transaction = await models.sequelize.transaction() + let fileUUID try { - const saveFileFromBufferResp = await saveFileFromBuffer(req, metadataBuffer, 'metadata') - multihash = saveFileFromBufferResp.multihash - fileUUID = saveFileFromBufferResp.fileUUID + const createFileQueryObj = { + multihash, + sourceFile: req.fileName, + storagePath: dstPath, + type: 'metadata' // TODO - replace with models enum + } + const file = await DBManager.createNewDataRecord(createFileQueryObj, cnodeUserUUID, models.File, transaction) + fileUUID = file.fileUUID + await transaction.commit() } catch (e) { - return errorResponseServerError(`Could not save file to disk, ipfs, and/or db: ${e}`) + await transaction.rollback() + return errorResponseServerError(`Could not save to db: ${e}`) } - return successResponse({ 'metadataMultihash': multihash, 'metadataFileUUID': fileUUID }) + return successResponse({ + 'metadataMultihash': multihash, + 'metadataFileUUID': fileUUID + }) })) /** @@ -48,7 +73,7 @@ module.exports = function (app) { // Fetch metadataJSON for metadataFileUUID. const file = await models.File.findOne({ where: { fileUUID: metadataFileUUID, cnodeUserUUID } }) if (!file) { - return errorResponseBadRequest(`No file found for provided metadataFileUUID ${metadataFileUUID}.`) + return errorResponseBadRequest(`No file db record found for provided metadataFileUUID ${metadataFileUUID}.`) } let metadataJSON try { @@ -66,26 +91,26 @@ module.exports = function (app) { return errorResponseBadRequest(e.message) } - const t = await models.sequelize.transaction() + // Record AudiusUser entry + update CNodeUser entry in DB + const transaction = await models.sequelize.transaction() try { - // Insert / update audiusUser entry on db. - const audiusUser = await models.AudiusUser.upsert({ - cnodeUserUUID, + const createAudiusUserQueryObj = { metadataFileUUID, metadataJSON, blockchainId: blockchainUserId, coverArtFileUUID, profilePicFileUUID - }, { transaction: t, returning: true }) + } + await DBManager.createNewDataRecord(createAudiusUserQueryObj, cnodeUserUUID, models.AudiusUser, transaction) - // Update cnodeUser's latestBlockNumber. - await cnodeUser.update({ latestBlockNumber: blockNumber }, { transaction: t }) + // Update cnodeUser.latestBlockNumber + await cnodeUser.update({ latestBlockNumber: blockNumber }, { transaction }) - await t.commit() + await transaction.commit() triggerSecondarySyncs(req) - return successResponse({ audiusUserUUID: audiusUser.audiusUserUUID }) + return successResponse() } catch (e) { - await t.rollback() + await transaction.rollback() return errorResponseServerError(e.message) } })) diff --git a/creator-node/src/routes/files.js b/creator-node/src/routes/files.js index 131a79d915a..e694c3b3eda 100644 --- a/creator-node/src/routes/files.js +++ b/creator-node/src/routes/files.js @@ -22,6 +22,7 @@ const { authMiddleware, syncLockMiddleware, triggerSecondarySyncs } = require('. const { getIPFSPeerId, ipfsSingleByteCat, ipfsStat } = require('../utils') const ImageProcessingQueue = require('../ImageProcessingQueue') const RehydrateIpfsQueue = require('../RehydrateIpfsQueue') +const DBManager = require('../dbManager') /** * Helper method to stream file from file system on creator node @@ -96,9 +97,12 @@ const getCID = async (req, res) => { } // Don't serve if not found in DB. - const queryResults = await models.File.findOne({ where: { - multihash: CID - } }) + const queryResults = await models.File.findOne({ + where: { + multihash: CID + }, + order: [['clock', 'DESC']] + }) if (!queryResults) { return sendResponse(req, res, errorResponseNotFound(`No valid file found for provided CID: ${CID}`)) } @@ -185,10 +189,13 @@ const getDirCID = async (req, res) => { // Don't serve if not found in DB. // Query for the file based on the dirCID and filename - const queryResults = await models.File.findOne({ where: { - dirMultihash: dirCID, - fileName: filename - } }) + const queryResults = await models.File.findOne({ + where: { + dirMultihash: dirCID, + fileName: filename + }, + order: [['clock', 'DESC']] + }) if (!queryResults) { return sendResponse( req, @@ -234,7 +241,9 @@ const getDirCID = async (req, res) => { } module.exports = function (app) { - /** Store image in multiple-resolutions on disk + DB and make available via IPFS */ + /** + * Store image in multiple-resolutions on disk + DB and make available via IPFS + */ app.post('/image_upload', authMiddleware, syncLockMiddleware, uploadTempDiskStorage.single('file'), handleResponse(async (req, res) => { if (!req.body.square || !(req.body.square === 'true' || req.body.square === 'false')) { return errorResponseBadRequest('Must provide square boolean param in request body') @@ -242,13 +251,14 @@ module.exports = function (app) { if (!req.file) { return errorResponseBadRequest('Must provide image file in request body.') } - let routestart = Date.now() + const routestart = Date.now() const imageBufferOriginal = req.file.path const originalFileName = req.file.originalname - let resizeResp + const cnodeUserUUID = req.session.cnodeUserUUID // Resize the images and add them to IPFS and filestorage + let resizeResp try { if (req.body.square === 'true') { resizeResp = await ImageProcessingQueue.resizeImage({ @@ -277,48 +287,43 @@ module.exports = function (app) { }) } - req.logger.info('ipfs add resp', resizeResp) + req.logger.debug('ipfs add resp', resizeResp) } catch (e) { return errorResponseServerError(e) } - const t = await models.sequelize.transaction() - // Add the created files to the DB + // Record image file entries in DB + const transaction = await models.sequelize.transaction() try { - // Save dir file reference to DB - const dir = (await models.File.findOrCreate({ where: { - cnodeUserUUID: req.session.cnodeUserUUID, + // Record dir file entry in DB + const createDirFileQueryObj = { multihash: resizeResp.dir.dirCID, sourceFile: null, storagePath: resizeResp.dir.dirDestPath, - type: 'dir' - }, - transaction: t }))[0].dataValues - - // Save each file to the DB - await Promise.all(resizeResp.files.map(async (fileResp) => { - const file = (await models.File.findOrCreate({ where: { - cnodeUserUUID: req.session.cnodeUserUUID, - multihash: fileResp.multihash, - sourceFile: fileResp.sourceFile, - storagePath: fileResp.storagePath, - type: 'image', + type: 'dir' // TODO - replace with models enum + } + await DBManager.createNewDataRecord(createDirFileQueryObj, cnodeUserUUID, models.File, transaction) + + // Record all image res file entries in DB + // Must be written sequentially to ensure clock values are correctly incremented and populated + for (const file of resizeResp.files) { + const createImageFileQueryObj = { + multihash: file.multihash, + sourceFile: file.sourceFile, + storagePath: file.storagePath, + type: 'image', // TODO - replace with models enum dirMultihash: resizeResp.dir.dirCID, - fileName: fileResp.sourceFile.split('/').slice(-1)[0] - }, - transaction: t }))[0].dataValues - - req.logger.info('Added file', fileResp, file) - })) + fileName: file.sourceFile.split('/').slice(-1)[0] + } + await DBManager.createNewDataRecord(createImageFileQueryObj, cnodeUserUUID, models.File, transaction) + } - req.logger.info('Added all files for dir', dir) req.logger.info(`route time = ${Date.now() - routestart}`) - - await t.commit() + await transaction.commit() triggerSecondarySyncs(req) return successResponse({ dirCID: resizeResp.dir.dirCID }) } catch (e) { - await t.rollback() + await transaction.rollback() return errorResponseServerError(e) } })) diff --git a/creator-node/src/routes/nodeSync.js b/creator-node/src/routes/nodeSync.js index a6c7b96b840..dc3cef47f0c 100644 --- a/creator-node/src/routes/nodeSync.js +++ b/creator-node/src/routes/nodeSync.js @@ -24,77 +24,136 @@ module.exports = function (app) { * } */ app.get('/export', handleResponse(async (req, res) => { + // TODO - allow for offsets in the /export const walletPublicKeys = req.query.wallet_public_key // array + const dbOnlySync = (req.query.db_only_sync === true || req.query.db_only_sync === 'true') - const t = await models.sequelize.transaction() + const MaxClock = 25000 + + const transaction = await models.sequelize.transaction() try { // Fetch cnodeUser for each walletPublicKey. - const cnodeUsers = await models.CNodeUser.findAll({ where: { walletPublicKey: walletPublicKeys }, transaction: t }) + const cnodeUsers = await models.CNodeUser.findAll({ where: { walletPublicKey: walletPublicKeys }, transaction, raw: true }) const cnodeUserUUIDs = cnodeUsers.map((cnodeUser) => cnodeUser.cnodeUserUUID) - // Fetch all data for cnodeUserUUIDs: audiusUsers, tracks, files. - const [audiusUsers, tracks, files] = await Promise.all([ - models.AudiusUser.findAll({ where: { cnodeUserUUID: cnodeUserUUIDs }, transaction: t }), - models.Track.findAll({ where: { cnodeUserUUID: cnodeUserUUIDs }, transaction: t }), - models.File.findAll({ where: { cnodeUserUUID: cnodeUserUUIDs }, transaction: t }) + // Fetch all data for cnodeUserUUIDs: audiusUsers, tracks, files, clockRecords. + + const [audiusUsers, tracks, files, clockRecords] = await Promise.all([ + models.AudiusUser.findAll({ + where: { + cnodeUserUUID: cnodeUserUUIDs, + clock: { + [models.Sequelize.Op.lte]: MaxClock + } + }, + order: [['clock', 'ASC']], + transaction, + raw: true + }), + models.Track.findAll({ + where: { + cnodeUserUUID: cnodeUserUUIDs, + clock: { + [models.Sequelize.Op.lte]: MaxClock + } + }, + order: [['clock', 'ASC']], + transaction, + raw: true + }), + models.File.findAll({ + where: { + cnodeUserUUID: cnodeUserUUIDs, + clock: { + [models.Sequelize.Op.lte]: MaxClock + } + }, + order: [['clock', 'ASC']], + transaction, + raw: true + }), + models.ClockRecord.findAll({ + where: { + cnodeUserUUID: cnodeUserUUIDs, + clock: { + [models.Sequelize.Op.lte]: MaxClock + } + }, + order: [['clock', 'ASC']], + transaction, + raw: true + }) ]) - await t.commit() + + await transaction.commit() /** Bundle all data into cnodeUser objects to maximize import speed. */ const cnodeUsersDict = {} cnodeUsers.forEach(cnodeUser => { - // Convert sequelize object to plain js object to allow adding additional fields. - const cnodeUserDictObj = cnodeUser.toJSON() - - // Add cnodeUserUUID data fields. - cnodeUserDictObj['audiusUsers'] = [] - cnodeUserDictObj['tracks'] = [] - cnodeUserDictObj['files'] = [] - - cnodeUsersDict[cnodeUser.cnodeUserUUID] = cnodeUserDictObj + // Add cnodeUserUUID data fields + cnodeUser['audiusUsers'] = [] + cnodeUser['tracks'] = [] + cnodeUser['files'] = [] + cnodeUser['clockRecords'] = [] + + cnodeUsersDict[cnodeUser.cnodeUserUUID] = cnodeUser + + // TODO - remove this once we no longer have a MaxClock in export + // this just overrides the clock value to the max clock we're sending over to the secondary so it knows + // there's more data to pull + if (cnodeUser.clock > MaxClock) { + // since clockRecords are returned by clock ASC, clock val at last index is largest clock val + console.log('nodeSync.js#export - cnode user clock value is higher than MaxClock, resetting', clockRecords[clockRecords.length - 1].clock) + cnodeUser.clock = clockRecords[clockRecords.length - 1].clock + } }) audiusUsers.forEach(audiusUser => { - const audiusUserDictObj = audiusUser.toJSON() - cnodeUsersDict[audiusUserDictObj['cnodeUserUUID']]['audiusUsers'].push(audiusUserDictObj) + cnodeUsersDict[audiusUser.cnodeUserUUID]['audiusUsers'].push(audiusUser) }) tracks.forEach(track => { - let trackDictObj = track.toJSON() - cnodeUsersDict[trackDictObj['cnodeUserUUID']]['tracks'].push(trackDictObj) + cnodeUsersDict[track.cnodeUserUUID]['tracks'].push(track) }) files.forEach(file => { - let fileDictObj = file.toJSON() - cnodeUsersDict[fileDictObj['cnodeUserUUID']]['files'].push(fileDictObj) + cnodeUsersDict[file.cnodeUserUUID]['files'].push(file) + }) + clockRecords.forEach(clockRecord => { + cnodeUsersDict[clockRecord.cnodeUserUUID]['clockRecords'].push(clockRecord) }) // Expose ipfs node's peer ID. const ipfs = req.app.get('ipfsAPI') - let ipfsIDObj = await getIPFSPeerId(ipfs, config) - - for (let i = 0; i < files.length; i += RehydrateIPFSConcurrencyLimit) { - const exportFilesSlice = files.slice(i, i + RehydrateIPFSConcurrencyLimit) - req.logger.info(`Export rehydrateIpfs processing files ${i} to ${i + RehydrateIPFSConcurrencyLimit}`) - // Ensure all relevant files are available through IPFS at export time - await Promise.all(exportFilesSlice.map(async (file) => { - try { - if ( - (file.type === 'track' || file.type === 'metadata' || file.type === 'copy320') || - // to address legacy single-res image rehydration where images are stored directly under its file CID - (file.type === 'image' && file.sourceFile === null) - ) { - await RehydrateIpfsQueue.addRehydrateIpfsFromFsIfNecessaryTask(file.multihash, file.storagePath, { logContext: req.logContext }) - } else if (file.type === 'dir') { - await RehydrateIpfsQueue.addRehydrateIpfsDirFromFsIfNecessaryTask(file.multihash, { logContext: req.logContext }) + const ipfsIDObj = await getIPFSPeerId(ipfs, config) + + if (!dbOnlySync) { + // Rehydrate files if necessary + for (let i = 0; i < files.length; i += RehydrateIPFSConcurrencyLimit) { + const exportFilesSlice = files.slice(i, i + RehydrateIPFSConcurrencyLimit) + req.logger.info(`Export rehydrateIpfs processing files ${i} to ${i + RehydrateIPFSConcurrencyLimit}`) + // Ensure all relevant files are available through IPFS at export time + await Promise.all(exportFilesSlice.map(async (file) => { + try { + if ( + (file.type === 'track' || file.type === 'metadata' || file.type === 'copy320') || + // to address legacy single-res image rehydration where images are stored directly under its file CID + (file.type === 'image' && file.sourceFile === null) + ) { + await RehydrateIpfsQueue.addRehydrateIpfsFromFsIfNecessaryTask(file.multihash, file.storagePath, { logContext: req.logContext }) + } else if (file.type === 'dir') { + await RehydrateIpfsQueue.addRehydrateIpfsDirFromFsIfNecessaryTask(file.multihash, { logContext: req.logContext }) + } + } catch (e) { + req.logger.info(`Export rehydrateIpfs processing files ${i} to ${i + RehydrateIPFSConcurrencyLimit}, ${e}`) } - } catch (e) { - req.logger.info(`Export rehydrateIpfs processing files ${i} to ${i + RehydrateIPFSConcurrencyLimit}, ${e}`) - } - })) + })) + } } - return successResponse({ cnodeUsers: cnodeUsersDict, ipfsIDObj: ipfsIDObj }) + + return successResponse({ cnodeUsers: cnodeUsersDict, ipfsIDObj }) } catch (e) { - await t.rollback() + console.error('Error in /export', e) + await transaction.rollback() return errorResponseServerError(e.message) } })) @@ -107,6 +166,8 @@ module.exports = function (app) { const walletPublicKeys = req.body.wallet // array const creatorNodeEndpoint = req.body.creator_node_endpoint // string const immediate = (req.body.immediate === true || req.body.immediate === 'true') + // option to sync just the db records as opposed to db records and files on disk, defaults to false + const dbOnlySync = (req.body.db_only_sync === true || req.body.db_only_sync === 'true') if (!immediate) { req.logger.info('debounce time', config.get('debounceTime')) @@ -117,13 +178,28 @@ module.exports = function (app) { req.logger.info('clear timeout for', wallet, 'time', Date.now()) } syncQueue[wallet] = setTimeout( - async () => _nodesync(req, [wallet], creatorNodeEndpoint), + async () => _nodesync(req, [wallet], creatorNodeEndpoint, dbOnlySync), config.get('debounceTime') ) req.logger.info('set timeout for', wallet, 'time', Date.now()) } } else { - await _nodesync(req, walletPublicKeys, creatorNodeEndpoint) + await _nodesync(req, walletPublicKeys, creatorNodeEndpoint, dbOnlySync) + } + return successResponse() + })) + + // copy the code as the regular sync, just to make sure it's isolated and not called by any other cnode code + // force immediate and dbOnlySync to be true + app.post('/vector_clock_sync', handleResponse(async (req, res) => { + const walletPublicKeys = req.body.wallet // array + const creatorNodeEndpoint = req.body.creator_node_endpoint // string + // option to sync just the db records as opposed to db records and files on disk, defaults to false + const dbOnlySync = true + + let errorObj = await _nodesync(req, walletPublicKeys, creatorNodeEndpoint, dbOnlySync) + if (errorObj) { + return errorResponseServerError(errorObj.message) } return successResponse() })) @@ -139,14 +215,16 @@ module.exports = function (app) { // Get & return latestBlockNumber for wallet const cnodeUser = await models.CNodeUser.findOne({ where: { walletPublicKey } }) - const latestBlockNumber = cnodeUser ? cnodeUser.latestBlockNumber : -1 + const latestBlockNumber = (cnodeUser) ? cnodeUser.latestBlockNumber : -1 + const clockValue = (cnodeUser) ? cnodeUser.clock : -1 - return successResponse({ walletPublicKey, latestBlockNumber }) + return successResponse({ walletPublicKey, latestBlockNumber, clockValue }) })) } -async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint) { +async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint, dbOnlySync) { const start = Date.now() + let errorObj = null // object to track if the function errored, returned at the end of the function req.logger.info('begin nodesync for ', walletPublicKeys, 'time', start) // ensure access to each wallet, then acquire it for sync. @@ -169,7 +247,7 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint) { method: 'get', baseURL: creatorNodeEndpoint, url: '/export', - params: { wallet_public_key: walletPublicKeys }, + params: { wallet_public_key: walletPublicKeys, db_only_sync: dbOnlySync }, responseType: 'json' }) if (resp.status !== 200) throw new Error(resp.data['error']) @@ -183,9 +261,11 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint) { throw new Error(`Malformed response from ${creatorNodeEndpoint}.`) } - // Attempt to connect directly to target CNode's IPFS node. - await _initBootstrapAndRefreshPeers(req, resp.data.ipfsIDObj.addresses, redisKey) - req.logger.info(redisKey, 'IPFS Nodes connected + data export received') + if (!dbOnlySync) { + // Attempt to connect directly to target CNode's IPFS node. + await _initBootstrapAndRefreshPeers(req, resp.data.ipfsIDObj.addresses, redisKey) + req.logger.info(redisKey, 'IPFS Nodes connected + data export received') + } // For each CNodeUser, replace local DB state with retrieved data + fetch + save missing files. for (const fetchedCNodeUser of Object.values(resp.data.cnodeUsers)) { @@ -196,20 +276,23 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint) { } const fetchedWalletPublicKey = fetchedCNodeUser.walletPublicKey let userReplicaSet = [] - try { - const myCnodeEndpoint = await middlewares.getOwnEndpoint(req) - userReplicaSet = await middlewares.getCreatorNodeEndpoints(req, fetchedWalletPublicKey) - // push user metadata node to user's replica set if defined - if (config.get('userMetadataNodeUrl')) userReplicaSet.push(config.get('userMetadataNodeUrl')) + if (!dbOnlySync) { + try { + const myCnodeEndpoint = await middlewares.getOwnEndpoint(req) + userReplicaSet = await middlewares.getCreatorNodeEndpoints(req, fetchedWalletPublicKey) - // filter out current node from user's replica set - userReplicaSet = userReplicaSet.filter(url => url !== myCnodeEndpoint) + // push user metadata node to user's replica set if defined + if (config.get('userMetadataNodeUrl')) userReplicaSet.push(config.get('userMetadataNodeUrl')) - // Spread + set uniq's the array - userReplicaSet = [...new Set(userReplicaSet)] - } catch (e) { - req.logger.error(`Couldn't get user's replica sets, can't use cnode gateways in saveFileForMultihash`) + // filter out current node from user's replica set + userReplicaSet = userReplicaSet.filter(url => url !== myCnodeEndpoint) + + // Spread + set uniq's the array + userReplicaSet = [...new Set(userReplicaSet)] + } catch (e) { + req.logger.error(`Couldn't get user's replica sets, can't use cnode gateways in saveFileForMultihash`) + } } if (!walletPublicKeys.includes(fetchedWalletPublicKey)) { @@ -217,54 +300,77 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint) { } const fetchedCnodeUserUUID = fetchedCNodeUser.cnodeUserUUID - const t = await models.sequelize.transaction() + const transaction = await models.sequelize.transaction() try { const cnodeUser = await models.CNodeUser.findOne({ where: { walletPublicKey: fetchedWalletPublicKey }, - transaction: t + transaction }) const fetchedLatestBlockNumber = fetchedCNodeUser.latestBlockNumber // Delete any previously stored data for cnodeUser in reverse table dependency order (cannot be parallelized). if (cnodeUser) { // Ensure imported data has higher blocknumber than already stored. + // TODO - replace this check with a clock check (!!!) const latestBlockNumber = cnodeUser.latestBlockNumber - if ((fetchedLatestBlockNumber === -1 && latestBlockNumber !== -1) || - (fetchedLatestBlockNumber !== -1 && fetchedLatestBlockNumber <= latestBlockNumber) - ) { - throw new Error(`Imported data is outdated, will not sync. Imported latestBlockNumber \ - ${fetchedLatestBlockNumber} Self latestBlockNumber ${latestBlockNumber}`) + + if (!dbOnlySync) { + if ((fetchedLatestBlockNumber === -1 && latestBlockNumber !== -1) || + (fetchedLatestBlockNumber !== -1 && fetchedLatestBlockNumber <= latestBlockNumber) + ) { + throw new Error(`Imported data is outdated, will not sync. Imported latestBlockNumber \ + ${fetchedLatestBlockNumber} Self latestBlockNumber ${latestBlockNumber}`) + } } const cnodeUserUUID = cnodeUser.cnodeUserUUID req.logger.info(redisKey, `beginning delete ops for cnodeUserUUID ${cnodeUserUUID}`) const numAudiusUsersDeleted = await models.AudiusUser.destroy({ - where: { cnodeUserUUID: cnodeUserUUID }, - transaction: t + where: { cnodeUserUUID }, + transaction }) req.logger.info(redisKey, `numAudiusUsersDeleted ${numAudiusUsersDeleted}`) + // TrackFiles must be deleted before associated Tracks can be deleted. const numTrackFilesDeleted = await models.File.destroy({ where: { - cnodeUserUUID: cnodeUserUUID, - trackUUID: { [models.Sequelize.Op.ne]: null } // Op.ne = notequal + cnodeUserUUID, + trackBlockchainId: { [models.Sequelize.Op.ne]: null } // Op.ne = notequal }, - transaction: t + transaction }) req.logger.info(redisKey, `numTrackFilesDeleted ${numTrackFilesDeleted}`) + const numTracksDeleted = await models.Track.destroy({ - where: { cnodeUserUUID: cnodeUserUUID }, - transaction: t + where: { cnodeUserUUID }, + transaction }) req.logger.info(redisKey, `numTracksDeleted ${numTracksDeleted}`) + // Delete all remaining files (image / metadata files). const numNonTrackFilesDeleted = await models.File.destroy({ - where: { cnodeUserUUID: cnodeUserUUID }, - transaction: t + where: { cnodeUserUUID }, + transaction }) req.logger.info(redisKey, `numNonTrackFilesDeleted ${numNonTrackFilesDeleted}`) + + const numClockRecordsDeleted = await models.ClockRecord.destroy({ + where: { cnodeUserUUID }, + transaction + }) + req.logger.info(redisKey, `numClockRecordsDeleted ${numClockRecordsDeleted}`) + + const numSessionTokensDeleted = await models.SessionToken.destroy({ + where: { cnodeUserUUID }, + transaction + }) + req.logger.info(redisKey, `numSessionTokensDeleted ${numSessionTokensDeleted}`) + + // Delete cnodeUser entry + await cnodeUser.destroy({ transaction }) + req.logger.info(redisKey, `deleted cnodeUserEntry`) } /* Populate all new data for fetched cnodeUser. */ @@ -272,109 +378,100 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint) { req.logger.info(redisKey, `beginning add ops for cnodeUserUUID ${fetchedCnodeUserUUID}`) // Upsert cnodeUser row. - await models.CNodeUser.upsert({ + await models.CNodeUser.create({ cnodeUserUUID: fetchedCnodeUserUUID, walletPublicKey: fetchedWalletPublicKey, latestBlockNumber: fetchedLatestBlockNumber, - lastLogin: fetchedCNodeUser.lastLogin - }, { transaction: t }) - req.logger.info(redisKey, `upserted nodeUser for cnodeUserUUID ${fetchedCnodeUserUUID}`) - - // Make list of all track Files to add after track creation. + lastLogin: fetchedCNodeUser.lastLogin, + clock: fetchedCNodeUser.clock + }, { transaction }) + req.logger.info(redisKey, `Inserted nodeUser for cnodeUserUUID ${fetchedCnodeUserUUID}`) + + // Save all clockRecords to DB + await models.ClockRecord.bulkCreate(fetchedCNodeUser.clockRecords.map(clockRecord => ({ + ...clockRecord, + cnodeUserUUID: fetchedCnodeUserUUID + })), { transaction }) + req.logger.info(redisKey, 'Recorded all ClockRecord entries in DB') + + /* + * Make list of all track Files to add after track creation + * + * Files with trackBlockchainIds cannot be created until tracks have been created, + * but tracks cannot be created until metadata and cover art files have been created. + */ - // Files with trackUUIDs cannot be created until tracks have been created, - // but tracks cannot be created until metadata and cover art files have been created. const trackFiles = fetchedCNodeUser.files.filter(file => models.File.TrackTypes.includes(file.type)) const nonTrackFiles = fetchedCNodeUser.files.filter(file => models.File.NonTrackTypes.includes(file.type)) - // Save all track files to disk in batches (to limit concurrent load) - for (let i = 0; i < trackFiles.length; i += TrackSaveConcurrencyLimit) { - const trackFilesSlice = trackFiles.slice(i, i + TrackSaveConcurrencyLimit) - req.logger.info(`TrackFiles saveFileForMultihash - processing trackFiles ${i} to ${i + TrackSaveConcurrencyLimit}...`) - await Promise.all(trackFilesSlice.map( - trackFile => saveFileForMultihash(req, trackFile.multihash, trackFile.storagePath, userReplicaSet) - )) - } - - req.logger.info('Saved all track files to disk.') - - // Save all non-track files to disk in batches (to limit concurrent load) - for (let i = 0; i < nonTrackFiles.length; i += NonTrackFileSaveConcurrencyLimit) { - const nonTrackFilesSlice = nonTrackFiles.slice(i, i + NonTrackFileSaveConcurrencyLimit) - req.logger.info(`NonTrackFiles saveFileForMultihash - processing files ${i} to ${i + NonTrackFileSaveConcurrencyLimit}...`) - await Promise.all(nonTrackFilesSlice.map( - nonTrackFile => { - // Skip over directories since there's no actual content to sync - // The files inside the directory are synced separately - if (nonTrackFile.type !== 'dir') { - // if it's an image file, we need to pass in the actual filename because the gateway request is /ipfs/Qm123/ - // need to also check fileName is not null to make sure it's a dir-style image. non-dir images won't have a 'fileName' db column - if (nonTrackFile.type === 'image' && nonTrackFile.fileName !== null) { - return saveFileForMultihash(req, nonTrackFile.multihash, nonTrackFile.storagePath, userReplicaSet, nonTrackFile.fileName) - } else { - return saveFileForMultihash(req, nonTrackFile.multihash, nonTrackFile.storagePath, userReplicaSet) + // if not just db records sync, sync everything + if (!dbOnlySync) { + // Save all track files to disk in batches (to limit concurrent load) + for (let i = 0; i < trackFiles.length; i += TrackSaveConcurrencyLimit) { + const trackFilesSlice = trackFiles.slice(i, i + TrackSaveConcurrencyLimit) + req.logger.info(`TrackFiles saveFileForMultihash - processing trackFiles ${i} to ${i + TrackSaveConcurrencyLimit}...`) + await Promise.all(trackFilesSlice.map( + trackFile => saveFileForMultihash(req, trackFile.multihash, trackFile.storagePath, userReplicaSet) + )) + } + req.logger.info(redisKey, 'Saved all track files to disk.') + + // Save all non-track files to disk in batches (to limit concurrent load) + for (let i = 0; i < nonTrackFiles.length; i += NonTrackFileSaveConcurrencyLimit) { + const nonTrackFilesSlice = nonTrackFiles.slice(i, i + NonTrackFileSaveConcurrencyLimit) + req.logger.info(`NonTrackFiles saveFileForMultihash - processing files ${i} to ${i + NonTrackFileSaveConcurrencyLimit}...`) + await Promise.all(nonTrackFilesSlice.map( + nonTrackFile => { + // Skip over directories since there's no actual content to sync + // The files inside the directory are synced separately + if (nonTrackFile.type !== 'dir') { + // if it's an image file, we need to pass in the actual filename because the gateway request is /ipfs/Qm123/ + // need to also check fileName is not null to make sure it's a dir-style image. non-dir images won't have a 'fileName' db column + if (nonTrackFile.type === 'image' && nonTrackFile.fileName !== null) { + return saveFileForMultihash(req, nonTrackFile.multihash, nonTrackFile.storagePath, userReplicaSet, nonTrackFile.fileName) + } else { + return saveFileForMultihash(req, nonTrackFile.multihash, nonTrackFile.storagePath, userReplicaSet) + } } } - } - )) + )) + } + req.logger.info('Saved all non-track files to disk.') } - req.logger.info('Saved all non-track files to disk.') await models.File.bulkCreate(nonTrackFiles.map(file => ({ - fileUUID: file.fileUUID, - trackUUID: null, - cnodeUserUUID: fetchedCnodeUserUUID, - multihash: file.multihash, - sourceFile: file.sourceFile, - storagePath: file.storagePath, - type: file.type, - fileName: file.fileName, - dirMultihash: file.dirMultihash - })), { transaction: t }) + ...file, + trackBlockchainId: null, + cnodeUserUUID: fetchedCnodeUserUUID + })), { transaction }) req.logger.info(redisKey, 'created all non-track files') await models.Track.bulkCreate(fetchedCNodeUser.tracks.map(track => ({ - trackUUID: track.trackUUID, - blockchainId: track.blockchainId, - cnodeUserUUID: fetchedCnodeUserUUID, - metadataJSON: track.metadataJSON, - metadataFileUUID: track.metadataFileUUID, - coverArtFileUUID: track.coverArtFileUUID - })), { transaction: t }) + ...track, + cnodeUserUUID: fetchedCnodeUserUUID + })), { transaction }) req.logger.info(redisKey, 'created all tracks') // Save all track files to db await models.File.bulkCreate(trackFiles.map(trackFile => ({ - fileUUID: trackFile.fileUUID, - trackUUID: trackFile.trackUUID, - cnodeUserUUID: fetchedCnodeUserUUID, - multihash: trackFile.multihash, - sourceFile: trackFile.sourceFile, - storagePath: trackFile.storagePath, - type: trackFile.type, - fileName: trackFile.fileName, - dirMultihash: trackFile.dirMultihash - })), { transaction: t }) + ...trackFile, + cnodeUserUUID: fetchedCnodeUserUUID + })), { transaction }) req.logger.info('saved all track files to db') await models.AudiusUser.bulkCreate(fetchedCNodeUser.audiusUsers.map(audiusUser => ({ - audiusUserUUID: audiusUser.audiusUserUUID, - cnodeUserUUID: fetchedCnodeUserUUID, - blockchainId: audiusUser.blockchainId, - metadataJSON: audiusUser.metadataJSON, - metadataFileUUID: audiusUser.metadataFileUUID, - coverArtFileUUID: audiusUser.coverArtFileUUID, - profilePicFileUUID: audiusUser.profilePicFileUUID - })), { transaction: t }) + ...audiusUser, + cnodeUserUUID: fetchedCnodeUserUUID + })), { transaction }) req.logger.info('saved all audiususer data to db') - await t.commit() + await transaction.commit() req.logger.info(redisKey, `Transaction successfully committed for cnodeUserUUID ${fetchedCnodeUserUUID}`) redisKey = redisClient.getNodeSyncRedisKey(fetchedWalletPublicKey) await redisLock.removeLock(redisKey) } catch (e) { req.logger.error(redisKey, `Transaction failed for cnodeUserUUID ${fetchedCnodeUserUUID}`, e) - await t.rollback() + await transaction.rollback() redisKey = redisClient.getNodeSyncRedisKey(fetchedWalletPublicKey) await redisLock.removeLock(redisKey) throw new Error(e) @@ -382,6 +479,7 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint) { } } catch (e) { req.logger.error('Sync Error', e) + errorObj = e } finally { // Release all redis locks for (let wallet of walletPublicKeys) { @@ -391,6 +489,8 @@ async function _nodesync (req, walletPublicKeys, creatorNodeEndpoint) { } req.logger.info(`DURATION SYNC ${Date.now() - start}`) } + + return errorObj } /** Given IPFS node peer addresses, add to bootstrap peers list and manually connect. */ diff --git a/creator-node/src/routes/tracks.js b/creator-node/src/routes/tracks.js index 4b87bee2c5c..41951e15a0f 100644 --- a/creator-node/src/routes/tracks.js +++ b/creator-node/src/routes/tracks.js @@ -5,7 +5,7 @@ const { Buffer } = require('ipfs-http-client') const config = require('../config.js') const { getSegmentsDuration } = require('../segmentDuration') const models = require('../models') -const { saveFileFromBuffer, saveFileToIPFSFromFS, removeTrackFolder, handleTrackContentUpload } = require('../fileManager') +const { saveFileFromBufferToIPFSAndDisk, saveFileToIPFSFromFS, removeTrackFolder, handleTrackContentUpload } = require('../fileManager') const { handleResponse, successResponse, errorResponseBadRequest, errorResponseServerError, errorResponseForbidden } = require('../apiHelpers') const { getFileUUIDForImageCID } = require('../utils') const { authMiddleware, ensurePrimaryMiddleware, syncLockMiddleware, triggerSecondarySyncs } = require('../middlewares') @@ -13,12 +13,11 @@ const TranscodingQueue = require('../TranscodingQueue') const { getCID } = require('./files') const { decode } = require('../hashids.js') const RehydrateIpfsQueue = require('../RehydrateIpfsQueue') +const DBManager = require('../dbManager') module.exports = function (app) { /** * upload track segment files and make avail - will later be associated with Audius track - * @dev - currently stores each segment twice, once under random file UUID & once under IPFS multihash - * - this should be addressed eventually * @dev - Prune upload artifacts after successful and failed uploads. Make call without awaiting, and let async queue clean up. */ app.post('/track_content', authMiddleware, ensurePrimaryMiddleware, syncLockMiddleware, handleTrackContentUpload, handleResponse(async (req, res) => { @@ -34,13 +33,17 @@ module.exports = function (app) { return errorResponseBadRequest(req.fileFilterError) } + const routeTimeStart = Date.now() - let codeBlockTimeStart = Date.now() + let codeBlockTimeStart + const cnodeUserUUID = req.session.cnodeUserUUID - // create and save track file transcoded version and segments to disk + // Create track transcode and segments, and save all to disk let transcodedFilePath let segmentFilePaths try { + codeBlockTimeStart = Date.now() + const transcode = await Promise.all([ TranscodingQueue.segment(req.fileDir, req.fileName, { logContext: req.logContext }), TranscodingQueue.transcode320(req.fileDir, req.fileName, { logContext: req.logContext }) @@ -56,57 +59,27 @@ module.exports = function (app) { return errorResponseServerError(err) } - // for each path, call saveFile and get back multihash; return multihash + segment duration - // run all async ops in parallel as they are independent + // Save transcode and segment files (in parallel) to ipfs and retrieve multihashes codeBlockTimeStart = Date.now() - const t = await models.sequelize.transaction() - - let transcodedFilePromResp - let segmentSaveFilePromResps - let segmentDurations - try { - req.logger.info(`segmentFilePaths.length ${segmentFilePaths.length}`) - let counter = 1 - segmentSaveFilePromResps = await Promise.all(segmentFilePaths.map(async filePath => { - const absolutePath = path.join(req.fileDir, 'segments', filePath) - req.logger.info(`about to perform saveFileToIPFSFromFS #${counter++}`) - let response = await saveFileToIPFSFromFS(req, absolutePath, 'track', req.fileName, t) - response.segmentName = filePath - return response - })) - transcodedFilePromResp = await saveFileToIPFSFromFS(req, transcodedFilePath, 'copy320', req.fileName) - req.logger.info(`Time taken in /track_content for saving segments and transcoding to IPFS: ${Date.now() - codeBlockTimeStart}ms for file ${req.fileName}`) - - codeBlockTimeStart = Date.now() - let fileSegmentPath = path.join(req.fileDir, 'segments') - segmentDurations = await getSegmentsDuration( - req, - fileSegmentPath, - req.fileName, - req.file.destination - ) - req.logger.info(`Time taken in /track_content to get segment duration: ${Date.now() - codeBlockTimeStart}ms for file ${req.fileName}`) - - // Commit transaction - codeBlockTimeStart = Date.now() - req.logger.info(`attempting to commit tx for file ${req.fileName}`) - await t.commit() - } catch (e) { - req.logger.info(`failed to commit...rolling back. file ${req.fileName}`) - - await t.rollback() - - // Prune upload artifacts - removeTrackFolder(req, req.fileDir) - - return errorResponseServerError(e) - } - req.logger.info(`Time taken in /track_content to commit tx block to db: ${Date.now() - codeBlockTimeStart}ms for file ${req.fileName}`) - - let trackSegments = segmentSaveFilePromResps.map((saveFileResp, i) => { - let segmentName = saveFileResp.segmentName - let duration = segmentDurations[segmentName] - return { 'multihash': saveFileResp.multihash, 'duration': duration } + const transcodeFileIPFSResp = await saveFileToIPFSFromFS(req, transcodedFilePath) + const segmentFileIPFSResps = await Promise.all(segmentFilePaths.map(async (segmentFilePath) => { + const segmentAbsolutePath = path.join(req.fileDir, 'segments', segmentFilePath) + const { multihash, dstPath } = await saveFileToIPFSFromFS(req, segmentAbsolutePath) + return { multihash, srcPath: segmentFilePath, dstPath } + })) + req.logger.info(`Time taken in /track_content for saving transcode + segment files to IPFS: ${Date.now() - codeBlockTimeStart}ms for file ${req.fileName}`) + + // Retrieve all segment durations as map(segment srcFilePath => segment duration) + codeBlockTimeStart = Date.now() + const segmentDurations = await getSegmentsDuration(req.fileName, req.file.destination) + req.logger.info(`Time taken in /track_content to get segment duration: ${Date.now() - codeBlockTimeStart}ms for file ${req.fileName}`) + + // For all segments, build array of (segment multihash, segment duration) + let trackSegments = segmentFileIPFSResps.map((segmentFileIPFSResp) => { + return { + multihash: segmentFileIPFSResp.multihash, + duration: segmentDurations[segmentFileIPFSResp.srcPath] + } }) // exclude 0-length segments that are sometimes outputted by ffmpeg segmentation @@ -120,7 +93,7 @@ module.exports = function (app) { return errorResponseServerError('Track upload failed - no track segments') } - // Don't allow if any segment CID is in blacklist. + // Error if any segment CID is in blacklist. try { await Promise.all(trackSegments.map(async segmentObj => { if (await req.app.get('blacklistManager').CIDIsInBlacklist(segmentObj.multihash)) { @@ -138,13 +111,51 @@ module.exports = function (app) { } } - // Prune upload artifacts + // Record entries for transcode and segment files in DB + codeBlockTimeStart = Date.now() + const transaction = await models.sequelize.transaction() + let transcodeFileUUID + try { + // Record transcode file entry in DB + const createTranscodeFileQueryObj = { + multihash: transcodeFileIPFSResp.multihash, + sourceFile: req.fileName, + storagePath: transcodeFileIPFSResp.dstPath, + type: 'copy320' // TODO - replace with models enum + } + const file = await DBManager.createNewDataRecord(createTranscodeFileQueryObj, cnodeUserUUID, models.File, transaction) + transcodeFileUUID = file.fileUUID + + // Record all segment file entries in DB + // Must be written sequentially to ensure clock values are correctly incremented and populated + for (const { multihash, dstPath } of segmentFileIPFSResps) { + const createSegmentFileQueryObj = { + multihash, + sourceFile: req.fileName, + storagePath: dstPath, + type: 'track' // TODO - replace with models enum + } + await DBManager.createNewDataRecord(createSegmentFileQueryObj, cnodeUserUUID, models.File, transaction) + } + + await transaction.commit() + } catch (e) { + await transaction.rollback() + + // Prune upload artifacts + removeTrackFolder(req, req.fileDir) + + return errorResponseServerError(e) + } + req.logger.info(`Time taken in /track_content for DB updates: ${Date.now() - codeBlockTimeStart}ms for file ${req.fileName}`) + + // Prune upload artifacts after success removeTrackFolder(req, req.fileDir) req.logger.info(`Time taken in /track_content for full route: ${Date.now() - routeTimeStart}ms for file ${req.fileName}`) return successResponse({ - 'transcodedTrackCID': transcodedFilePromResp.multihash, - 'transcodedTrackUUID': transcodedFilePromResp.fileUUID, + 'transcodedTrackCID': transcodeFileIPFSResp.multihash, + 'transcodedTrackUUID': transcodeFileUUID, 'track_segments': trackSegments, 'source_file': req.fileName }) @@ -157,11 +168,13 @@ module.exports = function (app) { app.post('/tracks/metadata', authMiddleware, ensurePrimaryMiddleware, syncLockMiddleware, handleResponse(async (req, res) => { const metadataJSON = req.body.metadata - if (!metadataJSON || - !metadataJSON.owner_id || - !metadataJSON.track_segments || - !Array.isArray(metadataJSON.track_segments) || - !metadataJSON.track_segments.length) { + if ( + !metadataJSON || + !metadataJSON.owner_id || + !metadataJSON.track_segments || + !Array.isArray(metadataJSON.track_segments) || + !metadataJSON.track_segments.length + ) { return errorResponseBadRequest('Metadata object must include owner_id and non-empty track_segments array') } @@ -187,11 +200,12 @@ module.exports = function (app) { // See if the track already has a transcoded master if (trackId) { - const { trackUUID } = await models.Track.findOne({ - attributes: ['trackUUID'], + const { blockchainId } = await models.Track.findOne({ + attributes: ['blockchainId'], where: { blockchainId: trackId - } + }, + order: [['clock', 'DESC']] }) // Error if no DB entry for transcode found @@ -200,25 +214,45 @@ module.exports = function (app) { where: { cnodeUserUUID: req.session.cnodeUserUUID, type: 'copy320', - trackUUID + trackBlockchainId: blockchainId } }) if (!transcodedFile) { - return errorResponseServerError('Failed to find transcoded file ') + return errorResponseServerError('Failed to find transcoded file') } } } - // Store + pin metadata multihash to disk + IPFS. const metadataBuffer = Buffer.from(JSON.stringify(metadataJSON)) + const cnodeUserUUID = req.session.cnodeUserUUID + + // Save file from buffer to IPFS and disk + let multihash, dstPath + try { + const resp = await saveFileFromBufferToIPFSAndDisk(req, metadataBuffer) + multihash = resp.multihash + dstPath = resp.dstPath + } catch (e) { + return errorResponseServerError(`/tracks/metadata saveFileFromBufferToIPFSAndDisk op failed: ${e}`) + } - let multihash, fileUUID + // Record metadata file entry in DB + const transaction = await models.sequelize.transaction() + let fileUUID try { - const saveFileFromBufferResp = await saveFileFromBuffer(req, metadataBuffer, 'metadata') - multihash = saveFileFromBufferResp.multihash - fileUUID = saveFileFromBufferResp.fileUUID + const createFileQueryObj = { + multihash, + sourceFile: req.fileName, + storagePath: dstPath, + type: 'metadata' // TODO - replace with models enum + } + const file = await DBManager.createNewDataRecord(createFileQueryObj, cnodeUserUUID, models.File, transaction) + fileUUID = file.fileUUID + + await transaction.commit() } catch (e) { - return errorResponseServerError(`Could not save file to disk, ipfs, and/or db: ${e}`) + await transaction.rollback() + return errorResponseServerError(`Could not save to db db: ${e}`) } return successResponse({ @@ -234,36 +268,39 @@ module.exports = function (app) { app.post('/tracks', authMiddleware, ensurePrimaryMiddleware, syncLockMiddleware, handleResponse(async (req, res) => { const { blockchainTrackId, blockNumber, metadataFileUUID, transcodedTrackUUID } = req.body + // Input validation if (!blockchainTrackId || !blockNumber || !metadataFileUUID) { return errorResponseBadRequest('Must include blockchainTrackId, blockNumber, and metadataFileUUID.') } - // Error on outdated blocknumber. + // Error on outdated blocknumber const cnodeUser = req.session.cnodeUser if (!cnodeUser.latestBlockNumber || cnodeUser.latestBlockNumber > blockNumber) { return errorResponseBadRequest(`Invalid blockNumber param. Must be higher than previously processed blocknumber.`) } const cnodeUserUUID = req.session.cnodeUserUUID - // Fetch metadataJSON for metadataFileUUID. + // Fetch metadataJSON for metadataFileUUID, error if not found or malformatted const file = await models.File.findOne({ where: { fileUUID: metadataFileUUID, cnodeUserUUID } }) if (!file) { - return errorResponseBadRequest(`No file found for provided metadataFileUUID ${metadataFileUUID}.`) + return errorResponseBadRequest(`No file db record found for provided metadataFileUUID ${metadataFileUUID}.`) } let metadataJSON try { metadataJSON = JSON.parse(fs.readFileSync(file.storagePath)) - if (!metadataJSON || - !metadataJSON.track_segments || - !Array.isArray(metadataJSON.track_segments) || - !metadataJSON.track_segments.length) { + if ( + !metadataJSON || + !metadataJSON.track_segments || + !Array.isArray(metadataJSON.track_segments) || + !metadataJSON.track_segments.length + ) { return errorResponseServerError(`Malformatted metadataJSON stored for metadataFileUUID ${metadataFileUUID}.`) } } catch (e) { return errorResponseServerError(`No file stored on disk for metadataFileUUID ${metadataFileUUID} at storagePath ${file.storagePath}.`) } - // Get coverArtFileUUID for multihash in metadata object, if present. + // Get coverArtFileUUID for multihash in metadata object, else error let coverArtFileUUID try { coverArtFileUUID = await getFileUUIDForImageCID(req, metadataJSON.cover_art_sizes) @@ -271,51 +308,61 @@ module.exports = function (app) { return errorResponseServerError(e.message) } - const t = await models.sequelize.transaction() - + const transaction = await models.sequelize.transaction() try { - // Create / update track entry on db. - const resp = (await models.Track.upsert({ - cnodeUserUUID, + const existingTrackEntry = await models.Track.findOne({ + where: { + cnodeUserUUID, + blockchainId: blockchainTrackId, + coverArtFileUUID + }, + order: [['clock', 'DESC']], + transaction + }) + + // Insert track entry in DB + const createTrackQueryObj = { metadataFileUUID, metadataJSON, blockchainId: blockchainTrackId, coverArtFileUUID - }, - { transaction: t, returning: true } - )) - const track = resp[0] - const trackCreated = resp[1] + } + const track = await DBManager.createNewDataRecord(createTrackQueryObj, cnodeUserUUID, models.Track, transaction) - /** Associate matching segment files on DB with new/updated track. */ + /** + * Associate matching transcode & segment files on DB with new/updated track + * Must be done in same transaction to atomicity + * + * TODO - consider implications of edge-case -> two attempted /track_content before associate + */ const trackSegmentCIDs = metadataJSON.track_segments.map(segment => segment.multihash) - // if track created, ensure files exist with trackuuid = null and update them. - if (trackCreated) { - // Update the transcoded 320kbps copy + // if track created, ensure files exist with trackBlockchainId = null and update them + if (!existingTrackEntry) { + // Associate the transcode file db record with trackUUID if (transcodedTrackUUID) { const transcodedFile = await models.File.findOne({ where: { fileUUID: transcodedTrackUUID, cnodeUserUUID, - trackUUID: null, + trackBlockchainId: null, type: 'copy320' }, - transaction: t + transaction }) if (!transcodedFile) { throw new Error('Did not find a transcoded file for the provided CID.') } const numAffectedRows = await models.File.update( - { trackUUID: track.trackUUID }, + { trackBlockchainId: track.blockchainId }, { where: { fileUUID: transcodedTrackUUID, cnodeUserUUID, - trackUUID: null, - type: 'copy320' + trackBlockchainId: null, + type: 'copy320' // TODO - replace with model enum }, - transaction: t + transaction } ) if (numAffectedRows === 0) { @@ -323,68 +370,70 @@ module.exports = function (app) { } } - // Update the corresponding segment files + // Associate all segment file db records with trackUUID const trackFiles = await models.File.findAll({ where: { multihash: trackSegmentCIDs, cnodeUserUUID, - trackUUID: null, + trackBlockchainId: null, type: 'track' }, - transaction: t + transaction }) - if (trackFiles.length < trackSegmentCIDs.length) { + + if (trackFiles.length !== trackSegmentCIDs.length) { throw new Error('Did not find files for every track segment CID.') } const numAffectedRows = await models.File.update( - { trackUUID: track.trackUUID }, - { where: { - multihash: trackSegmentCIDs, - cnodeUserUUID, - trackUUID: null, - type: 'track' - }, - transaction: t + { trackBlockchainId: track.blockchainId }, + { + where: { + multihash: trackSegmentCIDs, + cnodeUserUUID, + trackBlockchainId: null, + type: 'track' + }, + transaction } ) - if (numAffectedRows < trackSegmentCIDs.length) { + if (parseInt(numAffectedRows, 10) !== trackSegmentCIDs.length) { throw new Error('Failed to associate files for every track segment CID.') } - } else { /** If track updated, ensure files exist with trackuuid. */ - // Check the transcoded copy if present + } else { /** If track updated, ensure files exist with trackBlockchainId. */ + // Ensure transcode file db record exists if uuid provided if (transcodedTrackUUID) { const transcodedFile = await models.File.findOne({ where: { fileUUID: transcodedTrackUUID, cnodeUserUUID, - trackUUID: track.trackUUID, + trackBlockchainId: track.blockchainId, type: 'copy320' }, - transaction: t + transaction }) if (!transcodedFile) { throw new Error('Did not find the corresponding transcoded file for the provided track UUID.') } } - // Check the segment files + // Ensure segment file db records exist for all CIDs const trackFiles = await models.File.findAll({ where: { multihash: trackSegmentCIDs, cnodeUserUUID, - trackUUID: track.trackUUID, + trackBlockchainId: track.blockchainId, type: 'track' }, - transaction: t + transaction }) if (trackFiles.length < trackSegmentCIDs.length) { - throw new Error('Did not find files for every track segment CID with trackUUID.') + throw new Error('Did not find files for every track segment CID with trackBlockchainId.') } } // Update cnodeUser's latestBlockNumber if higher than previous latestBlockNumber. // TODO - move to subquery to guarantee atomicity. - const updatedCNodeUser = await models.CNodeUser.findOne({ where: { cnodeUserUUID }, transaction: t }) + const updatedCNodeUser = await models.CNodeUser.findOne({ where: { cnodeUserUUID }, transaction }) if (!updatedCNodeUser || !updatedCNodeUser.latestBlockNumber) { throw new Error('Issue in retrieving udpatedCnodeUser') } @@ -394,15 +443,16 @@ module.exports = function (app) { given blockNumber ${blockNumber}` ) if (blockNumber > updatedCNodeUser.latestBlockNumber) { - await cnodeUser.update({ latestBlockNumber: blockNumber }, { transaction: t }) + // Update cnodeUser's latestBlockNumber + await cnodeUser.update({ latestBlockNumber: blockNumber }, { transaction }) } - await t.commit() + await transaction.commit() triggerSecondarySyncs(req) - return successResponse({ trackUUID: track.trackUUID }) + return successResponse() } catch (e) { req.logger.error(e.message) - await t.rollback() + await transaction.rollback() return errorResponseServerError(e.message) } })) @@ -414,7 +464,10 @@ module.exports = function (app) { return errorResponseBadRequest('Please provide blockchainId.') } - const track = await models.Track.findOne({ where: { blockchainId } }) + const track = await models.Track.findOne({ + where: { blockchainId }, + order: [['clock', 'DESC']] + }) if (!track) { return errorResponseBadRequest(`No track found for blockchainId ${blockchainId}`) } @@ -425,11 +478,11 @@ module.exports = function (app) { } // Case: track is marked as downloadable - // - Check if downloadable file exists. Since copyFile may or may not have trackUUID association, - // fetch a segmentFile for trackUUID, and find copyFile for segmentFile's sourceFile. + // - Check if downloadable file exists. Since copyFile may or may not have trackBlockchainId association, + // fetch a segmentFile for trackBlockchainId, and find copyFile for segmentFile's sourceFile. const segmentFile = await models.File.findOne({ where: { type: 'track', - trackUUID: track.trackUUID + trackBlockchainId: track.blockchainId } }) const copyFile = await models.File.findOne({ where: { type: 'copy320', @@ -468,12 +521,13 @@ module.exports = function (app) { return errorResponseBadRequest(`Invalid ID: ${encodedId}`) } - const { trackUUID } = await models.Track.findOne({ - attributes: ['trackUUID'], - where: { blockchainId } + const { blockchainId: blockchainIdFromTrack } = await models.Track.findOne({ + attributes: ['blockchainId'], + where: { blockchainId }, + order: [['clock', 'DESC']] }) - if (!trackUUID) { + if (!blockchainIdFromTrack) { return errorResponseBadRequest(`No track found for blockchainId ${blockchainId}`) } @@ -481,8 +535,9 @@ module.exports = function (app) { attributes: ['multihash'], where: { type: 'copy320', - trackUUID - } + trackBlockchainId: blockchainIdFromTrack + }, + order: [['clock', 'DESC']] }) if (!multihash) { @@ -502,21 +557,28 @@ module.exports = function (app) { return getCID(req, res) })) - /** List all unlisted tracks for a user */ + /** + * List all unlisted tracks for a user + */ app.get('/tracks/unlisted', authMiddleware, handleResponse(async (req, res) => { - const tracks = await models.Track.findAll({ - where: { - metadataJSON: { - is_unlisted: true - }, - cnodeUserUUID: req.session.cnodeUserUUID + const tracks = (await models.sequelize.query( + `select "metadataJSON"->'title' as "title", "blockchainId" from ( + select "metadataJSON", "blockchainId", row_number() over ( + partition by "blockchainId" order by "clock" desc + ) from "Tracks" + where "cnodeUserUUID" = :cnodeUserUUID + and ("metadataJSON"->>'is_unlisted')::boolean = true + ) as a + where a.row_number = 1;`, + { + replacements: { cnodeUserUUID: req.session.cnodeUserUUID } } - }) + ))[0] return successResponse({ - tracks: tracks.map(t => ({ - title: t.metadataJSON.title, - id: t.blockchainId + tracks: tracks.map(track => ({ + title: track.title, + id: track.blockchainId })) }) })) diff --git a/creator-node/src/routes/users.js b/creator-node/src/routes/users.js index 4e0c5f5242a..581631d28b4 100644 --- a/creator-node/src/routes/users.js +++ b/creator-node/src/routes/users.js @@ -15,6 +15,9 @@ const CHALLENGE_TTL_SECONDS = 120 const CHALLENGE_PREFIX = 'userLoginChallenge:' module.exports = function (app) { + /** + * Creates CNodeUser table entry if one doesn't already exist + */ app.post('/users', handleResponse(async (req, res, next) => { let walletAddress = req.body.walletAddress if (!ethereumUtils.isValidAddress(walletAddress)) { @@ -32,7 +35,12 @@ module.exports = function (app) { return successResponse() // do nothing if user already exists } - await models.CNodeUser.create({ walletPublicKey: walletAddress }) + // Create CNodeUser entry for wallet with clock = 0 + await models.CNodeUser.create({ + walletPublicKey: walletAddress, + clock: 0 + }) + return successResponse() })) @@ -145,4 +153,21 @@ module.exports = function (app) { await sessionManager.deleteSession(req.get(sessionManager.sessionTokenHeader)) return successResponse() })) + + /** + * Returns latest clock value stored in CNodeUsers entry given wallet, or -1 if no entry found + */ + app.get('/users/clock_status/:walletPublicKey', handleResponse(async (req, res) => { + let walletPublicKey = req.params.walletPublicKey + + walletPublicKey = walletPublicKey.toLowerCase() + + const cnodeUser = await models.CNodeUser.findOne({ + where: { walletPublicKey } + }) + + const clockValue = (cnodeUser) ? cnodeUser.dataValues.clock : -1 + + return successResponse({ clockValue }) + })) } diff --git a/creator-node/src/routes/vectorClock.js b/creator-node/src/routes/vectorClock.js new file mode 100644 index 00000000000..2ff55948a5c --- /dev/null +++ b/creator-node/src/routes/vectorClock.js @@ -0,0 +1,186 @@ +const models = require('../models') +const { handleResponse, successResponse, errorResponseServerError } = require('../apiHelpers') +const axios = require('axios') + +module.exports = function (app) { + app.post('/vector_clock_backfill/:wallet', handleResponse(async (req, res, next) => { + const walletPublicKey = req.params.wallet + const { primary, secondaries } = req.body + let clock = 0 + + const transaction = await models.sequelize.transaction() + try { + // Fetch cnodeUser for each walletPublicKey. + // lock the CNodeUser table so no other tx can write to it while this is in progress + const cnodeUser = await models.CNodeUser.findOne({ + where: { + walletPublicKey: walletPublicKey + }, + transaction, + lock: transaction.LOCK.UPDATE // this makes the query SELECT ... FOR UPDATE + }) + + // early exit if cnodeUser not found on primary + if (!cnodeUser) { + await transaction.commit() + return successResponse({ status: 'No cnodeUser record found on the primary' }) + } + + // clock values have been added for CNodeUser, check if they're consistent across all nodes before returning success + if (cnodeUser.clock && cnodeUser.clock > 0) { + await transaction.commit() + // first try/catch is to make sure if the secondaries are not synced. if not, we try to sync + try { + await _checkSecondaryClockValues(secondaries, walletPublicKey, cnodeUser.clock) + } catch (e) { + await _triggerSecondarySyncs(req, primary, secondaries, walletPublicKey) + } + + // if we kick off a sync and it still not fixed, return with error + try { + await _checkSecondaryClockValues(secondaries, walletPublicKey, cnodeUser.clock) + } catch (e) { + return errorResponseServerError(e) + } + return successResponse({ status: 'Already ran successfully!' }) + } + + // Fetch all data for cnodeUserUUIDs: audiusUsers, tracks, files. + let [audiusUsers, tracks, files] = await Promise.all([ + models.AudiusUser.findAll({ where: { cnodeUserUUID: cnodeUser.cnodeUserUUID }, transaction, raw: true }), + models.Track.findAll({ where: { cnodeUserUUID: cnodeUser.cnodeUserUUID }, transaction, raw: true }), + models.File.findAll({ where: { cnodeUserUUID: cnodeUser.cnodeUserUUID }, transaction, raw: true }) + ]) + + audiusUsers.forEach(record => { + record.type = 'AudiusUser' + }) + + tracks.forEach(record => { + record.type = 'Track' + }) + // if it doesn't have a type it's a file + + let allRecords = audiusUsers.concat(tracks, files) + // sort in chronological order, oldest first + allRecords.sort((a, b) => new Date(a.createdAt) - new Date(b.createdAt)) + + // reset these values + audiusUsers = [] + tracks = [] + files = [] + let clockRecords = [] + + allRecords.map(record => { + clock += 1 + let clockRecord = { cnodeUserUUID: cnodeUser.cnodeUserUUID, clock, createdAt: record.createdAt } + if (record.type === 'AudiusUser') { + audiusUsers.push({ ...record, clock }) + clockRecord.sourceTable = 'AudiusUser' + } else if (record.type === 'Track') { + tracks.push({ ...record, clock }) + clockRecord.sourceTable = 'Track' + } else { + files.push({ ...record, clock }) + clockRecord.sourceTable = 'File' + } + clockRecords.push(clockRecord) + }) + req.logger.info('final clock value', clock) + + // delete the existing records + await models.AudiusUser.destroy({ + where: { cnodeUserUUID: cnodeUser.cnodeUserUUID }, + transaction + }) + + await models.Track.destroy({ + where: { cnodeUserUUID: cnodeUser.cnodeUserUUID }, + transaction + }) + + await models.File.destroy({ + where: { cnodeUserUUID: cnodeUser.cnodeUserUUID }, + transaction + }) + + // insert the new records + // chunk files by 10000 records to insert if > 10000 + if (files.length > 10000) { + for (let i = 0; i <= files.length; i += 10000) { + req.logger.info('writing files from idx', i, i + 10000) + await models.File.bulkCreate(files.slice(i, i + 10000), { transaction }) + } + } else { + await models.File.bulkCreate(files, { transaction }) + } + + await models.Track.bulkCreate(tracks, { transaction }) + + await models.AudiusUser.bulkCreate(audiusUsers, { transaction }) + + if (clockRecords.length > 10000) { + for (let i = 0; i <= clockRecords.length; i += 10000) { + req.logger.info('writing clockrecords from idx', i, i + 10000) + await models.ClockRecord.bulkCreate(clockRecords.slice(i, i + 10000), { transaction }) + } + } else { + await models.ClockRecord.bulkCreate(clockRecords, { transaction }) + } + + await cnodeUser.update({ clock }, { transaction }) + + await transaction.commit() + } catch (e) { + console.error(e) + await transaction.rollback() + return errorResponseServerError(e.message) + } + + try { + // trigger secondary syncs here + await _triggerSecondarySyncs(req, primary, secondaries, walletPublicKey) + await _checkSecondaryClockValues(secondaries, walletPublicKey, clock) + } catch (e) { + return errorResponseServerError(e) + } + return successResponse() + })) +} + +async function _checkSecondaryClockValues (secondaries, walletPublicKey, clock) { + if (clock > 25000) clock = 25000 + + const resp = (await Promise.all(secondaries.map(secondary => { + const axiosReq = { + baseURL: secondary, + url: `/sync_status/${walletPublicKey}`, + method: 'get' + } + return axios(axiosReq) + }))).map(r => r.data.data.clockValue) + + resp.map(r => { + if (r !== clock) throw new Error(`Secondaries not in sync with primary [${resp}]`) + }) +} + +async function _triggerSecondarySyncs (req, primary, secondaries, walletPublicKey) { + if (secondaries && secondaries.length > 0) { + await Promise.all(secondaries.map(secondary => { + req.logger.info(`calling sync to secondary for ${secondary} - ${walletPublicKey}`) + const axiosReq = { + baseURL: secondary, + url: '/vector_clock_sync', + method: 'post', + data: { + wallet: [walletPublicKey], + creator_node_endpoint: primary, + immediate: true, + db_only_sync: true + } + } + return axios(axiosReq) + })) + } +} diff --git a/creator-node/src/segmentDuration.js b/creator-node/src/segmentDuration.js index fcb60e8fb47..deb10c16f58 100644 --- a/creator-node/src/segmentDuration.js +++ b/creator-node/src/segmentDuration.js @@ -3,8 +3,8 @@ const fs = require('fs') const SEGMENT_REGEXP = /(segment[0-9]*.ts)/ -// Parse m3u8 file from HLS output and return mapped segment durations -async function getSegmentsDuration (req, segmentPath, filename, filedir) { +// Parse m3u8 file from HLS output and return map(segment filePath (segmentName) => segment duration) +async function getSegmentsDuration (filename, filedir) { return new Promise((resolve, reject) => { try { let splitResults = filename.split('.') diff --git a/creator-node/src/userNodeMiddleware.js b/creator-node/src/userNodeMiddleware.js index 3a2c8b7e9a4..6ab94ba5282 100644 --- a/creator-node/src/userNodeMiddleware.js +++ b/creator-node/src/userNodeMiddleware.js @@ -3,7 +3,7 @@ const config = require('./config') async function userNodeMiddleware (req, res, next) { const isUserMetadataNode = config.get('isUserMetadataNode') - const userNodeRegex = new RegExp(/(users|version|health_check|image_upload|ipfs|export)/gm) + const userNodeRegex = new RegExp(/(users|version|db_check|health_check|image_upload|ipfs|export|vector_clock_backfill)/gm) if (isUserMetadataNode) { const isValidUrl = userNodeRegex.test(req.url) if (!isValidUrl) { diff --git a/creator-node/test/audiusUsers.js b/creator-node/test/audiusUsers.test.js similarity index 82% rename from creator-node/test/audiusUsers.js rename to creator-node/test/audiusUsers.test.js index 68ec5dedcbd..37e39514b1e 100644 --- a/creator-node/test/audiusUsers.js +++ b/creator-node/test/audiusUsers.test.js @@ -5,6 +5,7 @@ const path = require('path') const fs = require('fs') const models = require('../src/models') + const ipfsClient = require('../src/ipfsClient') const config = require('../src/config') const BlacklistManager = require('../src/blacklistManager') @@ -15,7 +16,7 @@ const { getIPFSMock } = require('./lib/ipfsMock') const { getLibsMock } = require('./lib/libsMock') const { sortKeys } = require('../src/apiHelpers') -describe('test AudiusUsers', function () { +describe('test AudiusUsers with mocked IPFS', function () { let app, server, session, ipfsMock, libsMock // Will need a '.' in front of storagePath to look at current dir @@ -45,23 +46,23 @@ describe('test AudiusUsers', function () { await server.close() }) - it('creates Audius user', async function () { + it('successfully creates Audius user (POST /audius_users/metadata)', async function () { const metadata = { test: 'field1' } ipfsMock.add.twice().withArgs(Buffer.from(JSON.stringify(metadata))) ipfsMock.pin.add.once().withArgs('testCIDLink') const resp = await request(app) .post('/audius_users/metadata') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({ metadata }) .expect(200) - if (resp.body.metadataMultihash !== 'testCIDLink') { + if (resp.body.metadataMultihash !== 'testCIDLink' || !resp.body.metadataFileUUID) { throw new Error('invalid return data') } }) - it('completes Audius user creation', async function () { + it('successfully completes Audius user creation (POST /audius_users/metadata -> POST /audius_users)', async function () { const metadata = { test: 'field1' } ipfsMock.add.twice().withArgs(Buffer.from(JSON.stringify(metadata))) @@ -70,7 +71,7 @@ describe('test AudiusUsers', function () { const resp = await request(app) .post('/audius_users/metadata') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({ metadata }) .expect(200) @@ -80,7 +81,7 @@ describe('test AudiusUsers', function () { await request(app) .post('/audius_users') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({ blockchainUserId: 1, blockNumber: 10, metadataFileUUID: resp.body.metadataFileUUID }) .expect(200) }) @@ -89,7 +90,7 @@ describe('test AudiusUsers', function () { // Below block uses actual ipfsClient (unlike first describe block), hence // another describe block for this purpose // NOTE: these tests mock ipfs client errors; otherwise, for happy path, uses actual ipfsClient -describe('tests /audius_users/metadata metadata upload with actual ipfsClient for happy path', function () { +describe('Test AudiusUsers with real IPFS', function () { let app, server, session, libsMock, ipfs // Will need a '.' in front of storagePath to look at current dir @@ -122,7 +123,7 @@ describe('tests /audius_users/metadata metadata upload with actual ipfsClient fo it('should fail if metadata is not found in request body', async function () { const resp = await request(app) .post('/audius_users/metadata') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({ dummy: 'data' }) .expect(500) @@ -136,18 +137,18 @@ describe('tests /audius_users/metadata metadata upload with actual ipfsClient fo const metadata = { metadata: 'spaghetti' } const resp = await request(app) .post('/audius_users/metadata') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send(metadata) .expect(500) - assert.deepStrictEqual(resp.body.error, 'Could not save file to disk, ipfs, and/or db: Error: ipfs add failed!') + assert.deepStrictEqual(resp.body.error, 'saveFileFromBufferToIPFSAndDisk op failed: Error: ipfs add failed!') }) - it('should successfully add metadata file to filesystem, db, and ipfs', async function () { + it('successfully creates Audius user (POST /audius_users/metadata)', async function () { const metadata = sortKeys({ spaghetti: 'spaghetti' }) const resp = await request(app) .post('/audius_users/metadata') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({ metadata }) .expect(200) @@ -181,4 +182,12 @@ describe('tests /audius_users/metadata metadata upload with actual ipfsClient fo const metadataBuffer = Buffer.from(JSON.stringify(metadata)) assert.deepStrictEqual(metadataBuffer.compare(ipfsResp), 0) }) + + it('TODO - successfully completes Audius user creation (POST /audius_users/metadata -> POST /audius_users)', async function () { + + }) + + it('TODO - multiple uploads', async function () { + + }) }) diff --git a/creator-node/test/dbManager.test.js b/creator-node/test/dbManager.test.js new file mode 100644 index 00000000000..3f0a6fab172 --- /dev/null +++ b/creator-node/test/dbManager.test.js @@ -0,0 +1,365 @@ +const assert = require('assert') +const proxyquire = require('proxyquire') +const _ = require('lodash') + +const models = require('../src/models') +const DBManager = require('../src/dbManager') +const blacklistManager = require('../src/blacklistManager') +const utils = require('../src/utils') +const { createStarterCNodeUser } = require('./lib/dataSeeds') +const { getApp } = require('./lib/app') +const { getIPFSMock } = require('./lib/ipfsMock') +const { getLibsMock } = require('./lib/libsMock') + +describe('Test createNewDataRecord()', () => { + const req = { + logger: { + error: (msg) => console.log(msg) + } + } + + const getCNodeUser = async (cnodeUserUUID) => { + const cnodeUser = await models.CNodeUser.findOne({ where: { cnodeUserUUID } }) + return cnodeUser.dataValues + } + + const initialClockVal = 0 + const timeoutMs = 1000 + + let cnodeUserUUID, createFileQueryObj, server + + /** Init server to run DB migrations */ + before(async function () { + const appInfo = await getApp(getIPFSMock(), getLibsMock(), blacklistManager) + server = appInfo.server + }) + + /** Reset DB state + Create cnodeUser + confirm initial clock state + define global vars */ + beforeEach(async function () { + // Wipe all CNodeUsers + dependent data + await models.CNodeUser.destroy({ + where: {}, + truncate: true, + cascade: true // cascades delete to all rows with foreign key on cnodeUser + }) + + const resp = await createStarterCNodeUser() + cnodeUserUUID = resp.cnodeUserUUID + req.session = { cnodeUserUUID } + + // Confirm initial clock val in DB + const cnodeUser = await getCNodeUser(cnodeUserUUID) + assert.strictEqual(cnodeUser.clock, initialClockVal) + + createFileQueryObj = { + multihash: 'testMultihash', + sourceFile: 'testSourceFile', + storagePath: 'testStoragePath', + type: 'metadata' // TODO - replace with models enum + } + }) + + afterEach(async function () { + models.sequelize.removeHook('beforeCreate', 'clockTimeout') + }) + + /** Wipe all CNodeUsers + dependent data */ + after(async function () { + await models.CNodeUser.destroy({ + where: {}, + truncate: true, + cascade: true // cascades delete to all rows with foreign key on cnodeUser + }) + + await server.close() + }) + + it('Sequential createNewDataRecord - create 2 records', async () => { + const sequelizeTableInstance = models.File + + /** + * CREATE RECORD 1 + */ + + // Create new Data record + let transaction = await models.sequelize.transaction() + let createdFile = await DBManager.createNewDataRecord(createFileQueryObj, cnodeUserUUID, sequelizeTableInstance, transaction) + await transaction.commit() + + // Validate returned file object + assert.strictEqual(createdFile.cnodeUserUUID, cnodeUserUUID) + assert.strictEqual(createdFile.clock, initialClockVal + 1) + + // Validate CNodeUsers table state + let cnodeUser = await getCNodeUser(cnodeUserUUID) + assert.strictEqual(cnodeUser.clock, initialClockVal + 1) + + // Validate ClockRecords table state + let clockRecords = await models.ClockRecord.findAll({ where: { cnodeUserUUID } }) + assert.strictEqual(clockRecords.length, 1) + let clockRecord = clockRecords[0].dataValues + assert.strictEqual(clockRecord.clock, initialClockVal + 1) + assert.strictEqual(clockRecord.sourceTable, sequelizeTableInstance.name) + + // Validate Files table state + let files = await models.File.findAll({ where: { cnodeUserUUID } }) + assert.strictEqual(files.length, 1) + let file = files[0].dataValues + assert.strictEqual(file.clock, initialClockVal + 1) + + /** + * CREATE RECORD 2 + */ + + // Create new Data record + transaction = await models.sequelize.transaction() + createFileQueryObj = { + multihash: 'testMultihash2', + sourceFile: 'testSourceFile2', + storagePath: 'testStoragePath2', + type: 'metadata' // TODO - replace with models enum + } + createdFile = await DBManager.createNewDataRecord(createFileQueryObj, cnodeUserUUID, sequelizeTableInstance, transaction) + await transaction.commit() + + // Validate returned file object + assert.strictEqual(createdFile.cnodeUserUUID, cnodeUserUUID) + assert.strictEqual(createdFile.clock, initialClockVal + 2) + + // Validate CNodeUsers table state + cnodeUser = await getCNodeUser(cnodeUserUUID) + assert.strictEqual(cnodeUser.clock, initialClockVal + 2) + + // Validate ClockRecords table state + clockRecords = await models.ClockRecord.findAll({ where: { cnodeUserUUID }, order: [['createdAt', 'DESC']] }) + assert.strictEqual(clockRecords.length, 2) + clockRecord = clockRecords[0].dataValues + assert.strictEqual(clockRecord.sourceTable, sequelizeTableInstance.name) + assert.strictEqual(clockRecord.clock, initialClockVal + 2) + + // Validate Files table state + files = await models.File.findAll({ where: { cnodeUserUUID }, order: [['createdAt', 'DESC']] }) + assert.strictEqual(files.length, 2) + file = files[0].dataValues + assert.strictEqual(file.clock, initialClockVal + 2) + }) + + it('Concurrent createNewDataRecord - successfully makes concurrent calls in separate transactions', async () => { + const sequelizeTableInstance = models.File + const numEntries = 5 + + // Add global sequelize hook to add timeout before ClockRecord.create calls to force concurrent ops + models.sequelize.addHook('beforeCreate', 'clockTimeout', async (instance, options) => { + if (instance.constructor.name === 'ClockRecord') { + await utils.timeout(timeoutMs) + } + }) + + // Replace required models instance with modified models instance + proxyquire('../src/dbManager', { './models': models }) + + // Make multiple concurrent calls - create a transaction for each call + const arr = _.range(1, numEntries + 1) // [1, 2, ..., numEntries] + let createdFiles = await Promise.all(arr.map(async () => { + const transaction = await models.sequelize.transaction() + const createdFile = await DBManager.createNewDataRecord(createFileQueryObj, cnodeUserUUID, sequelizeTableInstance, transaction) + await transaction.commit() + + return createdFile + })) + + // Validate returned file objects + createdFiles = _.orderBy(createdFiles, ['createdAt'], ['asc']) + createdFiles.forEach((createdFile, index) => { + assert.strictEqual(createdFile.cnodeUserUUID, cnodeUserUUID) + assert.strictEqual(createdFile.clock, initialClockVal + 1 + index) + }) + + // Validate CNodeUsers table state + const cnodeUser = await getCNodeUser(cnodeUserUUID) + assert.strictEqual(cnodeUser.clock, initialClockVal + numEntries) + + // Validate ClockRecords table state + const clockRecords = await models.ClockRecord.findAll({ where: { cnodeUserUUID }, order: [['createdAt', 'ASC']] }) + assert.strictEqual(clockRecords.length, numEntries) + clockRecords.forEach((clockRecord, index) => { + clockRecord = clockRecord.dataValues + assert.strictEqual(clockRecord.sourceTable, sequelizeTableInstance.name) + assert.strictEqual(clockRecord.clock, initialClockVal + 1 + index) + }) + + // Validate Files table state + const files = await models.File.findAll({ where: { cnodeUserUUID }, order: [['createdAt', 'ASC']] }) + assert.strictEqual(files.length, numEntries) + files.forEach((file, index) => { + file = file.dataValues + assert.strictEqual(file.clock, initialClockVal + 1 + index) + }) + }) + + it('Concurrent createNewDataRecord - fails to make concurrent calls in a single transaction due to ClockRecords_pkey', async () => { + const sequelizeTableInstance = models.File + const numEntries = 5 + + // Add global sequelize hook to add timeout before ClockRecord.create calls to force concurrent ops + models.sequelize.addHook('beforeCreate', 'clockTimeout', async (instance, options) => { + if (instance.constructor.name === 'ClockRecord') { + await utils.timeout(timeoutMs) + } + }) + + // Replace required models instance with modified models instance + proxyquire('../src/dbManager', { './models': models }) + + // Attempt to make multiple concurrent calls, re-using the same transaction each time + const transaction = await models.sequelize.transaction() + try { + const arr = _.range(1, numEntries + 1) // [1, 2, ..., numEntries] + await Promise.all(arr.map(async () => { + const createdFile = await DBManager.createNewDataRecord(createFileQueryObj, cnodeUserUUID, sequelizeTableInstance, transaction) + return createdFile + })) + await transaction.commit() + } catch (e) { + await transaction.rollback() + assert.strictEqual(e.name, 'SequelizeUniqueConstraintError') + assert.strictEqual(e.original.message, 'duplicate key value violates unique constraint "ClockRecords_pkey"') + } + + /** + * Confirm none of the rows were written to DB + */ + + // Validate CNodeUsers table state + const cnodeUser = await getCNodeUser(cnodeUserUUID) + assert.strictEqual(cnodeUser.clock, initialClockVal) + + // Validate ClockRecords table state + const clockRecords = await models.ClockRecord.findAll({ where: { cnodeUserUUID }, order: [['createdAt', 'DESC']] }) + assert.strictEqual(clockRecords.length, 0) + + // Validate Files table state + const files = await models.File.findAll({ where: { cnodeUserUUID }, order: [['createdAt', 'DESC']] }) + assert.strictEqual(files.length, 0) + }) + + /** + * Simulates /image_upload and /track_content routes, which write multiple files sequentially in atomic tx + */ + it('Sequential createNewDataRecord - successfully makes multiple sequential calls in single transaction', async () => { + const sequelizeTableInstance = models.File + const numEntries = 5 + + // Make multiple squential calls, re-using the same transaction each time + const transaction = await models.sequelize.transaction() + const arr = _.range(1, numEntries + 1) // [1, 2, ..., numEntries] + const createdFilesResp = [] + // eslint-disable-next-line no-unused-vars + for await (const i of arr) { + const createdFile = await DBManager.createNewDataRecord(createFileQueryObj, cnodeUserUUID, sequelizeTableInstance, transaction) + createdFilesResp.push(createdFile) + } + await transaction.commit() + + // Validate returned file objects + const createdFiles = _.orderBy(createdFilesResp, ['createdAt'], ['asc']) + createdFiles.forEach((createdFile, index) => { + assert.strictEqual(createdFile.cnodeUserUUID, cnodeUserUUID) + assert.strictEqual(createdFile.clock, initialClockVal + 1 + index) + }) + + // Validate CNodeUsers table state + const cnodeUser = await getCNodeUser(cnodeUserUUID) + assert.strictEqual(cnodeUser.clock, initialClockVal + numEntries) + + // Validate ClockRecords table state + const clockRecords = await models.ClockRecord.findAll({ where: { cnodeUserUUID }, order: [['createdAt', 'ASC']] }) + assert.strictEqual(clockRecords.length, numEntries) + clockRecords.forEach((clockRecord, index) => { + clockRecord = clockRecord.dataValues + assert.strictEqual(clockRecord.sourceTable, sequelizeTableInstance.name) + assert.strictEqual(clockRecord.clock, initialClockVal + 1 + index) + }) + + // Validate Files table state + const files = await models.File.findAll({ where: { cnodeUserUUID }, order: [['createdAt', 'ASC']] }) + assert.strictEqual(files.length, numEntries) + files.forEach((file, index) => { + file = file.dataValues + assert.strictEqual(file.clock, initialClockVal + 1 + index) + }) + }) + + it('Confirm file.pkey will block duplicate clock vals from being written', async () => { + const transaction = await models.sequelize.transaction() + try { + createFileQueryObj = { + cnodeUserUUID, + multihash: 'testMultihash', + sourceFile: 'testSourceFile', + storagePath: 'testStoragePath', + type: 'metadata', // TODO - replace with models enum + clock: 0 + } + await models.File.create(createFileQueryObj, { transaction }) + await models.File.create(createFileQueryObj, { transaction }) + await transaction.commit() + } catch (e) { + await transaction.rollback() + assert.strictEqual(e.name, 'SequelizeUniqueConstraintError') + assert.strictEqual(e.original.message, 'duplicate key value violates unique constraint "Files_unique_(cnodeUserUUID,clock)"') + } + }) +}) + +describe('Test ClockRecord model', () => { + it('Confirm only valid sourceTable value can be written to ClockRecords table', async () => { + await models.CNodeUser.destroy({ + where: {}, + truncate: true, + cascade: true // cascades delete to all rows with foreign key on cnodeUser + }) + const cnodeUserUUID = (await createStarterCNodeUser()).cnodeUserUUID + + const validSourceTable = 'AudiusUser' + const invalidSourceTable = 'invalidSourceTable' + + // Confirm ClockRecords insert with validSourceTable value will succeed + await models.ClockRecord.create({ + cnodeUserUUID, + clock: 0, + sourceTable: validSourceTable + }) + + // Confirm ClockRecord was created + const clockRecords = await models.ClockRecord.findAll({ where: { cnodeUserUUID } }) + assert.strictEqual(clockRecords.length, 1) + const clockRecord = clockRecords[0] + assert.strictEqual(clockRecord.cnodeUserUUID, cnodeUserUUID) + assert.strictEqual(clockRecord.clock, 0) + assert.strictEqual(clockRecord.sourceTable, validSourceTable) + + // Confirm ClockRecords insert with invalidSourceTable value will fail due to DB error + try { + await models.sequelize.query(` + INSERT INTO "ClockRecords" + ("cnodeUserUUID","clock","sourceTable","createdAt","updatedAt") + VALUES ( + 'f13d776e-c4a6-4007-93bc-7e625c862873', + 0, + :invalidSourceTable, + '2020-09-21 23:04:06.339 +00:00', + '2020-09-21 23:04:06.339 +00:00' + );`, + { + replacements: { invalidSourceTable }, + type: 'RAW', + raw: true + } + ) + } catch (e) { + assert.strictEqual(e.name, 'SequelizeDatabaseError') + assert.strictEqual(e.original.message, `invalid input value for enum "enum_ClockRecords_sourceTable": "${invalidSourceTable}"`) + } + }) +}) diff --git a/creator-node/test/expressApp.js b/creator-node/test/expressApp.test.js similarity index 96% rename from creator-node/test/expressApp.js rename to creator-node/test/expressApp.test.js index f91b9839967..6c46fa4f5b1 100644 --- a/creator-node/test/expressApp.js +++ b/creator-node/test/expressApp.test.js @@ -44,7 +44,7 @@ describe('test expressApp', function () { // logout endpoint requires login / checks session request(app) .post('/users/logout') - .set('X-Session-ID', session + '1') + .set('X-Session-ID', session.sessionToken + '1') .expect(401, done) }) diff --git a/creator-node/test/ffmpeg.js b/creator-node/test/ffmpeg.test.js similarity index 97% rename from creator-node/test/ffmpeg.js rename to creator-node/test/ffmpeg.test.js index 29d96fc3230..16cc2558fed 100644 --- a/creator-node/test/ffmpeg.js +++ b/creator-node/test/ffmpeg.test.js @@ -29,7 +29,6 @@ describe('test segmentFile()', () => { await segmentFile(null, null, {}) assert.fail('Should have thrown error with bad params') } catch (e) { - console.error(e) assert.ok(e.message) } }) @@ -47,7 +46,6 @@ describe('test segmentFile()', () => { await segmentFile(fileDir, fileName, {}) assert.fail('Should have thrown error when segmenting a bad track (image)') } catch (e) { - console.error(e) assert.deepStrictEqual(e.message, 'FFMPEG Error') } }) @@ -64,7 +62,6 @@ describe('test segmentFile()', () => { try { await segmentFile(fileDir, fileName, {}) } catch (e) { - console.error(e) assert.fail(e.message) } diff --git a/creator-node/test/fileManager.js b/creator-node/test/fileManager.test.js similarity index 79% rename from creator-node/test/fileManager.js rename to creator-node/test/fileManager.test.js index f6eef32a923..51c2d2924b3 100644 --- a/creator-node/test/fileManager.js +++ b/creator-node/test/fileManager.test.js @@ -6,7 +6,7 @@ const fsExtra = require('fs-extra') const path = require('path') const { ipfs } = require('../src/ipfsClient') -const { saveFileToIPFSFromFS, removeTrackFolder, saveFileFromBuffer } = require('../src/fileManager') +const { saveFileToIPFSFromFS, removeTrackFolder, saveFileFromBufferToIPFSAndDisk } = require('../src/fileManager') const config = require('../src/config') const models = require('../src/models') @@ -24,7 +24,8 @@ const req = { cnodeUserUUID: uuid() }, logger: { - info: () => {} + info: () => {}, + error: () => {} }, app: { get: key => { @@ -38,9 +39,8 @@ const req = { const segmentsDirPath = 'test/test-segments' const sourceFile = 'segment001.ts' const srcPath = path.join(segmentsDirPath, sourceFile) -const fileType = 'track' -// consts used for testing saveFileFromBuffer() +// consts used for testing saveFileFromBufferToIPFSAndDisk() const metadata = { test: 'field1', track_segments: [{ 'multihash': 'testCIDLink', 'duration': 1000 }], @@ -72,7 +72,7 @@ describe('test fileManager', () => { } try { - await saveFileToIPFSFromFS(reqOverride, srcPath, fileType, sourceFile) + await saveFileToIPFSFromFS(reqOverride, srcPath) assert.fail('Should not have passed if cnodeUserUUID is not present in request.') } catch (e) { assert.deepStrictEqual(e.message, 'User must be authenticated to save a file') @@ -88,7 +88,7 @@ describe('test fileManager', () => { sinon.stub(ipfs, 'addFromFs').rejects(new Error('ipfs is down!')) try { - await saveFileToIPFSFromFS(req, srcPath, fileType, sourceFile) + await saveFileToIPFSFromFS(req, srcPath) assert.fail('Should not have passed if ipfs is down.') } catch (e) { assert.deepStrictEqual(e.message, 'ipfs is down!') @@ -104,29 +104,13 @@ describe('test fileManager', () => { sinon.stub(fs, 'copyFileSync').throws(new Error('Failed to copy files!!')) try { - await saveFileToIPFSFromFS(req, srcPath, fileType, sourceFile) + await saveFileToIPFSFromFS(req, srcPath) assert.fail('Should not have passed if file copying fails.') } catch (e) { assert.deepStrictEqual(e.message, 'Failed to copy files!!') } }) - /** - * Given: a file is being saved to ipfs from fs - * When: the db connection is down - * Then: an error is thrown - */ - it('should throw an error if db connection is down', async () => { - sinon.stub(models.File, 'findOrCreate').rejects(new Error('Failed to find or create file!!!')) - - try { - await saveFileToIPFSFromFS(req, srcPath, fileType, sourceFile) - assert.fail('Should not have passed if db connection is down.') - } catch (e) { - assert.deepStrictEqual(e.message, 'Failed to find or create file!!!') - } - }) - /** * Given: a file is being saved to ipfs from fs * When: everything works as expected @@ -136,10 +120,10 @@ describe('test fileManager', () => { * - that segment should be present in IPFS */ it('should pass saving file to ipfs from fs (happy path)', async () => { - sinon.stub(models.File, 'findOrCreate').returns([{ dataValues: 'data' }]) + sinon.stub(models.File, 'create').returns({ dataValues: { fileUUID: 'uuid' } }) try { - await saveFileToIPFSFromFS(req, srcPath, fileType, sourceFile) + await saveFileToIPFSFromFS(req, srcPath) } catch (e) { assert.fail(e.message) } @@ -167,8 +151,8 @@ describe('test fileManager', () => { }) }) - // ~~~~~~~~~~~~~~~~~~~~~~~~~ saveFileFromBuffer() TESTS ~~~~~~~~~~~~~~~~~~~~~~~~~ - describe('test saveFileFromBuffer()', () => { + // ~~~~~~~~~~~~~~~~~~~~~~~~~ saveFileFromBufferToIPFSAndDisk() TESTS ~~~~~~~~~~~~~~~~~~~~~~~~~ + describe('test saveFileFromBufferToIPFSAndDisk()', () => { /** * Given: a file buffer is being saved to ipfs, fs, and db * When: cnodeUserUUID is not present @@ -186,7 +170,7 @@ describe('test fileManager', () => { } try { - await saveFileFromBuffer(reqOverride, buffer, 'metadata') + await saveFileFromBufferToIPFSAndDisk(reqOverride, buffer) assert.fail('Should not have passed if cnodeUserUUID is not present in request.') } catch (e) { assert.deepStrictEqual(e.message, 'User must be authenticated to save a file') @@ -202,7 +186,7 @@ describe('test fileManager', () => { sinon.stub(ipfs, 'add').rejects(new Error('ipfs is down!')) try { - await saveFileFromBuffer(req, buffer, 'metadata') + await saveFileFromBufferToIPFSAndDisk(req, buffer) assert.fail('Should not have passed if ipfs is down.') } catch (e) { assert.deepStrictEqual(e.message, 'ipfs is down!') @@ -218,40 +202,24 @@ describe('test fileManager', () => { sinon.stub(ipfs, 'add').resolves([{ hash: 'bad/path/fail' }]) // pass bad data to writeFile() try { - await saveFileFromBuffer(req, buffer, 'metadata') + await saveFileFromBufferToIPFSAndDisk(req, buffer) assert.fail('Should not have passed if writing to filesystem fails.') } catch (e) { assert.ok(e.message) } }) - /** - * Given: a file buffer is being saved to ipfs, fs, and db - * When: adding reference to db fails - * Then: an error is thrown - */ - it('should throw an error if writing reference to db fails', async () => { - sinon.stub(models.File, 'findOrCreate').rejects(new Error('Failed to find or create file!!!')) - - try { - await saveFileFromBuffer(req, buffer, 'metadata') - assert.fail('Should not have if db connection is down.') - } catch (e) { - assert.deepStrictEqual(e.message, 'Failed to find or create file!!!') - } - }) - /** * Given: a file buffer is being saved to ipfs, fs, and db * When: everything works as expected * Then: ipfs, fs, and db should have the buffer contents */ it('should pass saving file from buffer (happy path)', async () => { - sinon.stub(models.File, 'findOrCreate').returns([{ dataValues: { fileUUID: 'uuid' } }]) + sinon.stub(models.File, 'create').returns({ dataValues: { fileUUID: 'uuid' } }) let resp try { - resp = await saveFileFromBuffer(req, buffer, 'metadata') + resp = await saveFileFromBufferToIPFSAndDisk(req, buffer) } catch (e) { assert.fail(e.message) } diff --git a/creator-node/test/hashids.js b/creator-node/test/hashids.test.js similarity index 100% rename from creator-node/test/hashids.js rename to creator-node/test/hashids.test.js diff --git a/creator-node/test/lib/dataSeeds.js b/creator-node/test/lib/dataSeeds.js index 40a600ee295..0a7f9c11519 100644 --- a/creator-node/test/lib/dataSeeds.js +++ b/creator-node/test/lib/dataSeeds.js @@ -11,8 +11,12 @@ async function createStarterCNodeUser () { } async function createStarterCNodeUserWithKey (walletPublicKey) { - const cnodeUser = await CNodeUser.create({ walletPublicKey }) - return sessionManager.createSession(cnodeUser.cnodeUserUUID) + const cnodeUser = await CNodeUser.create({ walletPublicKey, clock: 0 }) + const sessionToken = await sessionManager.createSession(cnodeUser.cnodeUserUUID) + return { + cnodeUserUUID: cnodeUser.cnodeUserUUID, + sessionToken: sessionToken + } } module.exports = { createStarterCNodeUser, createStarterCNodeUserWithKey, testEthereumConstants } diff --git a/creator-node/test/nodesync.test.js b/creator-node/test/nodesync.test.js new file mode 100644 index 00000000000..a0a8e8c1177 --- /dev/null +++ b/creator-node/test/nodesync.test.js @@ -0,0 +1,14 @@ +describe('test nodesync', function () { + it('test /export', function () { + /** + * TODO - mock DB state -> confirm export returns deterministict output + */ + }) + + it('test /sync', function () { + /** + * mock export request obj + ensure all files are avail on test IPFS node + * confirm sync successfully + */ + }) +}) diff --git a/creator-node/test/resizeImage.js b/creator-node/test/resizeImage.test.js similarity index 100% rename from creator-node/test/resizeImage.js rename to creator-node/test/resizeImage.test.js diff --git a/creator-node/test/tracks.js b/creator-node/test/tracks.test.js similarity index 79% rename from creator-node/test/tracks.js rename to creator-node/test/tracks.test.js index c594b6fba82..2fd326b161b 100644 --- a/creator-node/test/tracks.js +++ b/creator-node/test/tracks.test.js @@ -21,7 +21,7 @@ const testAudioFilePath = path.resolve(__dirname, 'testTrack.mp3') const testAudioFileWrongFormatPath = path.resolve(__dirname, 'testTrackWrongFormat.jpg') const testAudiusFileNumSegments = 32 -describe('test Tracks', function () { +describe('test Tracks with mocked IPFS', function () { let app, server, session, ipfsMock, libsMock beforeEach(async () => { @@ -48,7 +48,7 @@ describe('test Tracks', function () { .post('/track_content') .attach('file', file, { filename: 'fname.jpg' }) .set('Content-Type', 'multipart/form-data') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .expect(400) }) @@ -74,7 +74,7 @@ describe('test Tracks', function () { .post('/track_content') .attach('file', file, { filename: 'fname.mp3' }) .set('Content-Type', 'multipart/form-data') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .expect(500) // Reset max file limits @@ -103,7 +103,7 @@ describe('test Tracks', function () { .post('/image_upload') .attach('file', file, { filename: 'fname.jpg' }) .set('Content-Type', 'multipart/form-data') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .expect(500) // Reset max file limits @@ -111,27 +111,27 @@ describe('test Tracks', function () { await server.close() }) - it('uploads file to IPFS', async function () { + it('uploads /track_content', async function () { const file = fs.readFileSync(testAudioFilePath) ipfsMock.addFromFs.exactly(33) ipfsMock.pin.add.exactly(33) - const resp1 = await request(app) + const trackContentResp = await request(app) .post('/track_content') .attach('file', file, { filename: 'fname.mp3' }) .set('Content-Type', 'multipart/form-data') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .expect(200) - assert.deepStrictEqual(resp1.body.track_segments[0].multihash, 'testCIDLink') - assert.deepStrictEqual(resp1.body.track_segments.length, 32) - assert.deepStrictEqual(resp1.body.source_file.includes('.mp3'), true) - assert.deepStrictEqual(resp1.body.transcodedTrackCID, 'testCIDLink') - assert.deepStrictEqual(typeof resp1.body.transcodedTrackUUID, 'string') + assert.deepStrictEqual(trackContentResp.body.track_segments[0].multihash, 'testCIDLink') + assert.deepStrictEqual(trackContentResp.body.track_segments.length, 32) + assert.deepStrictEqual(trackContentResp.body.source_file.includes('.mp3'), true) + assert.deepStrictEqual(trackContentResp.body.transcodedTrackCID, 'testCIDLink') + assert.deepStrictEqual(typeof trackContentResp.body.transcodedTrackUUID, 'string') }) - // depends on "upload file to IPFS" + // depends on "uploads /track_content" it('creates Audius track', async function () { const file = fs.readFileSync(testAudioFilePath) @@ -139,34 +139,34 @@ describe('test Tracks', function () { ipfsMock.pin.add.exactly(34) libsMock.User.getUsers.exactly(2) - const resp1 = await request(app) + const trackContentResp = await request(app) .post('/track_content') .attach('file', file, { filename: 'fname.mp3' }) .set('Content-Type', 'multipart/form-data') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .expect(200) - assert.deepStrictEqual(resp1.body.track_segments[0].multihash, 'testCIDLink') - assert.deepStrictEqual(resp1.body.track_segments.length, 32) - assert.deepStrictEqual(resp1.body.source_file.includes('.mp3'), true) + assert.deepStrictEqual(trackContentResp.body.track_segments[0].multihash, 'testCIDLink') + assert.deepStrictEqual(trackContentResp.body.track_segments.length, 32) + assert.deepStrictEqual(trackContentResp.body.source_file.includes('.mp3'), true) // creates Audius track const metadata = { test: 'field1', owner_id: 1, - track_segments: [{ 'multihash': 'testCIDLink', 'duration': 1000 }] + track_segments: trackContentResp.body.track_segments } - const resp2 = await request(app) + const trackMetadataResp = await request(app) .post('/tracks/metadata') - .set('X-Session-ID', session) - .send({ metadata, sourceFile: resp1.body.source_file }) + .set('X-Session-ID', session.sessionToken) + .send({ metadata, sourceFile: trackContentResp.body.source_file }) .expect(200) - assert.deepStrictEqual(resp2.body.metadataMultihash, 'testCIDLink') + assert.deepStrictEqual(trackMetadataResp.body.metadataMultihash, 'testCIDLink') }) - // depends on "upload file to IPFS" + // depends on "uploads /track_content" it('fails to create Audius track when segments not provided', async function () { const file = fs.readFileSync(testAudioFilePath) @@ -178,7 +178,7 @@ describe('test Tracks', function () { .post('/track_content') .attach('file', file, { filename: 'fname.mp3' }) .set('Content-Type', 'multipart/form-data') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .expect(200) assert.deepStrictEqual(resp1.body.track_segments[0].multihash, 'testCIDLink') @@ -193,12 +193,12 @@ describe('test Tracks', function () { await request(app) .post('/tracks/metadata') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({ metadata, sourceFile: resp1.body.source_file }) .expect(400) }) - // depends on "upload file to IPFS" + // depends on "uploads /track_content" it('fails to create Audius track when invalid segment multihashes are provided', async function () { const file = fs.readFileSync(testAudioFilePath) @@ -210,7 +210,7 @@ describe('test Tracks', function () { .post('/track_content') .attach('file', file, { filename: 'fname.mp3' }) .set('Content-Type', 'multipart/form-data') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .expect(200) assert.deepStrictEqual(resp1.body.track_segments[0].multihash, 'testCIDLink') @@ -226,12 +226,12 @@ describe('test Tracks', function () { await request(app) .post('/tracks') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({ metadata, sourceFile: resp1.body.source_file }) .expect(400) }) - // depends on "upload file to IPFS" + // depends on "uploads /track_content" it('fails to create Audius track when owner_id is not provided', async function () { const file = fs.readFileSync(testAudioFilePath) @@ -243,7 +243,7 @@ describe('test Tracks', function () { .post('/track_content') .attach('file', file, { filename: 'fname.mp3' }) .set('Content-Type', 'multipart/form-data') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .expect(200) assert.deepStrictEqual(resp1.body.track_segments[0].multihash, 'testCIDLink') @@ -258,12 +258,12 @@ describe('test Tracks', function () { await request(app) .post('/tracks') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({ metadata, sourceFile: resp1.body.source_file }) .expect(400) }) - // depends on "upload file to IPFS" and "creates Audius user" tests + // depends on "uploads /track_content" and "creates Audius track" tests it('completes Audius track creation', async function () { const file = fs.readFileSync(testAudioFilePath) @@ -271,41 +271,41 @@ describe('test Tracks', function () { ipfsMock.pin.add.exactly(34) libsMock.User.getUsers.exactly(4) - const resp1 = await request(app) + const trackContentResp = await request(app) .post('/track_content') .attach('file', file, { filename: 'fname.mp3' }) .set('Content-Type', 'multipart/form-data') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .expect(200) - assert.deepStrictEqual(resp1.body.track_segments[0].multihash, 'testCIDLink') - assert.deepStrictEqual(resp1.body.track_segments.length, 32) - assert.deepStrictEqual(resp1.body.source_file.includes('.mp3'), true) + assert.deepStrictEqual(trackContentResp.body.track_segments[0].multihash, 'testCIDLink') + assert.deepStrictEqual(trackContentResp.body.track_segments.length, 32) + assert.deepStrictEqual(trackContentResp.body.source_file.includes('.mp3'), true) const metadata = { test: 'field1', - track_segments: [{ 'multihash': 'testCIDLink', 'duration': 1000 }], + track_segments: trackContentResp.body.track_segments, owner_id: 1 } - const resp2 = await request(app) + const trackMetadataResp = await request(app) .post('/tracks/metadata') - .set('X-Session-ID', session) - .send({ metadata, sourceFile: resp1.body.source_file }) + .set('X-Session-ID', session.sessionToken) + .send({ metadata, sourceFile: trackContentResp.body.source_file }) .expect(200) - if (resp2.body.metadataMultihash !== 'testCIDLink') { + if (trackMetadataResp.body.metadataMultihash !== 'testCIDLink') { throw new Error('invalid return data') } await request(app) .post('/tracks') - .set('X-Session-ID', session) - .send({ blockchainTrackId: 1, blockNumber: 10, metadataFileUUID: resp2.body.metadataFileUUID }) + .set('X-Session-ID', session.sessionToken) + .send({ blockchainTrackId: 1, blockNumber: 10, metadataFileUUID: trackMetadataResp.body.metadataFileUUID }) .expect(200) }) - // depends on "upload file to IPFS" + // depends on "uploads /track_content" it('fails to create downloadable track with no track_id and no source_id present', async function () { const file = fs.readFileSync(testAudioFilePath) @@ -317,7 +317,7 @@ describe('test Tracks', function () { .post('/track_content') .attach('file', file, { filename: 'fname.mp3' }) .set('Content-Type', 'multipart/form-data') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .expect(200) assert.deepStrictEqual(resp1.body.track_segments[0].multihash, 'testCIDLink') @@ -336,12 +336,12 @@ describe('test Tracks', function () { await request(app) .post('/tracks/metadata') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({ metadata }) .expect(400) }) - // depends on "upload file to IPFS" and "creates Audius user" tests + // depends on "uploads /track_content" and "creates Audius track" tests it('creates a downloadable track', async function () { const file = fs.readFileSync(testAudioFilePath) @@ -349,21 +349,21 @@ describe('test Tracks', function () { ipfsMock.pin.add.exactly(34) libsMock.User.getUsers.exactly(4) - const resp1 = await request(app) + const trackContentResp = await request(app) .post('/track_content') .attach('file', file, { filename: 'fname.mp3' }) .set('Content-Type', 'multipart/form-data') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .expect(200) - assert.deepStrictEqual(resp1.body.track_segments[0].multihash, 'testCIDLink') - assert.deepStrictEqual(resp1.body.track_segments.length, 32) - assert.deepStrictEqual(resp1.body.source_file.includes('.mp3'), true) + assert.deepStrictEqual(trackContentResp.body.track_segments[0].multihash, 'testCIDLink') + assert.deepStrictEqual(trackContentResp.body.track_segments.length, 32) + assert.deepStrictEqual(trackContentResp.body.source_file.includes('.mp3'), true) // needs debugging as to why this 'cid' key is needed for test to work const metadata = { test: 'field1', - track_segments: [{ 'multihash': 'testCIDLink', 'duration': 1000 }], + track_segments: trackContentResp.body.track_segments, owner_id: 1, download: { 'is_downloadable': true, @@ -372,25 +372,25 @@ describe('test Tracks', function () { } } - const resp2 = await request(app) + const trackMetadataResp = await request(app) .post('/tracks/metadata') - .set('X-Session-ID', session) - .send({ metadata, sourceFile: resp1.body.source_file }) + .set('X-Session-ID', session.sessionToken) + .send({ metadata, sourceFile: trackContentResp.body.source_file }) .expect(200) - if (resp2.body.metadataMultihash !== 'testCIDLink') { + if (trackMetadataResp.body.metadataMultihash !== 'testCIDLink') { throw new Error('invalid return data') } await request(app) .post('/tracks') - .set('X-Session-ID', session) - .send({ blockchainTrackId: 1, blockNumber: 10, metadataFileUUID: resp2.body.metadataFileUUID }) + .set('X-Session-ID', session.sessionToken) + .send({ blockchainTrackId: 1, blockNumber: 10, metadataFileUUID: trackMetadataResp.body.metadataFileUUID }) .expect(200) }) }) -describe('test /track_content and /tracks/metadata with actual ipfsClient', function () { +describe('test Tracks with real IPFS', function () { let app, server, session, libsMock, ipfs // Will need a '.' in front of storagePath to look at current dir @@ -430,7 +430,7 @@ describe('test /track_content and /tracks/metadata with actual ipfsClient', func .post('/track_content') .attach('file', file, { filename: 'fname.mp3' }) .set('Content-Type', 'multipart/form-data') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .expect(500) }) @@ -442,7 +442,7 @@ describe('test /track_content and /tracks/metadata with actual ipfsClient', func .post('/track_content') .attach('file', file, { filename: 'fname.mp3' }) .set('Content-Type', 'multipart/form-data') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .expect(500) }) @@ -454,7 +454,7 @@ describe('test /track_content and /tracks/metadata with actual ipfsClient', func .post('/track_content') .attach('file', file, { filename: 'fname.mp3' }) .set('Content-Type', 'multipart/form-data') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .expect(200) let storagePath = config.get('storagePath') @@ -491,7 +491,7 @@ describe('test /track_content and /tracks/metadata with actual ipfsClient', func it('should throw an error if no metadata is passed', async function () { const resp = await request(app) .post('/tracks/metadata') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({}) .expect(400) @@ -508,7 +508,7 @@ describe('test /track_content and /tracks/metadata with actual ipfsClient', func const resp = await request(app) .post('/tracks/metadata') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({ metadata }) .expect(403) @@ -525,11 +525,11 @@ describe('test /track_content and /tracks/metadata with actual ipfsClient', func const resp = await request(app) .post('/tracks/metadata') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({ metadata }) .expect(500) - assert.deepStrictEqual(resp.body.error, 'Could not save file to disk, ipfs, and/or db: Error: ipfs add failed!') + assert.deepStrictEqual(resp.body.error, '/tracks/metadata saveFileFromBufferToIPFSAndDisk op failed: Error: ipfs add failed!') }) it('successfully adds metadata file to filesystem, db, and ipfs', async function () { @@ -541,7 +541,7 @@ describe('test /track_content and /tracks/metadata with actual ipfsClient', func const resp = await request(app) .post('/tracks/metadata') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({ metadata }) .expect(function (res) { if (res.body.error) { @@ -581,6 +581,11 @@ describe('test /track_content and /tracks/metadata with actual ipfsClient', func const metadataBuffer = Buffer.from(JSON.stringify(metadata)) assert.deepStrictEqual(metadataBuffer.compare(ipfsResp), 0) }) + + // ~~~~~~~~~~~~~~~~~~~~~~~~~ /tracks TESTS ~~~~~~~~~~~~~~~~~~~~~~~~~ + it('TODO - POST /tracks tests', async function () {}) + + it('TODO - parallel track upload', async function () {}) }) /** diff --git a/creator-node/test/users.js b/creator-node/test/users.test.js similarity index 97% rename from creator-node/test/users.js rename to creator-node/test/users.test.js index 7d0f233613b..b72b0dc48b1 100644 --- a/creator-node/test/users.js +++ b/creator-node/test/users.test.js @@ -11,6 +11,7 @@ const { getLibsMock } = require('./lib/libsMock') describe('test Users', function () { let app, server, ipfsMock, libsMock + /** Setup app + global test vars */ beforeEach(async () => { ipfsMock = getIPFSMock() libsMock = getLibsMock() @@ -216,13 +217,15 @@ describe('test Users', function () { const session = await createStarterCNodeUser() await request(app) .post('/users/logout') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({}) .expect(200) await request(app) .post('/users/logout') - .set('X-Session-ID', session) + .set('X-Session-ID', session.sessionToken) .send({}) .expect(401) }) + + it('TODO - clock_status test', async function () {}) }) diff --git a/identity-service/package-lock.json b/identity-service/package-lock.json index 6f603639b68..bd2ff5a9f72 100644 --- a/identity-service/package-lock.json +++ b/identity-service/package-lock.json @@ -7690,11 +7690,6 @@ } } }, - "node-fetch": { - "version": "2.6.0", - "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.6.0.tgz", - "integrity": "sha512-8dG4H5ujfvFiqDmVu9fQ5bOHUC15JMjMY/Zumv26oOvvVJjM67KF8koCWIabKQ1GJIa9r2mMZscBq/TbdOcmNA==" - }, "node-forge": { "version": "0.7.6", "resolved": "https://registry.npmjs.org/node-forge/-/node-forge-0.7.6.tgz", diff --git a/service-commands/scripts/hosts.js b/service-commands/scripts/hosts.js index 70b9b0b2a56..2911f22c420 100644 --- a/service-commands/scripts/hosts.js +++ b/service-commands/scripts/hosts.js @@ -88,6 +88,7 @@ if (cmd === 'add') { throw new Error('Misconfigured local env.\nEnsure AUDIUS_REMOTE_DEV_HOST has been exported and /etc/hosts file has necessary permissions.') } const hostMappings = SERVICES.map(s => `${REMOTE_DEV_HOST} ${s}`) + hostMappings.push(`${REMOTE_DEV_HOST} ${audius_client}`) lines = [...lines, START_SENTINEL, ...hostMappings, END_SENTINEL, '\n'] writeArrayIntoFile(lines) }