Implemented ML Pipeline Continuous new table rows RunInference #37647
aIbrahiim wants to merge 7 commits into apache:master
Conversation
Summary of Changes: Hello @aIbrahiim, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request enhances Apache Beam's ML capabilities by introducing a robust example pipeline for performing continuous machine learning inference on structured table data.
Activity
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment …
Assigning reviewers: R: @tvalentyn for label python. Note: If you would like to opt out of this review, comment …
The PR bot will only process comments in the main thread (not review comments).
Codecov Report ❌ Patch coverage is …

Additional details and impacted files:

```
@@            Coverage Diff            @@
##             master   #37647      +/-   ##
============================================
- Coverage     57.13%   57.01%    -0.13%
  Complexity     3515     3515
============================================
  Files          1228     1225        -3
  Lines        189092   188725      -367
  Branches       3656     3656
============================================
- Hits         108039   107596      -443
- Misses        77637    77713       +76
  Partials       3416     3416
```

☔ View full report in Codecov by Sentry.
3af0279 to 331aa64 (compare)
331aa64 to 077e777 (compare)
damccorm left a comment:

Thanks - just had some minor feedback
```python
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
```
This requirements file should be in the same folder as the test code that uses it
```python
        e.exception))
    msg = str(e.exception)
    self.assertIn('singleton', msg, msg='Expected singleton view error')
    self.assertIn('more than one', msg, msg='Expected multiple-elements error')
```
Why are we making changes to this test class in this PR?
/gemini review
Code Review
This pull request introduces a new example pipeline for table row inference with scikit-learn models, which is a valuable addition. The implementation covers both batch and streaming modes, and includes comprehensive tests, benchmarks, and documentation.
My review has identified a few areas for improvement:
- Robustness: There are places where the code could be more robust against missing data or arguments.
- Determinism: The use of Python's built-in `hash()` for generating fallback keys can lead to non-deterministic behavior in a distributed environment.
- Code Structure: The introduction of `table_row_inference_batch.py` alongside `table_row_inference.py` (which also supports batch mode) is a bit confusing. While the former is described as 'simplified', it also has features the latter lacks (like file output). It would be beneficial to either consolidate these into a single, more capable script or clarify their distinct purposes in the documentation.
- Exception Handling: Some utility functions catch overly broad exceptions.
I've left specific comments with suggestions to address these points. Overall, this is a great contribution that expands the ML examples in Beam.
```python
parser.add_argument(
    '--feature_columns', help='Comma-separated list of feature column names')
```
The --feature_columns argument is essential for the pipeline to function correctly, but it is not marked as required. If a user runs the script without providing it, the pipeline will fail with an AttributeError at line 264. Please mark this argument as required.
```diff
 parser.add_argument(
-    '--feature_columns', help='Comma-separated list of feature column names')
+    '--feature_columns',
+    required=True,
+    help='Comma-separated list of feature column names')
```
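As a quick, self-contained illustration of the suggested fix (the flag name matches the PR, but the parser setup and sample values here are hypothetical), `required=True` turns a late `AttributeError` inside the pipeline into an immediate, clearly worded argparse error:

```python
import argparse

# Minimal parser mirroring the suggested change: the flag is now mandatory.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--feature_columns',
    required=True,
    help='Comma-separated list of feature column names')

# With the flag supplied, the value parses and splits as expected.
args = parser.parse_args(['--feature_columns', 'age,income,score'])
feature_columns = args.feature_columns.split(',')
print(feature_columns)  # ['age', 'income', 'score']

# Without it, argparse rejects the invocation at parse time (SystemExit),
# instead of failing later with an AttributeError on args.feature_columns.
try:
    parser.parse_args([])
except SystemExit:
    print('missing --feature_columns rejected at parse time')
```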
```python
features_array = []
for row in batch:
    row_dict = row._asdict()
    features = [row_dict[col] for col in self.feature_columns]
```
This line will raise a KeyError if a feature column from self.feature_columns is not present in the row_dict. For better robustness, consider using row_dict.get(col, 0.0) to handle missing features gracefully, which also provides a default value. The table_row_inference_batch.py example already uses this pattern, and it would be good to be consistent.
```diff
-    features = [row_dict[col] for col in self.feature_columns]
+    features = [row_dict.get(col, 0.0) for col in self.feature_columns]
```
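A minimal sketch of the difference (the `Row` schema and column names here are made up for illustration): direct indexing on the `_asdict()` result raises `KeyError` for any column absent from the row, while `.get()` substitutes a default and keeps the batch flowing.

```python
from typing import NamedTuple

# Hypothetical row schema; real rows come from the user's table data.
class Row(NamedTuple):
    age: float
    income: float

row = Row(age=34.0, income=52000.0)
feature_columns = ['age', 'income', 'score']  # 'score' is not on the row

row_dict = row._asdict()

# Direct indexing fails on the missing column...
try:
    _ = [row_dict[col] for col in feature_columns]
except KeyError as e:
    print('missing column:', e)

# ...while .get() falls back to a default value instead.
features = [row_dict.get(col, 0.0) for col in feature_columns]
print(features)  # [34.0, 52000.0, 0.0]
```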
| """ | ||
| data = json.loads(message.decode('utf-8')) | ||
|
|
||
| row_key = data.get('id', str(hash(message))) |
Using hash() for generating a fallback key is not recommended as its output is not stable across different Python processes or versions. This can lead to non-deterministic behavior, especially in a distributed environment. For a deterministic key, please use a standard hashing algorithm like SHA256 from the hashlib module. You will need to add import hashlib at the top of the file.
```diff
-    row_key = data.get('id', str(hash(message)))
+    row_key = data.get('id', hashlib.sha256(message).hexdigest())
```
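To see concretely why the review flags `hash()`: with hash randomization enabled (the default `PYTHONHASHSEED` behavior), `hash()` of a `bytes` or `str` value differs between interpreter runs, whereas SHA-256 of the same bytes is identical in every process and Python version. A small sketch, with a made-up payload:

```python
import hashlib
import json

message = b'{"value": 42}'  # example payload that carries no "id" field
data = json.loads(message.decode('utf-8'))

# Fallback key via hashlib: deterministic across processes, unlike hash().
row_key = data.get('id', hashlib.sha256(message).hexdigest())
print(len(row_key))  # 64 hex characters

# Re-hashing the same bytes always yields the same key.
assert hashlib.sha256(message).hexdigest() == row_key
```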
| """Batch inference pipeline for table rows using RunInference. | ||
|
|
||
| This is a simplified batch-only implementation of ML Pipelines #18. | ||
| It reads table data from files, runs ML inference, and writes results. | ||
|
|
||
| Key Features: | ||
| - BATCH PROCESSING ONLY (no streaming complexity) | ||
| - Reads from files (JSONL, CSV, or custom) | ||
| - Preserves table schema | ||
| - Writes to BigQuery or files | ||
| - Simple and easy to understand |
This batch-only pipeline seems largely redundant with table_row_inference.py, which already supports a batch mode. This can be confusing for users, especially since this 'simplified' version includes features like file output that the main script lacks. Consider consolidating the two scripts by adding file output support to table_row_inference.py and removing this file to avoid duplication and improve clarity.
| """ | ||
| data = json.loads(line) | ||
|
|
||
| row_id = data.get('id', str(hash(line))) |
Using hash() for generating a fallback key is not recommended as its output is not stable across different Python processes or versions. This can lead to non-deterministic behavior. For a deterministic key, please use a standard hashing algorithm like SHA256 from the hashlib module. You will need to add import hashlib at the top of the file.
```diff
-    row_id = data.get('id', str(hash(line)))
+    row_id = data.get('id', hashlib.sha256(line.encode('utf-8')).hexdigest())
```
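The same fix in the line-based reader, sketched with made-up JSONL input (note that a `str` line must be encoded to bytes before hashing, unlike the raw `bytes` message in the streaming path):

```python
import hashlib
import json

# Hypothetical JSONL input: some lines carry an explicit "id", others don't.
lines = [
    '{"id": "row-1", "age": 34}',
    '{"age": 21}',
]

keyed_rows = []
for line in lines:
    data = json.loads(line)
    # Deterministic fallback key; str.encode() is required before hashing.
    row_id = data.get('id', hashlib.sha256(line.encode('utf-8')).hexdigest())
    keyed_rows.append((row_id, data))

print(keyed_rows[0][0])       # row-1
print(len(keyed_rows[1][0]))  # 64 -- a full sha256 hex digest
```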
```python
try:
    publisher.get_topic(request={'topic': topic_path})
    logging.info('Topic %s already exists', topic_name)
except Exception:
```
Catching a broad Exception can hide bugs or swallow exceptions that should be handled differently. It's better to catch a more specific exception. In this case, publisher.get_topic raises google.api_core.exceptions.NotFound when the topic doesn't exist, which is available as pubsub_v1.exceptions.NotFound. Please catch that specific exception. This feedback also applies to the except Exception: blocks on lines 194, 222, and 229.
```diff
-  except Exception:
+  except pubsub_v1.exceptions.NotFound:
```
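The narrow-catch pattern can be sketched without a live Pub/Sub client. `FakePublisher` and this local `NotFound` class are stand-ins invented for illustration (in real code the exception comes from `google.api_core.exceptions`); the point is that only the "topic missing" case reaches the `except`, while an unrelated bug such as an `AttributeError` propagates instead of being silently swallowed.

```python
# Stand-in for google.api_core.exceptions.NotFound, for illustration only.
class NotFound(Exception):
    pass

class FakePublisher:
    """Minimal stub mimicking get_topic/create_topic semantics."""
    def __init__(self):
        self.topics = set()

    def get_topic(self, request):
        if request['topic'] not in self.topics:
            raise NotFound(request['topic'])

    def create_topic(self, request):
        self.topics.add(request['name'])

def ensure_topic(publisher, topic_path):
    try:
        publisher.get_topic(request={'topic': topic_path})
        print('topic already exists')
    except NotFound:
        # Only the "topic missing" case is handled here; any other
        # exception type propagates to the caller.
        publisher.create_topic(request={'name': topic_path})
        print('topic created')

pub = FakePublisher()
ensure_topic(pub, 'projects/p/topics/t')  # prints "topic created"
ensure_topic(pub, 'projects/p/topics/t')  # prints "topic already exists"
```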
Please add a meaningful description for your change here
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- Update `CHANGES.md` with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.