Skip to content

feat(DynamicSchemaLoader): add default_type to SchemaTypeIdentifier#1050

Draft
Daryna Ishchenko (darynaishchenko) wants to merge 6 commits into
mainfrom
devin/1781175382-add-default-type-to-schema-type-identifier
Draft

feat(DynamicSchemaLoader): add default_type to SchemaTypeIdentifier#1050
Daryna Ishchenko (darynaishchenko) wants to merge 6 commits into
mainfrom
devin/1781175382-add-default-type-to-schema-type-identifier

Conversation

@darynaishchenko

@darynaishchenko Daryna Ishchenko (darynaishchenko) commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Summary

Adds an optional default_type field to SchemaTypeIdentifier so connectors using DynamicSchemaLoader can gracefully handle unknown source field types instead of crashing.

Problem: When a source returns a field type not present in types_mapping and not a valid Airbyte type, _get_airbyte_type() raises ValueError, crashing the connector. This is especially problematic for sources like ServiceNow that have 100+ field types plus custom/plugin types.

Solution: SchemaTypeIdentifier.default_type — when set, unknown types fall back to this Airbyte type instead of raising:

schema_type_identifier:
  key_pointer: [element]
  type_pointer: [internal_type]
  default_type: string          # ← new: unknown types resolve to string
  types_mapping:
    - target_type: integer
      current_type: integer
    # ...
# dynamic_schema_loader.py — _get_airbyte_type()
if field_type not in AIRBYTE_DATA_TYPES:
    if self.schema_type_identifier.default_type is not None:
        return deepcopy(AIRBYTE_DATA_TYPES[default_type])  # fallback
    raise ValueError(...)  # existing behavior preserved when default_type is None

Backwards compatible — default_type defaults to None, preserving the existing raise-on-unknown behavior.

Requested by Daryna Ishchenko (@darynaishchenko) in the context of airbytehq/airbyte-enterprise#433.


Devin session

Summary by CodeRabbit

  • New Features
    • Added an optional default-type fallback for schema resolution when source field types can’t be mapped.
    • Introduced stream-scoped abort handling in concurrent reads to stop processing remaining partitions after stream-wide failures.
  • Tests
    • Added unit tests validating default-type fallback behavior, including error cases for missing/invalid defaults and mapping precedence.
  • Bug Fixes
    • Improved concurrent read resilience by skipping work for aborted streams.

Add an optional default_type field to SchemaTypeIdentifier that provides
a fallback Airbyte type when a source field type is not found in
AIRBYTE_DATA_TYPES after type mapping. This prevents connectors with
dynamic schemas from crashing on unknown/custom field types.

When default_type is set and a field type cannot be resolved, the loader
uses the default instead of raising ValueError. When default_type is not
set, the existing behavior (raising ValueError) is preserved.

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
@devin-ai-integration

Copy link
Copy Markdown
Contributor

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment, CI, and merge conflict monitoring

@github-actions

Copy link
Copy Markdown

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1781175382-add-default-type-to-schema-type-identifier#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1781175382-add-default-type-to-schema-type-identifier

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

Comment thread unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py Fixed
Comment thread unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py Fixed
devin-ai-integration Bot and others added 2 commits June 11, 2026 11:03
Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
@github-actions

github-actions Bot commented Jun 11, 2026

Copy link
Copy Markdown

PyTest Results (Fast)

4 123 tests  +14   4 112 ✅ +15   8m 34s ⏱️ + 1m 15s
    1 suites ± 0      11 💤  -  1 
    1 files   ± 0       0 ❌ ± 0 

Results for commit 835bdc5. ± Comparison against base commit fd95ecf.

♻️ This comment has been updated with latest results.

@darynaishchenko Daryna Ishchenko (darynaishchenko) marked this pull request as ready for review June 11, 2026 11:14
@github-actions

github-actions Bot commented Jun 11, 2026

Copy link
Copy Markdown

PyTest Results (Full)

4 126 tests  +14   4 114 ✅ +15   11m 38s ⏱️ + 2m 13s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌  -  1 

Results for commit 835bdc5. ± Comparison against base commit fd95ecf.

♻️ This comment has been updated with latest results.

@coderabbitai

coderabbitai Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

📝 Walkthrough

Walkthrough

Adds optional default_type to SchemaTypeIdentifier for fallback Airbyte type resolution when source fields are unknown, and introduces StreamAbortRegistry to short-circuit concurrent partition processing when stream-wide failures occur, preventing redundant work on doomed requests.

Changes

Default type fallback configuration

Layer / File(s) Summary
Configuration schema and model wiring
airbyte_cdk/sources/declarative/declarative_component_schema.yaml, airbyte_cdk/sources/declarative/models/declarative_component_schema.py, airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Adds optional default_type string to the YAML schema and Python dataclass; factory wires model.default_type into SchemaTypeIdentifier construction.
Runtime fallback type resolution
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
Refactors _get_airbyte_type into an instance method with allow_default_fallback parameter; _get_type detects whether type mapping occurred and only enables fallback when unmapped; fallback validates the default against AIRBYTE_DATA_TYPES or raises specific ValueErrors.
Fallback and error-path tests
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
Adds parametrized tests for successful fallback when default_type is set, ValueError when missing, ValueError when invalid, and confirms invalid types_mapping targets remain errors even with a valid default_type.

Stream abort registry for concurrent processing

Layer / File(s) Summary
StreamAbortRegistry core
airbyte_cdk/sources/concurrent_source/stream_abort_registry.py
Introduces thread-safe registry managing a lock-guarded set of aborted stream names; provides abort(stream_name) and is_aborted(stream_name) methods for multi-threaded access.
ConcurrentReadProcessor failure handling
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
Accepts optional StreamAbortRegistry in constructor; on exception, new _is_stream_wide_failure() helper detects stream-scoped failures (config_error type), then aborts remaining partitions for that stream through the registry.
ConcurrentSource registry orchestration
airbyte_cdk/sources/concurrent_source/concurrent_source.py
Instantiates shared StreamAbortRegistry in read() and passes it to ConcurrentReadProcessor, PartitionEnqueuer, and PartitionReader for coordinated abort signaling.
Partition enqueuer and reader abort checks
airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py, airbyte_cdk/sources/streams/concurrent/partition_reader.py
Enqueuer accepts registry and breaks partition generation when stream is aborted; reader accepts registry and skips reading aborted partitions, enqueuing failure sentinel instead.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 47.83% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main change: adding a default_type field to SchemaTypeIdentifier within DynamicSchemaLoader.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch devin/1781175382-add-default-type-to-schema-type-identifier

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

2558-2561: ⚡ Quick win

Consider adding examples for the default_type field, wdyt?

The new field is well-documented, but adding examples of valid Airbyte type values would help users configure this correctly. Based on standard Airbyte types, you might consider adding something like:

      default_type:
        title: Default Type
        description: The default Airbyte type to use when no type mapping matches the source field type.
        type: string
        examples:
          - "string"
          - "number"
          - "integer"

This would make it clearer what values users should provide, especially since there's no enum constraint at the schema level (though I see from the review context that runtime validation happens in the Python code). Would this be helpful?

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml` around
lines 2558 - 2561, Add an examples array to the YAML schema for the default_type
field so users see valid Airbyte type values; update the default_type entry
(title: "Default Type", description: "The default Airbyte type to use when no
type mapping matches the source field type.") to include examples such as
"string", "number", and "integer" to guide configuration (keep the field as
type: string).
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)

312-430: ⚡ Quick win

Could we add one regression case to protect mapping-validation behavior, wdyt?

Would you add a test where types_mapping resolves to an invalid target type while default_type is set, and assert it still raises ValueError? That would prevent default_type from unintentionally masking mapping typos.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py` around
lines 312 - 430, Add a regression test that ensures a bad mapping target isn't
silently masked by default_type: create a SchemaTypeIdentifier with
types_mapping containing an entry that maps a source type (e.g., "bad_source")
to an invalid target (e.g., "not_a_real_airbyte_type"), set default_type to a
valid type (e.g., "string"), instantiate DynamicSchemaLoader and have its
retriever return a schema record using "bad_source", then assert
loader.get_json_schema() raises ValueError (matching the invalid target type
message) so mapping-validation still fails even when default_type is present.
Reference SchemaTypeIdentifier, DynamicSchemaLoader, and loader.get_json_schema
in the new test.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py`:
- Around line 273-284: The _get_airbyte_type function currently always falls
back to schema_type_identifier.default_type when field_type is unrecognized,
which masks typos in mapped target types; change _get_airbyte_type to accept a
boolean parameter allow_default_fallback (default False) and only apply the
default_type fallback when allow_default_fallback is True; keep the existing
validation that raises ValueError if default_type is invalid and always raise
ValueError for unknown field_type when allow_default_fallback is False; update
call sites so only the original unresolved source-type resolution path invokes
_get_airbyte_type(..., allow_default_fallback=True) and mapped
target/type-mapping resolution paths call it with the default False to fail fast
on misconfigured target types (refer to symbols: _get_airbyte_type,
schema_type_identifier.default_type, AIRBYTE_DATA_TYPES).

---

Nitpick comments:
In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml`:
- Around line 2558-2561: Add an examples array to the YAML schema for the
default_type field so users see valid Airbyte type values; update the
default_type entry (title: "Default Type", description: "The default Airbyte
type to use when no type mapping matches the source field type.") to include
examples such as "string", "number", and "integer" to guide configuration (keep
the field as type: string).

In `@unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py`:
- Around line 312-430: Add a regression test that ensures a bad mapping target
isn't silently masked by default_type: create a SchemaTypeIdentifier with
types_mapping containing an entry that maps a source type (e.g., "bad_source")
to an invalid target (e.g., "not_a_real_airbyte_type"), set default_type to a
valid type (e.g., "string"), instantiate DynamicSchemaLoader and have its
retriever return a schema record using "bad_source", then assert
loader.get_json_schema() raises ValueError (matching the invalid target type
message) so mapping-validation still fails even when default_type is present.
Reference SchemaTypeIdentifier, DynamicSchemaLoader, and loader.get_json_schema
in the new test.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 62775b39-e145-4245-a7c8-48f927beaa3d

📥 Commits

Reviewing files that changed from the base of the PR and between fd95ecf and 418b728.

📒 Files selected for processing (5)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py

Comment thread airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py Outdated
devin-ai-integration Bot and others added 2 commits June 11, 2026 11:22
Add allow_default_fallback parameter to _get_airbyte_type so that
default_type only applies when the source field type was not matched
by any types_mapping entry. This prevents default_type from silently
masking typos in types_mapping target_type values.

Also adds examples to the YAML schema and a regression test for
bad mapping targets not being masked by default_type.

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

2558-2565: ⚡ Quick win

Could we validate default_type at schema-validation time instead of runtime only, wdyt?

Right now default_type accepts any string and relies on DynamicSchemaLoader to error later. Would you consider constraining it to known Airbyte primitive types (or referencing a shared definition) so invalid manifests fail earlier?

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml` around
lines 2558 - 2565, The schema currently allows any string for the default_type
field, deferring validation to DynamicSchemaLoader at runtime; change the
declarative_component_schema.yaml definition of default_type to constrain it to
known Airbyte primitive types (e.g., enum:
["string","number","integer","boolean","object","array","null"] or reference a
shared primitive-type definition) so invalid manifests fail schema-validation
early, and update examples/description accordingly to reference the shared
definition if used; keep the symbol name default_type and ensure compatibility
with DynamicSchemaLoader's expected set of types.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml`:
- Around line 2558-2565: The schema currently allows any string for the
default_type field, deferring validation to DynamicSchemaLoader at runtime;
change the declarative_component_schema.yaml definition of default_type to
constrain it to known Airbyte primitive types (e.g., enum:
["string","number","integer","boolean","object","array","null"] or reference a
shared primitive-type definition) so invalid manifests fail schema-validation
early, and update examples/description accordingly to reference the shared
definition if used; keep the symbol name default_type and ensure compatibility
with DynamicSchemaLoader's expected set of types.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: ff0bb5b2-fd67-4f2f-b5d2-7e96ea00e8f0

📥 Commits

Reviewing files that changed from the base of the PR and between 418b728 and 7f3f744.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  • airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py

@darynaishchenko

Daryna Ishchenko (darynaishchenko) commented Jun 11, 2026

Copy link
Copy Markdown
Contributor Author

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/27344119788

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py (1)

52-72: ⚡ Quick win

Could the abort check happen before pulling the next partition?

Line 55 runs after stream.generate_partitions() has already yielded the next partition, so an aborted stream can still perform one extra generation step before breaking. Would you consider checking the registry before each next() call so the fail-fast path avoids that extra work, wdyt?

♻️ Possible loop shape
-            for partition in stream.generate_partitions():
-                # If the stream already failed fatally on another partition, stop generating new
-                # ones. Remaining partitions would only repeat the same doomed request.
-                if self._stream_abort_registry and self._stream_abort_registry.is_aborted(
-                    stream.name
-                ):
-                    break
+            partitions = iter(stream.generate_partitions())
+            while True:
+                # If the stream already failed fatally on another partition, stop before asking
+                # the generator for more work.
+                if self._stream_abort_registry and self._stream_abort_registry.is_aborted(
+                    stream.name
+                ):
+                    break
+
+                try:
+                    partition = next(partitions)
+                except StopIteration:
+                    break
+
                 # Adding partitions to the queue generates futures. To avoid having too many futures, we throttle here. We understand that
                 # we might add more futures than the limit by throttling in the threads while it is the main thread that actual adds the
                 # future but we expect the delta between the max futures length and the actual to be small enough that it would not be an
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py` around lines 52
- 72, The abort check in the loop happens after stream.generate_partitions() has
already yielded the next partition, allowing one unnecessary generation step
even after abort is detected. Restructure the loop to check
self._stream_abort_registry.is_aborted(stream.name) before calling next() on the
generator, so that when a stream is aborted, no additional partition generation
work is performed. This requires reorganizing the control flow to evaluate the
abort condition at the start of each iteration before advancing the generator.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py`:
- Around line 52-72: The abort check in the loop happens after
stream.generate_partitions() has already yielded the next partition, allowing
one unnecessary generation step even after abort is detected. Restructure the
loop to check self._stream_abort_registry.is_aborted(stream.name) before calling
next() on the generator, so that when a stream is aborted, no additional
partition generation work is performed. This requires reorganizing the control
flow to evaluate the abort condition at the start of each iteration before
advancing the generator.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: bd69553d-d08d-4032-b6f2-cc3d627436c4

📥 Commits

Reviewing files that changed from the base of the PR and between 7f3f744 and 835bdc5.

📒 Files selected for processing (6)
  • airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py
  • airbyte_cdk/sources/concurrent_source/stream_abort_registry.py
  • airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
  • airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py
  • airbyte_cdk/sources/streams/concurrent/partition_reader.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py

@darynaishchenko

Daryna Ishchenko (darynaishchenko) commented Jun 16, 2026

Copy link
Copy Markdown
Contributor Author

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/27608552017

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant