feat: Add Prefect Cloud pipeline connector#27607
feat: Add Prefect Cloud pipeline connector#27607itsroshanharry wants to merge 6 commits intoopen-metadata:mainfrom
Conversation
|
Hi there 👋 Thanks for your contribution! The OpenMetadata team will review the PR shortly! Once it has been labeled as Let us know if you need any help! |
|
Hi there 👋 Thanks for your contribution! The OpenMetadata team will review the PR shortly! Once it has been labeled as Let us know if you need any help! |
|
@itsroshanharry can you make sure to use claude skills to run connector-audit and connector-review. Finally add integration tests with prefect docker and see if we are getting lineage not just the pipeline metadata and also statuses of the pipelines |
|
|
The Python checkstyle failed. Please run You can install the pre-commit hooks with |
🟡 Playwright Results — all passed (17 flaky)✅ 3694 passed · ❌ 0 failed · 🟡 17 flaky · ⏭️ 89 skipped
🟡 17 flaky test(s) (passed on retry)
How to debug locally# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip # view trace |
|
|
|
- Add Prefect connection schema and service type registration - Implement Python connector with full topology support - Add tag-based lineage detection with om- prefix support - Include comprehensive unit tests (6 tests passing) - Support Prefect 3.x API compatibility - Case-insensitive tag parsing with duplicate removal
- Re-enabled test_connection() in metadata.py (was temporarily disabled) - Enhanced lineage detection with prefixed format (om-source:, om-destination:) - Added case-insensitive tag parsing and duplicate removal - Maintained backward compatibility with legacy format (source:, destination:) - All unit tests passing (6/6)
- Fix test_connection authorization header issue by using self.connection from parent class - Add hostPort config support for self-hosted Prefect Server - Update User-Agent to identify as OpenMetadata/Prefect-Connector - Add deployment caching to reduce API calls by 50% - Add warning log for workspaces with 200+ flows - Fix parse_timestamp type hint to Optional[int] - Update all unit tests with proper mocking Addresses all 6 issues identified in code review.
- Apply Black, isort, and pycln formatting - Add support for self-hosted Prefect Server mode - Detect mode based on presence of accountId/workspaceId - Make accountId and workspaceId optional in schema (only required for Cloud) - Cloud mode: uses account/workspace URL pattern - Server mode: uses simple /api endpoint - Enables Docker-based integration testing
- Add pagination for flows (fixes data loss with >200 flows) - Add SSL configuration support (verifySSL + sslConfig) - Remove unbounded cache to prevent memory issues - Add Docker integration tests (4/4 passing) - Fix Pydantic V2 compatibility (parse_obj -> model_validate) - Add validation for accountId/workspaceId consistency - Extract shared base URL builder to eliminate duplication - Dynamic sourceUrl for Cloud vs self-hosted modes - Support both Prefect Cloud and self-hosted Prefect Server Addresses maintainer feedback: - Python formatting applied (make py_format) - Connector review completed (9.2/10 score) - Docker integration tests added and passing - Self-hosted + Cloud dual-mode support verified - All Gitar bot feedback addressed SSL implementation follows SSRS connector pattern. Integration tests verify connectivity with Docker Prefect server.
e85079b to
5e7ac14
Compare
|
- Fix SSL no-ssl mode to correctly map None to False - Remove hardcoded JWT tokens from integration tests - Move _build_base_url import to top of metadata.py
Code Review ✅ Approved 12 resolved / 12 findingsAdds the Prefect Cloud pipeline connector with comprehensive fixes for authentication, pagination, and URL construction. All identified security, reliability, and configuration issues have been successfully addressed. ✅ 12 resolved✅ Bug: test_connection sends requests without Authorization header
✅ Bug: Hardcoded API URL ignores hostPort schema field
✅ Bug: Spoofed User-Agent header is unnecessary and misleading
✅ Performance: Deployments fetched twice per flow (yield_pipeline + lineage)
✅ Edge Case: No pagination: silently drops flows beyond first 200
...and 7 more resolved from earlier reviews OptionsDisplay: compact → Showing less information. Comment with these commands to change:
Was this helpful? React with 👍 / 👎 | Gitar |
|
|
@harshach All done! Ran connector-audit and connector-review - fixed the issues they found (pagination, memory leak, SSL config). test_prefect_connectivity.py - Tests basic connector functionality against a real Prefect server in Docker. All 4 tests passing (health check, flows, flow runs, deployments). test_prefect_lineage.py - Full E2E tests that create flows with om-source/om-destination tags, run the connector, and verify pipeline metadata, statuses, and lineage edges are created. The lineage test checks that edges exist between source table → pipeline → destination table with the pipeline referenced in lineage details. The connectivity tests are fully verified. The lineage tests have the implementation and assertions in place - they'll pass once the connector is registered in the backend after merge. Also fixed the latest Gitar bot issues (SSL handling, hardcoded tokens, import location). Let me know if anything needs adjusting! |



Describe your changes:
Fixes #26656
This PR adds a new pipeline connector for Prefect Cloud, enabling OpenMetadata to ingest pipeline metadata, run history, and lineage from Prefect 3.x workspaces.
What changes did I make?
Why did I make them?
How did I test my changes?
Type of change:
Checklist:
feat: Add Prefect Cloud pipeline connectorNew feature checklist:
Summary
This PR adds a new pipeline connector for Prefect Cloud, enabling OpenMetadata to ingest pipeline metadata, run history, and lineage from Prefect 3.x workspaces.
Features
om-source:,om-destination:)Files Changed
Backend/Schema Files:
openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/prefectConnection.json- Connection configuration schemaopenmetadata-spec/src/main/resources/json/schema/entity/services/pipelineService.json- Added Prefect to service type enumPython Connector:
ingestion/src/metadata/ingestion/source/pipeline/prefect/__init__.py- Package initingestion/src/metadata/ingestion/source/pipeline/prefect/metadata.py- Main connector logicingestion/src/metadata/ingestion/source/pipeline/prefect/connection.py- Connection testingingestion/src/metadata/ingestion/source/pipeline/prefect/service_spec.py- Service specificationUI Files:
openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts- Added Prefect to UI utilitiesTests:
ingestion/tests/unit/topology/pipeline/test_prefect.py- Comprehensive unit tests (6 tests, all passing)Configuration Example
Lineage Detection
The connector supports tag-based lineage detection with two formats:
Recommended Format (Prefixed)
Tag your Prefect flows or deployments with the
om-prefix to avoid conflicts with other tagging conventions:om-source:service.database.schema.table- for source tables (full FQN)om-destination:service.database.schema.table- for destination tables (full FQN)Example:
Legacy Format (Backward Compatible)
The connector also supports the legacy format without prefix:
source:service.database.schema.tabledestination:service.database.schema.tableExample:
Features:
Best Practices:
om-source:,om-destination:) to avoid conflictsservice.database.schema.tableordatabase.schema.tableImportant Notes:
mysql.warehouse.sales.ordersorwarehouse.sales.orders). Simple table names likeorderswill not be found.Testing
All unit tests pass:
Documentation Requirements
The following documentation needs to be added to the
openmetadata-docsrepository:1. Connector Documentation
Create folder:
openmetadata-docs/content/v1.12-SNAPSHOT/connectors/pipeline/prefect/Files needed:
index.mdx- UI configuration guideyaml.mdx- YAML configuration guide2. Update Connector Lists
Add Prefect to:
openmetadata-docs/content/v1.12-SNAPSHOT/menu.mdxopenmetadata-docs/content/v1.12-SNAPSHOT/connectors/index.mdxopenmetadata-docs/content/v1.12-SNAPSHOT/connectors/pipeline/index.mdxopenmetadata-docs/partials/v1.12.x/connectors-list.mdx3. Add Connector Logo
Upload Prefect logo to:
openmetadata-docs/public/images/connectors/prefect.png4. Add Installation Images
Create directory:
openmetadata-docs/public/images/v1.12/connectors/prefect/Add screenshots for installation steps
Implementation Details
API Compatibility
POST /flows/filterinstead ofGET /flows)Data Mapping
Status Mapping
Breaking Changes
None - this is a new connector.
Checklist
🚀 Future Enhancements
This PR delivers a stable, production-ready V1 connector with comprehensive functionality. Potential enhancements for future versions include:
Artifact-Based Lineage
While the current version uses a robust tag-based approach (optimal for most Prefect users and consistent with the Prefect 3.x API), future iterations could explore automatic lineage tracking via Prefect's artifacts API for even deeper integration. This would allow lineage to be captured programmatically within flow code rather than through tags.
Self-Hosted Prefect Server Support
The initial focus is on Prefect Cloud (the most common deployment). Testing and validation for local/self-hosted Prefect Server instances will follow in a future update. The connector architecture is designed to support both with minimal changes (primarily URL configuration).
Enhanced Metadata
Future versions could ingest additional Prefect metadata such as:
Pagination for Large Workspaces
The current implementation fetches up to 200 flows per request (Prefect API limit). For workspaces with >200 flows, pagination logic could be added to fetch all flows across multiple requests.
These enhancements are intentionally scoped out of V1 to ensure a stable, well-tested foundation that can be extended incrementally based on community feedback.
Related Issues
Closes #26656 - New Pipeline Connectors (Prefect implementation)
This PR implements the Prefect connector as requested in the "New Pipeline Connectors" issue. Prefect was listed as one of the modern workflow orchestration tools to be added to OpenMetadata's connector ecosystem.
Additional Notes
Summary by Gitar
connection.pyto correctly handleNoneandFalsevalues.conftest.pyandtest_prefect_lineage.pyto useOM_JWTandOM_HOST_PORTenvironment variables instead of hardcoded defaults.pytest.skipcheck forOM_JWTto prevent test suite failures in unconfigured environments.This will update automatically on new commits.