-
Notifications
You must be signed in to change notification settings - Fork 86
feat: testing @wendelmax/tasklets #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
This file was deleted.
This file was deleted.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,18 +1,32 @@ | ||||||||||||||||||||||||||||||||
| import { initialize } from "./cluster.js" | ||||||||||||||||||||||||||||||||
| import { getMongoConnection, getPostgresConnection } from './db.js' | ||||||||||||||||||||||||||||||||
| import cliProgress from 'cli-progress' | ||||||||||||||||||||||||||||||||
| import { setTimeout } from 'node:timers/promises' | ||||||||||||||||||||||||||||||||
| import Tasklets from '@wendelmax/tasklets' | ||||||||||||||||||||||||||||||||
| import path from 'node:path' | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // Configure Tasklets worker pool | ||||||||||||||||||||||||||||||||
| const tasklets = new Tasklets() | ||||||||||||||||||||||||||||||||
| tasklets.configure({ | ||||||||||||||||||||||||||||||||
| maxWorkers: 8, | ||||||||||||||||||||||||||||||||
| minWorkers: 8, | ||||||||||||||||||||||||||||||||
| idleTimeout: 30000, | ||||||||||||||||||||||||||||||||
| workload: 'io', | ||||||||||||||||||||||||||||||||
| adaptive: true, | ||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // Worker module path (CJS module loaded via require() inside worker threads) | ||||||||||||||||||||||||||||||||
| const INSERT_WORKER = `MODULE:${path.join(import.meta.dirname, 'insert-worker.cjs')}` | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // Connect to databases | ||||||||||||||||||||||||||||||||
| const mongoDB = await getMongoConnection() | ||||||||||||||||||||||||||||||||
| const postgresDB = await getPostgresConnection() | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const ITEMS_PER_PAGE = 4000 | ||||||||||||||||||||||||||||||||
| const CLUSTER_SIZE = 99 | ||||||||||||||||||||||||||||||||
| const TASK_FILE = new URL('./background-task.js', import.meta.url).pathname | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // console.log(`there was ${await postgresDB.students.count()} items on Postgres, deleting all...`) | ||||||||||||||||||||||||||||||||
| // Clean Postgres before migration | ||||||||||||||||||||||||||||||||
| await postgresDB.students.deleteAll() | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // Async generator to paginate through MongoDB | ||||||||||||||||||||||||||||||||
| async function* getAllPagedData(itemsPerPage, page = 0) { | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const data = mongoDB.students.find().skip(page).limit(itemsPerPage) | ||||||||||||||||||||||||||||||||
| const items = await data.toArray() | ||||||||||||||||||||||||||||||||
| if (!items.length) return | ||||||||||||||||||||||||||||||||
|
|
@@ -23,39 +37,38 @@ async function* getAllPagedData(itemsPerPage, page = 0) { | |||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const total = await mongoDB.students.countDocuments() | ||||||||||||||||||||||||||||||||
| // console.log(`total items on DB: ${total}`) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // Progress bar | ||||||||||||||||||||||||||||||||
| const progress = new cliProgress.SingleBar({ | ||||||||||||||||||||||||||||||||
| format: 'progress [{bar}] {percentage}% | {value}/{total} | {duration}s', | ||||||||||||||||||||||||||||||||
| clearOnComplete: false, | ||||||||||||||||||||||||||||||||
| }, cliProgress.Presets.shades_classic); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| progress.start(total, 0); | ||||||||||||||||||||||||||||||||
| let totalProcessed = 0 | ||||||||||||||||||||||||||||||||
| const cp = initialize( | ||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||
| backgroundTaskFile: TASK_FILE, | ||||||||||||||||||||||||||||||||
| clusterSize: CLUSTER_SIZE, | ||||||||||||||||||||||||||||||||
| amountToBeProcessed: total, | ||||||||||||||||||||||||||||||||
| async onMessage(message) { | ||||||||||||||||||||||||||||||||
| progress.increment() | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| if (++totalProcessed !== total) return | ||||||||||||||||||||||||||||||||
| // console.log(`all ${amountToBeProcessed} processed! Exiting...`) | ||||||||||||||||||||||||||||||||
| progress.stop() | ||||||||||||||||||||||||||||||||
| cp.killAll() | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const insertedOnSQLite = await postgresDB.students.count() | ||||||||||||||||||||||||||||||||
| console.log(`total on MongoDB ${total} and total on PostGres ${insertedOnSQLite}`) | ||||||||||||||||||||||||||||||||
| console.log(`are the same? ${total === insertedOnSQLite ? 'yes' : 'no'}`) | ||||||||||||||||||||||||||||||||
| process.exit() | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
| await setTimeout(1000) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| for await (const data of getAllPagedData(ITEMS_PER_PAGE)) { | ||||||||||||||||||||||||||||||||
| cp.sendToChild(data) | ||||||||||||||||||||||||||||||||
| }, cliProgress.Presets.shades_classic) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| progress.start(total, 0) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // Process all pages in parallel using tasklets | ||||||||||||||||||||||||||||||||
| const promises = [] | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| for await (const items of getAllPagedData(ITEMS_PER_PAGE)) { | ||||||||||||||||||||||||||||||||
| // Convert MongoDB docs to plain objects (strip ObjectId etc.) | ||||||||||||||||||||||||||||||||
| const plainItems = items.map(({ name, email, age, registeredAt }) => ({ name, email, age, registeredAt })) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const promise = tasklets.run(INSERT_WORKER, plainItems).then(count => { | ||||||||||||||||||||||||||||||||
| progress.increment(count) | ||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||
|
Comment on lines
+56
to
+58
|
||||||||||||||||||||||||||||||||
| const promise = tasklets.run(INSERT_WORKER, plainItems).then(count => { | |
| progress.increment(count) | |
| }) | |
| const promise = tasklets.run(INSERT_WORKER, plainItems) | |
| .then(count => { | |
| progress.increment(count) | |
| return count | |
| }) | |
| .catch(err => { | |
| console.error('Worker task failed for a page of items:', err) | |
| // Consider these items as processed (attempted), even if insertion failed, | |
| // so that the progress bar does not get stuck. | |
| progress.increment(plainItems.length) | |
| return 0 | |
| }) |
Copilot
AI
Feb 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All MongoDB pages are fetched and queued for processing immediately in a tight loop. With large datasets, this could load all pages into memory at once before workers can process them, potentially causing memory issues. Consider implementing backpressure by limiting concurrent promises or awaiting batches periodically to control memory usage.
Copilot
AI
Feb 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using Promise.all() means if any single batch insert fails, the entire migration will fail and stop. This doesn't provide graceful error handling or the ability to retry failed batches. Consider using Promise.allSettled() to process all batches regardless of individual failures, then handle failed batches appropriately.
Copilot
AI
Feb 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cleanup code doesn't handle potential errors during resource cleanup. If mongoDB.client.close(), postgresDB.client.end(), or tasklets.terminate() fails, the subsequent cleanup calls won't execute and resources may leak. Consider wrapping cleanup calls in try-catch blocks or using Promise.allSettled() to ensure all cleanup attempts are made.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,37 @@ | ||||||||||||||||||
| // Worker module for Postgres batch inserts | ||||||||||||||||||
| // Loaded by tasklets worker threads via MODULE: prefix | ||||||||||||||||||
|
|
||||||||||||||||||
| const { Client } = require('pg') | ||||||||||||||||||
|
|
||||||||||||||||||
| async function insertBatch(items) { | ||||||||||||||||||
| const client = new Client({ | ||||||||||||||||||
| user: 'erickwendel', | ||||||||||||||||||
| host: process.env.POSTGRES_HOST || 'localhost', | ||||||||||||||||||
| database: 'school', | ||||||||||||||||||
| password: 'mypassword', | ||||||||||||||||||
|
Comment on lines
+8
to
+11
|
||||||||||||||||||
| user: 'erickwendel', | |
| host: process.env.POSTGRES_HOST || 'localhost', | |
| database: 'school', | |
| password: 'mypassword', | |
| user: process.env.POSTGRES_USER, | |
| host: process.env.POSTGRES_HOST || 'localhost', | |
| database: 'school', | |
| password: process.env.POSTGRES_PASSWORD, |
Copilot
AI
Feb 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creating a new database client for every batch insert is inefficient and can lead to connection pool exhaustion under load. Each worker thread should reuse a single client connection throughout its lifetime. Consider initializing the client once when the worker module loads, or implement connection pooling. With 8 workers processing batches in parallel, this could create hundreds of simultaneous connections.
Copilot
AI
Feb 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With ITEMS_PER_PAGE set to 4000, each batch will have 16000 parameters (4 fields per item). PostgreSQL has a maximum of 65535 parameters per query. While the current batch size is safe, if ITEMS_PER_PAGE is increased beyond ~16000 items, the query will fail. Consider adding validation or documentation about this limit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The minWorkers and maxWorkers are both set to 8, which disables the adaptive scaling feature. The 'adaptive: true' configuration has no effect when min and max are the same. Either set different values to enable adaptive scaling, or set 'adaptive: false' to reflect the actual behavior.