Search before asking
Motivation
When users configure custom resources (e.g., SINK_WRITER_CPU, SINK_WRITER_MEMORY) for sink operators via FlinkSink#configureSlotSharingGroup, the current implementation creates a dedicated SlotSharingGroup for the operator. This breaks operator chaining: even if the upstream source and the writer are connected by a forward edge, they cannot be chained together, because Flink requires chained operators to belong to the same slot sharing group. As a result, data that should be forwarded locally is forced through the network stack, introducing unnecessary serialization/deserialization overhead and the cost of extra threads.
Solution
Replace SlotSharingGroup with ResourceSpec for per-operator resource declaration. Instead of calling operator.slotSharingGroup(slotSharingGroup), call operator.getTransformation().setResources(resourceSpec, resourceSpec), passing the same spec as both the minimum and the preferred resources. ResourceSpec declares resource requirements at the operator level without altering the slot sharing group assignment, so the writer operators remain in the default slot sharing group and can still be chained with upstream operators. The Flink scheduler will aggregate the ResourceSpec of all operators within a chain to determine the total slot resource requirement.
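To make the reasoning concrete, here is a minimal plain-Java sketch of the two behaviours described above. It does not use Flink: ChainingSketch, Resources, Operator, chain and total are hypothetical names invented for illustration, and the model keeps only the two rules stated in this issue (operators chain only within the same slot sharing group, and per-operator resources are summed across a chain). Flink's real chaining conditions and scheduler logic are more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChainingSketch {

    /** Hypothetical stand-in for a per-operator resource declaration. */
    static final class Resources {
        final double cpuCores;
        final int heapMb;

        Resources(double cpuCores, int heapMb) {
            this.cpuCores = cpuCores;
            this.heapMb = heapMb;
        }

        Resources plus(Resources other) {
            return new Resources(cpuCores + other.cpuCores, heapMb + other.heapMb);
        }
    }

    /** Hypothetical operator: a name, a slot sharing group, and its resources. */
    static final class Operator {
        final String name;
        final String group;
        final Resources resources;

        Operator(String name, String group, Resources resources) {
            this.name = name;
            this.group = group;
            this.resources = resources;
        }
    }

    /** Splits forward-connected operators into chains; a group change breaks the chain. */
    static List<List<Operator>> chain(List<Operator> pipeline) {
        List<List<Operator>> chains = new ArrayList<>();
        for (Operator op : pipeline) {
            List<Operator> last = chains.isEmpty() ? null : chains.get(chains.size() - 1);
            if (last != null && last.get(last.size() - 1).group.equals(op.group)) {
                last.add(op);
            } else {
                List<Operator> fresh = new ArrayList<>();
                fresh.add(op);
                chains.add(fresh);
            }
        }
        return chains;
    }

    /** Sums the declared resources of every operator in one chain. */
    static Resources total(List<Operator> chain) {
        Resources sum = new Resources(0.0, 0);
        for (Operator op : chain) {
            sum = sum.plus(op.resources);
        }
        return sum;
    }

    public static void main(String[] args) {
        Resources source = new Resources(1.0, 512);
        Resources writer = new Resources(2.0, 1024);

        // Current behaviour: the writer sits in its own slot sharing group.
        List<Operator> dedicated = Arrays.asList(
                new Operator("source", "default", source),
                new Operator("writer", "sink-writer", writer));
        // Proposed behaviour: the writer stays in the default group.
        List<Operator> shared = Arrays.asList(
                new Operator("source", "default", source),
                new Operator("writer", "default", writer));

        System.out.println(chain(dedicated).size()); // 2
        System.out.println(chain(shared).size());    // 1
        Resources sum = total(chain(shared).get(0));
        System.out.println(sum.cpuCores + " cores, " + sum.heapMb + " MB"); // 3.0 cores, 1536 MB
    }
}
```

In this toy model, a dedicated group splits the pipeline into two chains, while keeping the writer in the default group yields one chain whose declared requirement is the sum of both operators' resources.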
Anything else?
No response
Are you willing to submit a PR?