Conversation
Pull request overview
This PR replaces a custom Node.js child process clustering solution with the @wendelmax/tasklets library for parallelizing MongoDB to PostgreSQL data migration. The refactoring aims to improve performance and maintainability by using worker threads with batch inserts instead of child processes with individual inserts.
Changes:
- Replaced custom cluster management (cluster.js, background-task.js) with @wendelmax/tasklets worker pool
- Implemented batch insert strategy using PostgreSQL bulk INSERT statements (4000 items per batch)
- Reduced worker count from 99 child processes to 8 worker threads with configurable pool management
Reviewed changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 8 comments.
Summary per file
| File | Description |
|---|---|
| src/insert-worker.cjs | New worker module for batch PostgreSQL inserts, loaded by tasklets threads |
| src/index.js | Refactored main migration logic to use tasklets and Promise-based parallelism |
| src/cluster.js | Removed custom round-robin cluster management implementation |
| src/background-task.js | Removed individual item processing worker |
| package.json | Added @wendelmax/tasklets dependency |
| package-lock.json | Lockfile updates for new dependency |
```js
const promise = tasklets.run(INSERT_WORKER, plainItems).then(count => {
  progress.increment(count)
})
```
The progress bar increment uses the count returned from the worker, but if any worker task fails, the progress bar will not reflect the actual number of processed items and could get stuck. The current implementation lacks error handling for failed tasklets.run() calls, which could leave the progress bar incomplete and cause the verification counts to be incorrect.
Suggested change:
```diff
-const promise = tasklets.run(INSERT_WORKER, plainItems).then(count => {
-  progress.increment(count)
-})
+const promise = tasklets.run(INSERT_WORKER, plainItems)
+  .then(count => {
+    progress.increment(count)
+    return count
+  })
+  .catch(err => {
+    console.error('Worker task failed for a page of items:', err)
+    // Consider these items as processed (attempted), even if insertion failed,
+    // so that the progress bar does not get stuck.
+    progress.increment(plainItems.length)
+    return 0
+  })
```
```js
  promises.push(promise)
}

await Promise.all(promises)
```
Using Promise.all() means if any single batch insert fails, the entire migration will fail and stop. This doesn't provide graceful error handling or the ability to retry failed batches. Consider using Promise.allSettled() to process all batches regardless of individual failures, then handle failed batches appropriately.
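A minimal sketch of the `Promise.allSettled()` approach, assuming a hypothetical `runBatch` stand-in for `tasklets.run(INSERT_WORKER, items)`:

```javascript
// Sketch: collect every batch result with allSettled so one failed batch
// does not abort the whole migration. `runBatch` is a hypothetical
// stand-in for tasklets.run(INSERT_WORKER, items).
async function insertAllBatches(batches, runBatch) {
  const results = await Promise.allSettled(batches.map(items => runBatch(items)))

  // Failed batches are kept so they can be retried or logged later.
  const failed = results
    .map((r, i) => (r.status === 'rejected' ? batches[i] : null))
    .filter(Boolean)

  // Sum the inserted counts from the batches that succeeded.
  const inserted = results
    .filter(r => r.status === 'fulfilled')
    .reduce((sum, r) => sum + r.value, 0)

  return { inserted, failed }
}
```

The caller can then decide whether to retry `failed` or surface them in the final verification report.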
```js
await mongoDB.client.close()
await postgresDB.client.end()
await tasklets.terminate()
```
The cleanup code doesn't handle potential errors during resource cleanup. If mongoDB.client.close(), postgresDB.client.end(), or tasklets.terminate() fails, the subsequent cleanup calls won't execute and resources may leak. Consider wrapping cleanup calls in try-catch blocks or using Promise.allSettled() to ensure all cleanup attempts are made.
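One possible shape for this, using `Promise.allSettled()` so every cleanup step is attempted regardless of earlier failures (the `shutdown` helper name is illustrative, not part of the PR):

```javascript
// Sketch: attempt all three cleanup steps even if one of them throws.
// mongoDB, postgresDB and tasklets are the objects used in the PR's index.js.
async function shutdown(mongoDB, postgresDB, tasklets) {
  const results = await Promise.allSettled([
    mongoDB.client.close(),
    postgresDB.client.end(),
    tasklets.terminate(),
  ])
  // Log failures instead of letting one of them skip the remaining steps.
  for (const r of results) {
    if (r.status === 'rejected') console.error('Cleanup step failed:', r.reason)
  }
}
```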
```js
user: 'erickwendel',
host: process.env.POSTGRES_HOST || 'localhost',
database: 'school',
password: 'mypassword',
```
Hardcoded database credentials (username and password) are a security vulnerability. These sensitive values should be stored in environment variables like the host configuration. Consider using process.env.POSTGRES_USER and process.env.POSTGRES_PASSWORD instead of hardcoding 'erickwendel' and 'mypassword'.
Suggested change:
```diff
-  user: 'erickwendel',
+  user: process.env.POSTGRES_USER,
   host: process.env.POSTGRES_HOST || 'localhost',
   database: 'school',
-  password: 'mypassword',
+  password: process.env.POSTGRES_PASSWORD,
```
```js
const client = new Client({
  user: 'erickwendel',
  host: process.env.POSTGRES_HOST || 'localhost',
  database: 'school',
  password: 'mypassword',
  port: 5432,
})

await client.connect()
```
Creating a new database client for every batch insert is inefficient and can lead to connection pool exhaustion under load. Each worker thread should reuse a single client connection throughout its lifetime. Consider initializing the client once when the worker module loads, or implement connection pooling. With 8 workers processing batches in parallel, this could create hundreds of simultaneous connections.
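A minimal sketch of the once-per-worker approach: lazily create the client the first time a batch arrives and reuse it afterwards. `getClient` is a hypothetical helper, not part of the PR; the `Client` parameter stands in for pg's `Client` class.

```javascript
// Sketch: one client per worker thread, created lazily on first use.
// Storing the connect promise (not the client) also dedupes concurrent
// first calls within the same worker.
let clientPromise = null

function getClient(Client, options) {
  if (!clientPromise) {
    const client = new Client(options)
    clientPromise = client.connect().then(() => client)
  }
  return clientPromise
}
```

Each batch insert would then `await getClient(...)` instead of constructing and connecting a fresh client.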
```js
for (const item of items) {
  placeholders.push(`($${idx++}, $${idx++}, $${idx++}, $${idx++})`)
  values.push(item.name, item.email, item.age, item.registeredAt)
}

const query = `INSERT INTO students (name, email, age, registered_at) VALUES ${placeholders.join(', ')}`
await client.query(query, values)
```
With ITEMS_PER_PAGE set to 4000, each batch will have 16000 parameters (4 fields per item). PostgreSQL has a maximum of 65535 parameters per query. While the current batch size is safe, if ITEMS_PER_PAGE is increased beyond ~16000 items, the query will fail. Consider adding validation or documentation about this limit.
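A simple guard could make the limit explicit instead of leaving it implicit in the batch size. The helper below is a sketch; `FIELDS_PER_ITEM` reflects the four columns inserted per student.

```javascript
// Sketch: fail fast if a batch would exceed PostgreSQL's bind-parameter limit.
// PostgreSQL's wire protocol caps a single query at 65535 bind parameters.
const PG_MAX_PARAMS = 65535
const FIELDS_PER_ITEM = 4 // name, email, age, registered_at

function assertBatchSize(itemCount) {
  const params = itemCount * FIELDS_PER_ITEM
  if (params > PG_MAX_PARAMS) {
    throw new Error(
      `Batch of ${itemCount} items needs ${params} parameters, ` +
      `over PostgreSQL's limit of ${PG_MAX_PARAMS}`
    )
  }
}
```

Called at the top of the insert worker, this turns a cryptic query failure into a clear configuration error if `ITEMS_PER_PAGE` is ever raised past ~16 000.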
```js
minWorkers: 8,
idleTimeout: 30000,
workload: 'io',
adaptive: true,
```
The minWorkers and maxWorkers are both set to 8, which disables the adaptive scaling feature. The 'adaptive: true' configuration has no effect when min and max are the same. Either set different values to enable adaptive scaling, or set 'adaptive: false' to reflect the actual behavior.
Suggested change:
```diff
-  adaptive: true,
+  adaptive: false,
```
```js
for await (const items of getAllPagedData(ITEMS_PER_PAGE)) {
  // Convert MongoDB docs to plain objects (strip ObjectId etc.)
  const plainItems = items.map(({ name, email, age, registeredAt }) => ({ name, email, age, registeredAt }))

  const promise = tasklets.run(INSERT_WORKER, plainItems).then(count => {
    progress.increment(count)
  })
  promises.push(promise)
}
```
All MongoDB pages are fetched and queued for processing immediately in a tight loop. With large datasets, this could load all pages into memory at once before workers can process them, potentially causing memory issues. Consider implementing backpressure by limiting concurrent promises or awaiting batches periodically to control memory usage.
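One way to bound memory is to cap the number of in-flight batches and wait for a slot before fetching the next page. This is a sketch; `processWithBackpressure`, `runBatch`, and the `limit` default are illustrative names, not part of the PR.

```javascript
// Sketch: cap in-flight batches so pages are pulled from MongoDB only
// as fast as workers can absorb them. `runBatch` is a hypothetical
// stand-in for tasklets.run(INSERT_WORKER, items).
async function processWithBackpressure(pages, runBatch, limit = 16) {
  const inFlight = new Set()
  const results = []
  for await (const items of pages) {
    const p = runBatch(items).then(r => {
      inFlight.delete(p) // free the slot once this batch settles
      return r
    })
    inFlight.add(p)
    results.push(p)
    // Backpressure: pause the paging loop until a slot opens up.
    if (inFlight.size >= limit) await Promise.race(inFlight)
  }
  return Promise.all(results)
}
```

Because the `for await` loop pauses, the MongoDB cursor itself stops fetching pages, so memory stays proportional to `limit` rather than to the dataset.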
Hey brother, how's it going?
Could you do us a favor and test this lib pls?
https://www.npmjs.com/package/@wendelmax/tasklets
These changes significantly improve the performance, scalability, and maintainability of the migration process.