-
Notifications
You must be signed in to change notification settings - Fork 134
fix(pb): simplify runner wf #3483
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 11-18-fix_gas_fix_loop_forgotten_bug_due_to_concurrency
Are you sure you want to change the base?
fix(pb): simplify runner wf #3483
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
3 Skipped Deployments
|
More templates
@rivetkit/cloudflare-workers
@rivetkit/db
@rivetkit/framework-base
@rivetkit/next-js
@rivetkit/react
rivetkit
@rivetkit/sql-loader
@rivetkit/engine-runner
@rivetkit/engine-runner-protocol
commit: |
Pull Request ReviewOverviewThis PR simplifies the runner workflow by removing the command/event handling logic from the runner workflow and moving event forwarding directly to actor workflows. This is a significant refactoring that reduces ~500 lines of code by eliminating intermediate state management. Key Changes
Code Quality & Best Practices✅ Good:
Potential Bugs❌ CRITICAL: Compilation Error
Performance Considerations✅ Improvements:
Security Concerns✅ No new security issues introduced
Test Coverage
Additional Questions
SummaryThis is a valuable simplification that removes significant complexity from the runner workflow. However, there's a critical compilation error that needs to be fixed before this can be merged. Additionally, the missing event acknowledgment logic needs to be clarified or implemented. Recommendation: Request changes to fix the compilation error and clarify the event acknowledgment and command flow architecture. Review generated following CLAUDE.md conventions for error handling, logging patterns, and code style. |
fa56065 to
fefb6f0
Compare
1a4f15a to
4294775
Compare
Code ReviewOverviewThis PR significantly simplifies the runner workflow by removing event and command tracking logic. The changes reduce the workflow state and delegate more responsibility to actor workflows. Key Changes Analysis1. Removed Event/Command State Management ✅The PR removes:
Positive: This is a good simplification that reduces state size and complexity. 2. Event Forwarding Moved to WebSocket Handler ✅In protocol::ToServer::ToServerEvents(events) => {
let res = ctx.signal(pegboard::workflows::runner2::Forward {
inner: protocol::ToServer::try_from(msg)
.context("failed to convert message for workflow forwarding")?,
})
.tag("actor_id", actor_id)
.graceful_not_found()
// ...
}Issue Found: This code references Looking at the protocol, Recommendation: You need to iterate over the events and extract actor_id properly: protocol::ToServer::ToServerEvents(events) => {
for event in &events.events {
let actor_id = crate::utils::event_actor_id(&event.inner);
let res = ctx.signal(pegboard::workflows::actor::Event {
inner: event.inner.clone(),
})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", &actor_id.to_string())
.graceful_not_found()
.send()
.await
.with_context(|| {
format!("failed to forward signal to actor workflow: {}", actor_id)
})?;
if res.is_none() {
tracing::warn!(?actor_id, "failed to send signal to actor workflow, likely already stopped");
}
}
}3. Removed Init Message Handling 🔍The runner workflow no longer processes
Concerns:
Recommendation: Verify that runner initialization is complete elsewhere, or this may break runner connectivity and actor allocation. 4. Simplified Runner Workflow State ✅
struct LifecycleState {
draining: bool,
}Positive: Much cleaner state management. 5. Variable Naming Improvement ✅In 6. Removed Signal Type ✅Removed Potential Issues
Testing Recommendations
Security ConsiderationsNo security issues identified. The changes maintain proper actor ownership validation in KV operations. Performance ConsiderationsPositive: Removing state buffering reduces memory usage and workflow replay time. The simplified state should improve workflow performance. Code Quality
SummaryThis is a good architectural simplification, but has critical compilation errors that must be fixed before merging. The undefined Recommendation: Fix the compilation error, verify the init flow is complete, and add tests before merging. |
|
Critical Bug Found: The variable |
|
PR Review Summary - Part 1: This PR significantly simplifies the runner workflow by removing ~500 lines of code. Major changes include: MUST FIX:
CODE QUALITY ISSUES:
ARCHITECTURAL CONCERNS:
Questions to address:
|
|
PR Review Summary - Part 2: PERFORMANCE CONSIDERATIONS: Positive:
Concerns:
TEST COVERAGE:
POSITIVE CHANGES:
Overall: This is a valuable simplification but needs the compilation fix and testing before merge. The architectural changes need documentation to help understand the new event flow and migration path for existing runners. |
fefb6f0 to
c088d42
Compare
4294775 to
5458e50
Compare
Code ReviewI've reviewed this PR which simplifies the runner workflow. Here's my detailed feedback: 🎯 Overall AssessmentThis is a substantial refactoring that introduces a new protocol version (v4/mk2) and simplifies the runner workflow architecture. The changes are generally well-structured, but there are several areas that need attention. ✅ Strengths
🔴 Critical Issues1. Compilation Errors in tunnel_to_ws_task.rsThe refactored code has critical issues that will prevent compilation: Lines ~45-65: The
The functions appear to have been extracted but weren't properly parameterized. Example: async fn recv_msg() -> Result<std::result::Result<Option<()>, LifecycleResult>> {
let tunnel_msg = tokio::select! {
res = tunnel_sub.next() => { // ❌ tunnel_sub not in scopeFix: Add proper parameters to these functions or inline them back into the main task loop. 2. Missing Signal Type in conn.rs:149ctx.signal(Init); // ❌ What is Init?This signal type doesn't appear to be defined anywhere in the visible code. This will fail to compile. 3. Undefined Helper Function in lib.rsLine 6 references let workflow_id = if protocol::is_new(protocol_version) { // ❌ Undefined functionShould probably be 4. Incorrect Return Type in tunnel_to_ws_task.rsasync fn recv_msg() -> Result<std::result::Result<Option<()>, LifecycleResult>> {
// ...
Ok(Some(data)) // ❌ Returns data but signature says Option<()>
}The function signature says it returns
|
PR Review: fix(pb): simplify runner wfI've reviewed this PR which introduces a new protocol version (v4/mk2) for the pegboard runner workflow and adds an event demuxer for actor events. Critical Issues1. Compilation Errors in tunnel_to_ws_task.rs
These helper functions need to either accept the necessary parameters in their signatures OR be moved inside the main function as closures. 2. Compilation Errors in ws_to_tunnel_task.rs 3. Missing Signal Handling in conn.rs High Priority Issues4. Potential Memory Leak in ActorEventDemuxer 5. Error Handling Silently Drops Send Failures 6. Protocol Version Checking Inconsistencies Medium Priority Issues7. Missing Protocol Version Field 8. Incorrect Message Type Usage Code QualityGood:
Suggestions:
SummaryCannot merge as-is due to compilation errors. Priority fixes:
Once addressed, this is a solid simplification! |
c088d42 to
657b813
Compare
5458e50 to
8431b53
Compare
8431b53 to
5972ed8
Compare
657b813 to
bc2bb7c
Compare
PR Review: fix(pb): simplify runner wfThis PR introduces significant architectural changes to the Pegboard runner workflow, including a new protocol version ("mk2") and refactoring of the connection handling and event dispatching. Here's my detailed review: 🔴 Critical Issues1. Compilation Errors - Missing Variables in ScopeSeveral functions reference variables that are not in scope, indicating the code is incomplete or won't compile:
2. Incomplete Implementation
3. Missing Signal Struct
🟡 Potential Bugs1. ActorEventDemuxer - Event Loss on First IngestIn pub fn ingest(&mut self, actor_id: Id, event: protocol::Event) {
if let Some(channel) = self.channels.get(&actor_id) {
let _ = channel.tx.send(event); // Existing channel - event sent
} else {
// New channel created... but `event` is never sent!
let (tx, mut rx) = mpsc::unbounded_channel();
// ...
self.channels.insert(actor_id, Channel { tx, handle, last_seen: Instant::now() });
}
// ... gc code
}The first event for each actor will be dropped. 2. GC Never Updates
|
PR Review: fix(pb): simplify runner wfSummaryThis PR introduces significant changes to the Pegboard runner system, introducing a new protocol version (v4/mk2) alongside maintaining backwards compatibility with v1-v3 (mk1). The main goal appears to be simplifying the runner workflow by moving more logic into the pegboard-runner service rather than the workflow itself. Critical Issues1. Compilation Errors - Missing Variables in ClosuresSeveral functions in tunnel_to_ws_task.rs:52-77 - async fn recv_msg() -> Result<std::result::Result<(), LifecycleResult>> {
let tunnel_msg = tokio::select! {
res = tunnel_sub.next() => { // tunnel_sub not in scopetunnel_to_ws_task.rs:80-163 - async fn handle_message_mk2() -> Result<()> {
// Parse message
let msg = match versioned::ToRunner2::deserialize_with_embedded_version(&tunnel_msg.payload) {
// tunnel_msg not in scope, ctx not in scopews_to_tunnel_task.rs:79-117 - Same issue with ws_to_tunnel_task.rs:119-464 - ws_to_tunnel_task.rs:467-759 - 2. Incomplete Function Bodyws_to_tunnel_task.rs:761-768 - async fn ack_commands(ctx: &StandaloneCtx) -> Result<()> {
// ctx.udb()?.run(|| {
// let last_ack = ;
// let stream = .read_ranges_keyvalues({
// limit:
// });
// }).await?;
}This function is called but does nothing and doesn't return properly. 3. Missing Signal Definitionconn.rs:177-178 - if protocol::is_new(protocol_version) {
ctx.signal(Init); // Init is not defined anywhere visible4. Use of
|
5972ed8 to
8a61739
Compare
bc2bb7c to
6f07705
Compare
PR Review: fix(pb): simplify runner wfSummaryThis PR refactors the Pegboard runner workflow to simplify its architecture, adding protocol v4 (mk2) support while maintaining backward compatibility with v3 (mk1). The changes introduce a new Critical Issues1. Compilation Errors in tunnel_to_ws_task.rs and ws_to_tunnel_task.rsLocation: Issue: Multiple function signatures and implementations have critical errors:
Example from tunnel_to_ws_task.rs:52: async fn recv_msg() -> Result<std::result::Result<(), LifecycleResult>> {
let tunnel_msg = tokio::select! {
res = tunnel_sub.next() => { // ERROR: tunnel_sub not in scopeExample from tunnel_to_ws_task.rs:79: async fn handle_message_mk2() -> Result<()> { // ERROR: Missing parameters
let msg = match versioned::ToRunner2::deserialize_with_embedded_version(&tunnel_msg.payload) {
// ERROR: tunnel_msg not in scopeSimilar issues in ws_to_tunnel_task.rs:120: async fn handle_message_mk2(
ctx: &StandaloneCtx,
conn: &Arc<Conn>,
init_tx: &watch::Sender<()>,
event_demuxer: &mut ActorEventDemuxer,
msg: (), // ERROR: Should be Vec<u8> or similar
) -> Result<()> {
let msg = match versioned::ToServerMk2::deserialize(&data, conn.protocol_version) {
// ERROR: data not in scopeImpact: The code will not compile. This appears to be an incomplete refactoring. Recommendation: These helper functions need proper parameter passing or should be refactored back into the main loop with proper closures. 2. Missing Signal Handler in conn.rsLocation: Issue: if protocol::is_new(protocol_version) {
ctx.signal(Init); // ERROR: Init is not defined
} else {The Recommendation: Define the 3. Incorrect Protocol Version Check Function NamesLocation: Multiple files use Issue: The protocol library at pub fn is_mk2(protocol_version: u16) -> bool {
protocol_version > PROTOCOL_MK1_VERSION
}But Recommendation: Use consistent naming - either all Major Issues4. ActorEventDemuxer - Potential Memory LeakLocation: Issue: When a new event arrives for an actor, a new channel is created (line 40), but the
Code: pub fn ingest(&mut self, actor_id: Id, event: protocol::Event) {
if let Some(channel) = self.channels.get(&actor_id) {
let _ = channel.tx.send(event); // last_seen not updated here!
} else {
// Create new channel...
}Recommendation: Update if let Some(channel) = self.channels.get_mut(&actor_id) {
let _ = channel.tx.send(event);
channel.last_seen = Instant::now();
}5. ActorEventDemuxer - Unsafe Task AbortLocation: Issue: There's a TODO comment acknowledging this is unsafe: if !keep {
// TODO: Verify aborting is safe here
channel.handle.abort();
}Aborting tasks can leave workflows in inconsistent states, especially if a signal is mid-flight when the abort happens. Recommendation: Drop the sender and wait for graceful shutdown instead of aborting, similar to what's done in the 6. Missing Init Signal Send in conn.rsLocation: Issue: For mk2 protocol, the code attempts to send an
Current code: if protocol::is_new(protocol_version) {
ctx.signal(Init); // No .to_workflow_id().send().await?
}Recommendation: Complete the signal send chain: if protocol::is_mk2(protocol_version) {
ctx.signal(pegboard::workflows::runner2::Init {})
.to_workflow_id(workflow_id)
.send()
.await?;
}7. Race Condition in tunnel_to_ws_task.rsLocation: Issue: The mk2 protocol path waits for an init signal before processing messages, but there's a potential race:
Recommendation: Consider using a different synchronization primitive (like Minor Issues & Suggestions8. Inconsistent Error Handling in ActorEventDemuxerLocation: Issue: Failed sends are silently ignored: let _ = channel.tx.send(event);While this might be intentional (actor workflow stopped), it would be better to log a warning since events are being dropped. Recommendation: if channel.tx.send(event).is_err() {
tracing::warn!(?actor_id, "failed to send event to actor channel, likely stopped");
}9. Hardcoded ConstantsLocation: Issue: const GC_INTERVAL: Duration = Duration::from_secs(30);
const MAX_LAST_SEEN: Duration = Duration::from_secs(30);These values should potentially be configurable via the config system used elsewhere in the codebase. 10. Missing Protocol Version Field in InsertDbInputLocation: Issue: The Code: ctx.activity(InsertDbInput {
runner_id: input.runner_id,
namespace_id: input.namespace_id,
// ... other fields
// Missing: protocol_version
})Recommendation: Add 11. Logging Style IssuesSeveral logging statements don't follow the codebase style guide (CLAUDE.md):
12. Duplicate Code in tunnel_to_ws_task.rsLocation: Lines 79-163 (mk2) and 166+ (mk1) Issue: The gateway pong handling logic is duplicated between mk1 and mk2 handlers. This violates DRY principles and makes maintenance harder. Recommendation: Extract common gateway pong logic into a separate helper function. Positive Aspects
Test CoverageThe PR doesn't include any tests for:
Recommendation: Add unit tests for ActorEventDemuxer and integration tests for protocol version handling. SummaryThis PR has the right architectural direction but requires significant fixes before it can be merged: Blockers:
Should Fix: Nice to Have: Please address the critical issues before this can be reviewed for merge. Happy to re-review once these are fixed! |
Code ReviewThis PR simplifies the runner workflow by introducing protocol v4 (mk2) and a new event demuxer pattern. Overall the refactoring looks solid, but there are several critical bugs that need to be fixed before merging. Critical Issues1. Compilation Errors in tunnel_to_ws_task.rs and ws_to_tunnel_task.rs Multiple functions are missing required parameters. The recv_msg() and handle_message functions reference variables not in scope (tunnel_sub, eviction_sub, ctx, conn, event_demuxer, init_tx, etc). 2. Missing Signal Definition (conn.rs:178) The code references Init signal but it is not defined. Should use pegboard::workflows::runner2::Init. 3. Missing last_seen Update in ActorEventDemuxer (actor_event_demuxer.rs:36-67) The last_seen timestamp is never updated after initial insertion. All channels will be garbage collected after 30 seconds even if actively receiving events. Use get_mut() and update last_seen on each ingest. High Priority Issues4. Protocol Version Check Inconsistency Uses both is_mk2() and is_new() but is_new() is not defined. Use is_mk2() consistently. 5. Missing Protocol Version Field InsertDbInput has protocol_version field but it is never populated when called (lines 74-82). 6. Unreachable continue Statement In handle_message_mk1() there is a continue inside a function (not a loop). Should be return Ok(()). Medium Priority7. Unsafe Task Abort - Using abort() can leave inconsistent state Low Priority10. Nested Result Types - Hard to read, consider custom enum RecommendationFix critical issues 1-6 first, then address medium-priority issues before merging. |
6f07705 to
9da2f02
Compare
8a61739 to
a5ec483
Compare
Code Review: PR #3483 - fix(pb): simplify runner wfI've reviewed this PR which simplifies the runner workflow and introduces protocol version 4. Here are my findings: Critical Issues 🔴1. Compilation Errors in Both refactored task files have multiple critical compilation errors:
These errors suggest the refactoring was incomplete. The functions need proper parameter passing. 2. ActorEventDemuxer: Potential Memory Leak In if let Some(channel) = self.channels.get(&actor_id) {
let _ = channel.tx.send(event);The
Fix: Update if let Some(channel) = self.channels.get_mut(&actor_id) {
channel.last_seen = Instant::now();
let _ = channel.tx.send(event);3. Silently Dropped Events In Recommendation: Log failed sends at minimum: if channel.tx.send(event).is_err() {
tracing::warn!(?actor_id, "failed to send event to actor channel");
}4. Protocol Version Helper Function Inconsistency In pub fn is_mk2(protocol_version: u16) -> bool {
protocol_version > PROTOCOL_MK1_VERSION
}But in High Priority Issues 🟠5. Missing Signal Type in conn.rs
6. Unsafe Task Abort with TODO
// TODO: Verify aborting is safe here
channel.handle.abort();Aborting tasks can leave actors in inconsistent states. This needs verification before merge, or implement graceful shutdown instead. 7. Incorrect Logging Convention
Medium Priority Issues 🟡8. Missing Error Context in Protocol Detection In 9. Unbounded Channel Risk
10. Inconsistent Import Patterns
use anyhow::Context;Following CLAUDE.md: "Always add imports at the top of the file inside of inline within the function." Though the current import is at the top, ensure new code doesn't add inline imports. 11. Potential Integer Overflow
Positive Aspects ✅
Security Considerations 🔒No major security issues identified. The actor validation in KV requests (checking Testing Recommendations 🧪
SummaryThis PR cannot be merged in its current state due to critical compilation errors. The refactoring needs to be completed to properly pass parameters to the extracted functions. The ActorEventDemuxer also needs the Blocking issues before merge:
Would be happy to review again once these are addressed! |
Pull Request ReviewI've reviewed this PR and found several critical issues that prevent compilation: Critical Compilation Errorstunnel_to_ws_task.rs:
ws_to_tunnel_task.rs:
actor_event_demuxer.rs:
Other Issues
Recommendations
This needs significant work before merging. The errors suggest it wasn't tested after refactoring. |
9da2f02 to
5d9024f
Compare
a5ec483 to
9893845
Compare
9893845 to
7dad582
Compare
5d9024f to
246cd19
Compare
Code Review - PR #3483: Simplify Runner WorkflowSummaryThis PR refactors the pegboard runner workflow system, introducing a new protocol version (v4/mk2). The changes include a new ActorEventDemuxer for batching actor events and significant restructuring of WebSocket message handling. Critical Issues (Must Fix Before Merge)1. Compilation Errors in tunnel_to_ws_task.rsLocation: engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs:22-252
2. Compilation Errors in ws_to_tunnel_task.rsLocation: engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs:79-489
3. Missing Protocol Version FieldLocation: engine/packages/pegboard/src/workflows/runner2.rs:26,295 The Input struct declares Fix: Add 4. last_seen Field Never UpdatedLocation: engine/packages/pegboard-runner/src/actor_event_demuxer.rs:36-67 The Fix: Use High Priority Issues5. Event Batching Without BackpressureLocation: engine/packages/pegboard-runner/src/actor_event_demuxer.rs:40 Unbounded channels could lead to memory exhaustion. Use bounded channels. 6. Silent Channel Send FailureLocation: engine/packages/pegboard-runner/src/actor_event_demuxer.rs:38 Failed sends are silently ignored. Add logging for debugging. 7. Error Message Information DisclosureLocation: ws_to_tunnel_task.rs (multiple locations) Raw error messages are returned to clients via Positive Observations
Test CoverageMissing tests for:
Action ItemsMust Fix (Blocking):
Should Fix: Nice to Have: SummaryThis is a solid architectural improvement, but has critical compilation errors that must be fixed:
Recommendation: Address compilation errors and critical issues before merging. |

No description provided.