diff --git a/package-lock.json b/package-lock.json index 8ad697b..3d74654 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "0.0.1", "license": "ISC", "dependencies": { + "@wendelmax/tasklets": "^2.2.0", "cli-progress": "^3.12.0", "draftlog": "^1.0.13", "mongodb": "^6.5.0", @@ -98,6 +99,27 @@ "@types/webidl-conversions": "*" } }, + "node_modules/@wendelmax/tasklets": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/@wendelmax/tasklets/-/tasklets-2.2.0.tgz", + "integrity": "sha512-XxVhiBZslq2QtCyJf+h6rXWqm6aVMIxTw/aiDvFPk1FdO49Em0rutR8FrmA9CsQSMosuJaSZqhmQizHfnd5Cdg==", + "cpu": [ + "x64", + "arm64" + ], + "os": [ + "linux", + "darwin", + "win32" + ], + "engines": { + "node": ">=18.0.0" + }, + "funding": { + "type": "github", + "url": "https://github.com/sponsors/wendelmax" + } + }, "node_modules/abbrev": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/abbrev/-/abbrev-1.1.1.tgz", diff --git a/package.json b/package.json index f07edea..9965ecd 100644 --- a/package.json +++ b/package.json @@ -20,10 +20,11 @@ "@faker-js/faker": "^8.4.1" }, "dependencies": { + "@wendelmax/tasklets": "^2.2.0", "cli-progress": "^3.12.0", "draftlog": "^1.0.13", "mongodb": "^6.5.0", "pg": "^8.11.5", "sqlite3": "^5.1.7" } -} \ No newline at end of file +} diff --git a/src/background-task.js b/src/background-task.js deleted file mode 100644 index 0069dd1..0000000 --- a/src/background-task.js +++ /dev/null @@ -1,17 +0,0 @@ -import { getPostgresConnection } from './db.js' -const db = await getPostgresConnection() - -// console.log(`process ${process.pid} started.`); - -process.on('message', (items) => { - // console.log(` ${process.pid} received ${items.length} items`,); - for (const item of items) { - db.students.insert(item) - .then(() => { - process.send('item-done'); - }) - .catch((error) => { - console.error(error); - }); - } -}); diff --git a/src/cluster.js b/src/cluster.js deleted file mode 100644 index ade430e..0000000 --- a/src/cluster.js +++ /dev/null @@ -1,64 +0,0 @@ -import { fork } from 'child_process' - - -function roundRoubin(array, index = 0) { - return function () { - if (index >= array.length) index = 0 - - return array[index++] - } -} - -// Function to start child processes -function initializeCluster({ backgroundTaskFile, clusterSize, onMessage }) { - const processes = new Map() - for (let index = 0; index < clusterSize; index++) { - - const child = fork(backgroundTaskFile) - child.on('exit', () => { - // console.log(`process ${child.pid} exited`) - processes.delete(child.pid) - }) - - child.on('error', error => { - // console.log(`process ${child.pid} has an error`, error) - process.exit(1) - }) - - child.on('message', (message) => { - if (message !== 'item-done') return - onMessage(message) - }) - - processes.set(child.pid, child) - } - - return { - getProcess: roundRoubin([...processes.values()]), - killAll: () => { - processes.forEach((child) => child.kill()) - } - } - -} - -export function initialize({ backgroundTaskFile, clusterSize, onMessage }) { - - const { getProcess, killAll } = initializeCluster({ backgroundTaskFile, clusterSize, onMessage }) - // console.log(`starting with ${clusterSize} processes`) - - function sendToChild(message) { - const child = getProcess() - // send only if channel is open - // if (child.killed) return; - child.send(message) - } - - - return { - sendToChild: sendToChild.bind(this), - killAll: killAll.bind(this) - } -} - - diff --git a/src/index.js b/src/index.js index e954588..20ca2b1 100644 --- a/src/index.js +++ b/src/index.js @@ -1,18 +1,32 @@ -import { initialize } from "./cluster.js" import { getMongoConnection, getPostgresConnection } from './db.js' import cliProgress from 'cli-progress' -import { setTimeout } from 'node:timers/promises' +import Tasklets from '@wendelmax/tasklets' +import path from 'node:path' + +// Configure Tasklets worker pool +const tasklets = new Tasklets() +tasklets.configure({ + maxWorkers: 8, + minWorkers: 8, + idleTimeout: 30000, + workload: 'io', + adaptive: true, +}) + +// Worker module path (CJS module loaded via require() inside worker threads) +const INSERT_WORKER = `MODULE:${path.join(import.meta.dirname, 'insert-worker.cjs')}` + +// Connect to databases const mongoDB = await getMongoConnection() const postgresDB = await getPostgresConnection() + const ITEMS_PER_PAGE = 4000 -const CLUSTER_SIZE = 99 -const TASK_FILE = new URL('./background-task.js', import.meta.url).pathname -// console.log(`there was ${await postgresDB.students.count()} items on Postgres, deleting all...`) +// Clean Postgres before migration await postgresDB.students.deleteAll() +// Async generator to paginate through MongoDB async function* getAllPagedData(itemsPerPage, page = 0) { - const data = mongoDB.students.find().skip(page).limit(itemsPerPage) const items = await data.toArray() if (!items.length) return @@ -23,39 +37,38 @@ async function* getAllPagedData(itemsPerPage, page = 0) { } const total = await mongoDB.students.countDocuments() -// console.log(`total items on DB: ${total}`) +// Progress bar const progress = new cliProgress.SingleBar({ format: 'progress [{bar}] {percentage}% | {value}/{total} | {duration}s', clearOnComplete: false, -}, cliProgress.Presets.shades_classic); - -progress.start(total, 0); -let totalProcessed = 0 -const cp = initialize( - { - backgroundTaskFile: TASK_FILE, - clusterSize: CLUSTER_SIZE, - amountToBeProcessed: total, - async onMessage(message) { - progress.increment() - - if (++totalProcessed !== total) return - // console.log(`all ${amountToBeProcessed} processed! Exiting...`) - progress.stop() - cp.killAll() - - const insertedOnSQLite = await postgresDB.students.count() - console.log(`total on MongoDB ${total} and total on PostGres ${insertedOnSQLite}`) - console.log(`are the same? ${total === insertedOnSQLite ? 'yes' : 'no'}`) - process.exit() - - } - } -) -await setTimeout(1000) - -for await (const data of getAllPagedData(ITEMS_PER_PAGE)) { - cp.sendToChild(data) +}, cliProgress.Presets.shades_classic) + +progress.start(total, 0) + +// Process all pages in parallel using tasklets +const promises = [] + +for await (const items of getAllPagedData(ITEMS_PER_PAGE)) { + // Convert MongoDB docs to plain objects (strip ObjectId etc.) + const plainItems = items.map(({ name, email, age, registeredAt }) => ({ name, email, age, registeredAt })) + + const promise = tasklets.run(INSERT_WORKER, plainItems).then(count => { + progress.increment(count) + }) + promises.push(promise) } +await Promise.all(promises) + +progress.stop() + +const insertedOnPostgres = await postgresDB.students.count() +console.log(`total on MongoDB ${total} and total on PostGres ${insertedOnPostgres}`) +console.log(`are the same? ${total === insertedOnPostgres ? 'yes' : 'no'}`) + +// Cleanup +await mongoDB.client.close() +await postgresDB.client.end() +await tasklets.terminate() +process.exit() diff --git a/src/insert-worker.cjs b/src/insert-worker.cjs new file mode 100644 index 0000000..3e08541 --- /dev/null +++ b/src/insert-worker.cjs @@ -0,0 +1,37 @@ +// Worker module for Postgres batch inserts +// Loaded by tasklets worker threads via MODULE: prefix + +const { Client } = require('pg') + +async function insertBatch(items) { + const client = new Client({ + user: 'erickwendel', + host: process.env.POSTGRES_HOST || 'localhost', + database: 'school', + password: 'mypassword', + port: 5432, + }) + + await client.connect() + + try { + // Build a single bulk INSERT for performance + const values = [] + const placeholders = [] + let idx = 1 + + for (const item of items) { + placeholders.push(`($${idx++}, $${idx++}, $${idx++}, $${idx++})`) + values.push(item.name, item.email, item.age, item.registeredAt) + } + + const query = `INSERT INTO students (name, email, age, registered_at) VALUES ${placeholders.join(', ')}` + await client.query(query, values) + } finally { + await client.end() + } + + return items.length +} + +module.exports = insertBatch