[Drain] Reduce Fn Runner changes#37727

Open
stankiewicz wants to merge 1 commit into apache:master from stankiewicz:reduce

@stankiewicz
Contributor

Propagation of Drain information from Reduce Runner to WindowedValue


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist [1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request propagates the 'caused by drain' status through key runner components and data structures, so that elements and timers that are part of a draining operation can be identified and handled as such. That matters for graceful shutdown and resource management in streaming pipelines. The modifications span the core ReduceFn logic, the WindowedValue representation, and testing utilities.
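To make the idea concrete, here is a minimal, self-contained sketch of a runner-like class that buffers elements and, when a timer fires, tags the emitted pane with the status the timer carried. It is not the ReduceFnRunner code from this pull request: `DrainAwareSumSketch`, `Pane`, and the `CAUSED_BY_DRAIN` constant name are assumptions made for illustration (only `CausedByDrain.NORMAL` is named on this page).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a reduce runner; not the Beam implementation.
final class DrainAwareSumSketch {
  // NORMAL is named in the summary; CAUSED_BY_DRAIN is an assumed constant name.
  enum CausedByDrain { NORMAL, CAUSED_BY_DRAIN }

  // Hypothetical output pane: the aggregated value plus the status of the timer that fired it.
  static final class Pane {
    final int sum;
    final CausedByDrain causedByDrain;

    Pane(int sum, CausedByDrain causedByDrain) {
      this.sum = sum;
      this.causedByDrain = causedByDrain;
    }
  }

  private int buffered = 0;
  private final List<Pane> emitted = new ArrayList<>();

  // Buffers an element; nothing is emitted until a timer fires.
  void processElement(int value) {
    buffered += value;
  }

  // Emits the buffered sum, tagged with the status carried by the timer, then resets the buffer.
  void onTimer(CausedByDrain timerStatus) {
    emitted.add(new Pane(buffered, timerStatus));
    buffered = 0;
  }

  List<Pane> emitted() {
    return emitted;
  }
}
```

A downstream consumer of `Pane` could then tell a pane fired during a drain apart from a regular firing without any side channel.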

Highlights

  • Drain Information Propagation: The CausedByDrain enum is now propagated through various core components, including ReduceFn, ReduceFnContextFactory, and ReduceFnRunner, ensuring that the draining status of elements and timers is maintained and accessible throughout the processing pipeline.
  • WindowedValue Enhancements: The WindowedValue and ValueInSingleWindow classes have been updated to include the CausedByDrain status, allowing this critical information to be carried along with the data elements.
  • Runner Context Updates: The ProcessValueContext and OnTriggerContext within ReduceFn now expose the CausedByDrain status, enabling ReduceFn implementations to react to draining signals.
  • Timer Data Integration: Windmill timer data encoding (WindmillTagEncodingV1 and V2) has been modified to correctly capture and propagate the draining status into TimerData objects.
  • Testing Utility Updates: The WindowMatchers utility has been extended to allow matching WindowedValue objects based on their CausedByDrain status, and DoFnTester now explicitly sets CausedByDrain.NORMAL for output values.
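The highlights above describe a value type that carries the drain status and outputs that inherit it from their inputs. A rough sketch of that pattern follows, assuming a hypothetical `DrainAwareValue` wrapper rather than Beam's actual `WindowedValue` or `ValueInSingleWindow`; the `CAUSED_BY_DRAIN` constant name is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins only; none of these types are the Beam classes.
final class DrainPropagationSketch {
  // NORMAL is named in the highlights; CAUSED_BY_DRAIN is an assumed constant name.
  enum CausedByDrain { NORMAL, CAUSED_BY_DRAIN }

  // Simplified value wrapper: a payload plus the drain status that travels with it.
  static final class DrainAwareValue<T> {
    final T value;
    final CausedByDrain causedByDrain;

    DrainAwareValue(T value, CausedByDrain causedByDrain) {
      this.value = value;
      this.causedByDrain = causedByDrain;
    }

    // Factory that defaults to NORMAL when no status is supplied.
    static <T> DrainAwareValue<T> of(T value) {
      return new DrainAwareValue<>(value, CausedByDrain.NORMAL);
    }

    // Derives an output from this input and copies the input's status onto it.
    <R> DrainAwareValue<R> withValue(Function<T, R> fn) {
      return new DrainAwareValue<>(fn.apply(value), causedByDrain);
    }
  }

  // Applies fn to every input; each output inherits the status of the input it came from.
  static <T, R> List<DrainAwareValue<R>> processAll(
      List<DrainAwareValue<T>> inputs, Function<T, R> fn) {
    List<DrainAwareValue<R>> outputs = new ArrayList<>();
    for (DrainAwareValue<T> input : inputs) {
      outputs.add(input.withValue(fn));
    }
    return outputs;
  }
}
```

Copying the status at the point where the output is constructed keeps the propagation in one place, so a transform cannot silently drop it.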

Changelog
  • runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
    • Modified outputWithTimestamp and outputWindowedValue to pass the causedByDrain status from the element to the newly created WindowedValues.
  • runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java
    • Imported CausedByDrain.
    • Added an abstract causedByDrain() method to ProcessValueContext and OnTriggerContext.
  • runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
    • Updated forValue and forTrigger methods to accept a CausedByDrain parameter.
    • Modified ProcessValueContextImpl and OnTriggerContextImpl constructors to store the CausedByDrain status and added corresponding getter methods.
  • runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
    • Imported CausedByDrain.
    • Modified emit method signature and calls to include CausedByDrain.
    • Updated processElement to pass the causedByDrain status from the input WindowedValue to contextFactory.forValue.
    • Modified WindowActivation constructor to accept CausedByDrain and added a field to store it.
    • Updated onTimers to log the draining status and pass timer.causedByDrain() to WindowActivation and onTrigger calls.
  • runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
    • Added a TODO comment to consider setting draining status on timers.
  • runners/core-java/src/main/java/org/apache/beam/runners/core/WindowMatchers.java
    • Updated isWindowedValue and isSingleWindowedValue factory methods to accept an optional drainMatcher.
    • Modified the WindowedValueMatcher constructor and matchesSafely method to include matching against the drainMatcher.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
    • Imported CausedByDrain.
    • Modified windmillTimerToTimerData to set the CausedByDrain status based on the draining flag.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
    • Imported CausedByDrain.
    • Modified windmillTimerToTimerData to set the CausedByDrain status based on the draining flag.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
    • Updated Hamcrest imports to use wildcard.
    • Imported WindowedValues.
    • Added a createDrainingValue helper method to simulate draining input.
    • Added testFixedWindowsWithDraining to verify correct handling of draining elements and timers.
    • Updated assertThat calls in the new test to include CausedByDrain matchers.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
    • Removed direct import of CausedByDrain.
    • Updated causedByDrain() abstract methods in ProcessContext and OnTimerContext to use the fully qualified class name org.apache.beam.sdk.values.CausedByDrain.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
    • Modified output, outputWithTimestamp, and outputWindowedValue methods to explicitly pass CausedByDrain.NORMAL when creating ValueInSingleWindow instances.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
    • Added an abstract getCausedByDrain() method.
    • Modified the of(...) factory methods to accept and store the CausedByDrain status, defaulting to NORMAL if not provided.
    • Updated the decode method in ValueInSingleWindowCoder to parse the draining status from ElementMetadata.
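Two of the changelog entries lend themselves to a small illustration: mapping a boolean draining flag onto the enum when timer data is built, and a matcher whose drain check is optional. The sketch below uses `java.util.function.Predicate` in place of Hamcrest matchers, and every name in it (`DrainTimerSketch`, `SketchTimerData`, `toTimerData`, `isTimer`, the `CAUSED_BY_DRAIN` constant) is hypothetical; it does not reproduce the signatures in WindmillTagEncodingV1/V2 or WindowMatchers.

```java
import java.util.function.Predicate;

// All names below are hypothetical stand-ins, not the Beam or Windmill classes.
final class DrainTimerSketch {
  // NORMAL is named in the changelog; CAUSED_BY_DRAIN is an assumed constant name.
  enum CausedByDrain { NORMAL, CAUSED_BY_DRAIN }

  // Pared-down timer record; a real TimerData carries more fields.
  static final class SketchTimerData {
    final String timerId;
    final CausedByDrain causedByDrain;

    SketchTimerData(String timerId, CausedByDrain causedByDrain) {
      this.timerId = timerId;
      this.causedByDrain = causedByDrain;
    }
  }

  // Builds timer data, translating the boolean draining flag into the enum.
  static SketchTimerData toTimerData(String timerId, boolean draining) {
    CausedByDrain status = draining ? CausedByDrain.CAUSED_BY_DRAIN : CausedByDrain.NORMAL;
    return new SketchTimerData(timerId, status);
  }

  // Matches on the timer id only; any drain status is accepted.
  static Predicate<SketchTimerData> isTimer(String timerId) {
    return isTimer(timerId, status -> true);
  }

  // Matches on the timer id and additionally applies the supplied drain predicate.
  static Predicate<SketchTimerData> isTimer(
      String timerId, Predicate<CausedByDrain> drainMatcher) {
    return timer -> timer.timerId.equals(timerId) && drainMatcher.test(timer.causedByDrain);
  }
}
```

Keeping the drain predicate optional means existing assertions that do not care about draining keep passing, while new tests can opt in to checking it.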

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment "assign set of reviewers".
