diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py index b85e3e39bf1..38fb18211bf 100644 --- a/amber/src/main/python/core/architecture/packaging/output_manager.py +++ b/amber/src/main/python/core/architecture/packaging/output_manager.py @@ -133,47 +133,31 @@ def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri_base: st state materialization on the same port. `storage_uri_base` is the port's base URI; the result and state URIs are derived from it. """ - document, _ = DocumentFactory.open_document( - VFSURIFactory.result_uri(storage_uri_base) - ) - buffered_item_writer = document.writer(str(get_worker_index(self.worker_id))) - writer_queue = Queue() - port_storage_writer = PortStorageWriter( - buffered_item_writer=buffered_item_writer, queue=writer_queue - ) - writer_thread = threading.Thread( - target=port_storage_writer.run, - daemon=True, - name=f"port_storage_writer_thread_{port_id}", - ) - writer_thread.start() - self._port_storage_writers[port_id] = ( - writer_queue, - port_storage_writer, - writer_thread, - ) - state_document, _ = DocumentFactory.open_document( - VFSURIFactory.state_uri(storage_uri_base) - ) - state_buffered_item_writer = state_document.writer( - str(get_worker_index(self.worker_id)) - ) - state_writer_queue = Queue() - state_port_writer = PortStorageWriter( - buffered_item_writer=state_buffered_item_writer, - queue=state_writer_queue, - ) - state_writer_thread = threading.Thread( - target=state_port_writer.run, - daemon=True, - name=f"port_state_writer_thread_{port_id}", + def start_writer(uri: str, name_prefix: str, registry: dict) -> None: + document, _ = DocumentFactory.open_document(uri) + writer_queue = Queue() + writer = PortStorageWriter( + buffered_item_writer=document.writer( + str(get_worker_index(self.worker_id)) + ), + queue=writer_queue, + ) + thread = threading.Thread( + target=writer.run, daemon=True, name=f"{name_prefix}_{port_id}" + ) + thread.start() + registry[port_id] = (writer_queue, writer, thread) + + start_writer( + VFSURIFactory.result_uri(storage_uri_base), + "port_storage_writer_thread", + self._port_storage_writers, ) - state_writer_thread.start() - self._port_state_writers[port_id] = ( - state_writer_queue, - state_port_writer, - state_writer_thread, + start_writer( + VFSURIFactory.state_uri(storage_uri_base), + "port_state_writer_thread", + self._port_state_writers, ) def get_port(self, port_id=None) -> WorkerPort: @@ -203,14 +187,23 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None: PortStorageWriterElement(data_tuple=tuple_) ) - def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None: + def save_state_to_storage_if_needed( + self, + state: State, + loop_counter: int = 0, + loop_start_id: str = "", + loop_start_state_uri: str = "", + port_id=None, + ) -> None: # When port_id is omitted the same state row is fanned out to # every output port's state table. This mirrors the # broadcast-to-all-workers behavior on the emit side: state is # shared context, not per-key data, so every downstream operator # (and every worker reading the materialization) needs the full # set. - element = PortStorageWriterElement(data_tuple=state.to_tuple()) + element = PortStorageWriterElement( + data_tuple=state.to_tuple(loop_counter, loop_start_id, loop_start_state_uri) + ) if port_id is None: for writer_queue, _, _ in self._port_state_writers.values(): writer_queue.put(element) @@ -223,18 +216,16 @@ def close_port_storage_writers(self) -> None: writer threads to finish, which indicates the port storage writing are finished. """ - for _, writer, _ in self._port_storage_writers.values(): - # This non-blocking stop call will let the storage writers - # flush the remaining buffer - writer.stop() - for _, _, writer_thread in self._port_storage_writers.values(): - # This blocking call will wait for all the writer to finish commit - writer_thread.join() - for _, state_writer, _ in self._port_state_writers.values(): - state_writer.stop() - for _, _, state_writer_thread in self._port_state_writers.values(): - state_writer_thread.join() - self._port_state_writers.clear() + for registry in (self._port_storage_writers, self._port_state_writers): + # Non-blocking stop lets each writer flush its remaining buffer; + # the join then waits for the commit to finish. + for _, writer, _ in registry.values(): + writer.stop() + for _, _, thread in registry.values(): + thread.join() + # Drop the stopped writers so a later close doesn't act on + # stale entries. + registry.clear() def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning) -> None: """ @@ -290,7 +281,11 @@ def emit_ecm( ) def emit_state( - self, state: State + self, + state: State, + loop_counter: int = 0, + loop_start_id: str = "", + loop_start_state_uri: str = "", ) -> Iterable[typing.Tuple[ActorVirtualIdentity, DataPayload]]: return chain( *( @@ -298,7 +293,12 @@ def emit_state( ( receiver, ( - StateFrame(payload) + StateFrame( + payload, + loop_counter=loop_counter, + loop_start_id=loop_start_id, + loop_start_state_uri=loop_start_state_uri, + ) if isinstance(payload, State) else self.tuple_to_frame(payload) ), diff --git a/amber/src/main/python/core/models/payload.py b/amber/src/main/python/core/models/payload.py index 61a33294882..3f1ec4e7fe3 100644 --- a/amber/src/main/python/core/models/payload.py +++ b/amber/src/main/python/core/models/payload.py @@ -34,3 +34,12 @@ class DataFrame(DataPayload): @dataclass class StateFrame(DataPayload): frame: State + # Loop-control bookkeeping owned by the worker runtime, carried alongside + # the State payload (not inside it) so it never collides with user state. + # Defaults are the "no loop" values for all non-loop state. + loop_counter: int = 0 + # Which LoopStart to jump back to, and the iceberg URI its input is read + # from. Set by the runtime on a LoopStart's output, consumed by the + # matching LoopEnd. Empty for non-loop / not-yet-stamped state. + loop_start_id: str = "" + loop_start_state_uri: str = "" diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py index 003aaa212ac..4fce475e499 100644 --- a/amber/src/main/python/core/models/state.py +++ b/amber/src/main/python/core/models/state.py @@ -25,13 +25,41 @@ class State(dict): CONTENT = "content" - SCHEMA = Schema(raw_schema={CONTENT: "STRING"}) + # Loop-control bookkeeping owned by the worker runtime, NOT user state -- it + # never appears in the content JSON. In memory it rides on the StateFrame + # envelope; it is materialized/serialized as its own column (parallel to + # content) by to_tuple(...). from_tuple() returns the bare State; callers + # that need these values read the corresponding columns off the tuple. + LOOP_COUNTER = "loop_counter" + LOOP_START_ID = "loop_start_id" + LOOP_START_STATE_URI = "loop_start_state_uri" + SCHEMA = Schema( + raw_schema={ + CONTENT: "STRING", + LOOP_COUNTER: "LONG", + LOOP_START_ID: "STRING", + LOOP_START_STATE_URI: "STRING", + } + ) def to_json(self) -> str: return json.dumps(_to_json_value(self), separators=(",", ":")) - def to_tuple(self) -> Tuple: - return Tuple({State.CONTENT: self.to_json()}, schema=State.SCHEMA) + def to_tuple( + self, + loop_counter: int = 0, + loop_start_id: str = "", + loop_start_state_uri: str = "", + ) -> Tuple: + return Tuple( + { + State.CONTENT: self.to_json(), + State.LOOP_COUNTER: int(loop_counter), + State.LOOP_START_ID: loop_start_id, + State.LOOP_START_STATE_URI: loop_start_state_uri, + }, + schema=State.SCHEMA, + ) @classmethod def from_json(cls, payload: str) -> "State": diff --git a/amber/src/main/python/core/runnables/network_receiver.py b/amber/src/main/python/core/runnables/network_receiver.py index 8ba4fbe1472..2c672a07750 100644 --- a/amber/src/main/python/core/runnables/network_receiver.py +++ b/amber/src/main/python/core/runnables/network_receiver.py @@ -96,7 +96,12 @@ def data_handler(command: bytes, table: Table) -> int: "Data", lambda _: DataFrame(table), "State", - lambda _: StateFrame(State.from_json(table[State.CONTENT][0].as_py())), + lambda _: StateFrame( + State.from_json(table[State.CONTENT][0].as_py()), + loop_counter=int(table[State.LOOP_COUNTER][0].as_py()), + loop_start_id=table[State.LOOP_START_ID][0].as_py(), + loop_start_state_uri=table[State.LOOP_START_STATE_URI][0].as_py(), + ), "ECM", lambda _: EmbeddedControlMessage().parse(table["payload"][0].as_py()), ) diff --git a/amber/src/main/python/core/runnables/network_sender.py b/amber/src/main/python/core/runnables/network_sender.py index d8e3889ac11..68d89e0ebf1 100644 --- a/amber/src/main/python/core/runnables/network_sender.py +++ b/amber/src/main/python/core/runnables/network_sender.py @@ -20,7 +20,13 @@ from overrides import overrides from typing import Optional -from core.models import DataPayload, InternalQueue, DataFrame, State, StateFrame +from core.models import ( + DataPayload, + InternalQueue, + DataFrame, + State, + StateFrame, +) from core.models.internal_queue import ( InternalQueueElement, DataElement, @@ -100,7 +106,12 @@ def _send_data(self, to: ChannelIdentity, data_payload: DataPayload) -> None: elif isinstance(data_payload, StateFrame): data_header = PythonDataHeader(tag=to, payload_type="State") table = pa.Table.from_pydict( - {State.CONTENT: [data_payload.frame.to_json()]}, + { + State.CONTENT: [data_payload.frame.to_json()], + State.LOOP_COUNTER: [int(data_payload.loop_counter)], + State.LOOP_START_ID: [data_payload.loop_start_id], + State.LOOP_START_STATE_URI: [data_payload.loop_start_state_uri], + }, schema=State.SCHEMA.as_arrow_schema(), ) self._proxy_client.send_data(bytes(data_header), table) diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py index 3e0e2d48ab5..97a24fe073d 100644 --- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py +++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py @@ -34,7 +34,14 @@ from core.architecture.sendsemantics.round_robin_partitioner import ( RoundRobinPartitioner, ) -from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, StateFrame +from core.models import ( + Tuple, + InternalQueue, + DataFrame, + DataPayload, + State, + StateFrame, +) from core.models.internal_queue import DataElement, ECMElement from core.storage.document_factory import DocumentFactory from core.storage.vfs_uri_factory import VFSURIFactory @@ -152,7 +159,14 @@ def run(self) -> None: VFSURIFactory.state_uri(self.uri) ) for state_row in state_document.get(): - self.emit_payload(StateFrame(State.from_tuple(state_row))) + self.emit_payload( + StateFrame( + State.from_tuple(state_row), + loop_counter=state_row[State.LOOP_COUNTER], + loop_start_id=state_row[State.LOOP_START_ID], + loop_start_state_uri=state_row[State.LOOP_START_STATE_URI], + ) + ) storage_iterator = self.materialization.get() # Iterate and process tuples. diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 030fa3a3bbd..80cc24780b0 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -242,7 +242,7 @@ class OutputManager( // emit side: state is shared context, not per-key data, so every // downstream operator (and every worker reading the materialization) // needs the full set. - stateWriterThreads.values.foreach(_.queue.put(Left(state.toTuple))) + stateWriterThreads.values.foreach(_.queue.put(Left(state.toTuple()))) } /** diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala index 6618e857b1d..144c3ac57a3 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala @@ -125,7 +125,7 @@ class PythonProxyClient(portNumberPromise: Promise[Int], val actorId: ActorVirtu case DataFrame(frame) => writeArrowStream(mutable.Queue(ArraySeq.unsafeWrapArray(frame): _*), from, "Data") case StateFrame(state) => - writeArrowStream(mutable.Queue(state.toTuple), from, "State") + writeArrowStream(mutable.Queue(state.toTuple()), from, "State") } } diff --git a/amber/src/test/python/core/architecture/packaging/test_output_manager.py b/amber/src/test/python/core/architecture/packaging/test_output_manager.py index dcf7ccde673..d132bd9b623 100644 --- a/amber/src/test/python/core/architecture/packaging/test_output_manager.py +++ b/amber/src/test/python/core/architecture/packaging/test_output_manager.py @@ -49,15 +49,15 @@ def port_b(self): @pytest.fixture def state(self): - return State({"loop_counter": 1, "i": 2}) + return State({"i": 2}) def test_no_state_writers_is_a_noop(self, output_manager, state): # With no port set up, save_state_to_storage_if_needed must not # touch any writer. - output_manager.save_state_to_storage_if_needed(state) # no-op + output_manager.save_state_to_storage_if_needed(state, 0) # no-op def test_unknown_port_id_is_a_noop(self, output_manager, state, port_a): - output_manager.save_state_to_storage_if_needed(state, port_id=port_a) + output_manager.save_state_to_storage_if_needed(state, 0, port_id=port_a) # No assertion needed -- the absence of any writer means nothing # was attempted. @@ -67,7 +67,7 @@ def test_enqueues_to_every_port_when_port_id_omitted( queue_a, _, _ = _stub_state_writer(output_manager, port_a) queue_b, _, _ = _stub_state_writer(output_manager, port_b) - output_manager.save_state_to_storage_if_needed(state) + output_manager.save_state_to_storage_if_needed(state, 0) # Each port's writer queue receives one PortStorageWriterElement. # Critically, save is non-blocking -- the call must not invoke @@ -84,7 +84,7 @@ def test_enqueues_only_to_selected_port_when_port_id_specified( queue_a, _, _ = _stub_state_writer(output_manager, port_a) queue_b, _, _ = _stub_state_writer(output_manager, port_b) - output_manager.save_state_to_storage_if_needed(state, port_id=port_a) + output_manager.save_state_to_storage_if_needed(state, 0, port_id=port_a) assert queue_a.put.call_count == 1 queue_b.put.assert_not_called() @@ -105,3 +105,16 @@ def test_close_port_storage_writers_stops_state_threads( thread_a.join.assert_called_once() thread_b.join.assert_called_once() assert output_manager._port_state_writers == {} + + def test_defaults_loop_columns_when_omitted(self, output_manager, state, port_a): + # Dormancy: callers that pass no loop bookkeeping (every non-loop + # caller, e.g. MainLoop.process_input_state) still produce a valid + # 4-column state tuple with the loop columns at their no-loop defaults. + queue_a, _, _ = _stub_state_writer(output_manager, port_a) + + output_manager.save_state_to_storage_if_needed(state) # no loop_counter + + data_tuple = queue_a.put.call_args.args[0].data_tuple + assert data_tuple[State.LOOP_COUNTER] == 0 + assert data_tuple[State.LOOP_START_ID] == "" + assert data_tuple[State.LOOP_START_STATE_URI] == "" diff --git a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py index 8613be95b18..43e2959cd95 100644 --- a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py +++ b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py @@ -25,7 +25,7 @@ OutputManager.set_up_port_storage_writer(port, base_uri) → real PortStorageWriter thread - → real IcebergTableWriter (postgres-backed JdbcCatalog) + → real IcebergTableWriter (sqlite-backed SqlCatalog) → state document at VFSURIFactory.state_uri(base_uri) → InputPortMaterializationReaderRunnable.run() → DataElement(StateFrame) on the consumer's input queue @@ -33,20 +33,14 @@ and asserts that a state put through `save_state_to_storage_if_needed` on the producer side actually arrives at the consumer's queue, with the same payload. - -Marked @integration so the CI runner that has postgres + iceberg -catalog DB provisioned (amber-integration) picks it up via -`pytest -m integration`. Earlier versions of this test substituted a -sqlite-backed SqlCatalog to dodge that infra dependency; that diverged -from the prod catalog code path, so we now exercise the real one. """ -import os import tempfile import threading import uuid import pytest +from pyiceberg.catalog.sql import SqlCatalog from core.architecture.packaging.output_manager import OutputManager from core.models import State, StateFrame @@ -74,203 +68,198 @@ ) -@pytest.mark.integration -class TestStateMaterializationE2E: - @pytest.fixture(autouse=True, scope="class") - def _init_storage_config(self): - """Initialize StorageConfig + IcebergCatalogInstance for the real - postgres-backed catalog in the `amber-integration` CI job. - - Critical detail: the Scala integration tests that run earlier in - the same job connect to the iceberg catalog DB as user - `postgres/postgres` (the storage.conf default for - `STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME/PASSWORD`). pyiceberg - creates the catalog's `iceberg_tables` metadata table on first - use, owned by whoever wrote first — so it ends up owned by - `postgres`. We MUST connect as the same user, otherwise we hit - `permission denied for table iceberg_tables`. +# Module-level scratch dir for the sqlite catalog + iceberg warehouse. +# We don't initialize `StorageConfig` here: other test modules (e.g. +# test_iceberg_document.py) also call `StorageConfig.initialize` at +# import time, and the class rejects re-initialization with +# RuntimeError. Whichever module gets collected first wins; we adopt +# its namespaces below. +_WAREHOUSE_DIR = tempfile.mkdtemp(prefix="texera-state-e2e-warehouse-") - Why the reset: `test_iceberg_document.py` also calls - `StorageConfig.initialize` at module import time (with a - different `texera/password` user that works for it because no - Scala writes first in the `pyamber` job where it runs). pytest - imports every test module during collection, even ones whose - tests will be deselected by `-m integration`, so that - initialization happens here too. We force-reset the singletons - and re-init with the prod-correct credentials; safe because - test_iceberg_document's tests are deselected from this run. - All catalog + S3 settings read the same `STORAGE_*` env vars - the production code consumes (via storage.conf), so the test - matches whichever identity the Scala side uses in the same job - and stays aligned with the bucket / endpoint the workflow - provisions. Defaults mirror storage.conf so a local sbt run - without those vars exported still works. +@pytest.fixture(scope="module", autouse=True) +def sqlite_iceberg_catalog(): + """Inject a sqlite-backed SqlCatalog so the test runs without external + iceberg infra (postgres/minio). - Class-scoped so the reset + tempdir allocation happens once - per class; the two tests in this class share state through the - same StorageConfig singleton anyway. - """ - StorageConfig._initialized = False - IcebergCatalogInstance._instance = None - large_binaries_bucket = os.environ.get( - "STORAGE_S3_LARGE_BINARIES_BUCKET", "texera-large-binaries" - ) + Module-scoped so all tests in this file share one warehouse, and so + namespace creation only happens once. We save/restore the original + `IcebergCatalogInstance` singleton so other test modules that expect + a real postgres-backed catalog (e.g. test_iceberg_document.py) are + not affected by our replacement. + """ + # Some other test module may have initialized StorageConfig already + # (it has a single-init lock). If nothing has initialized it yet, + # do it here with arbitrary values -- we replace the catalog + # instance below so the postgres/rest fields are never exercised. + if not StorageConfig._initialized: StorageConfig.initialize( catalog_type="postgres", - postgres_uri_without_scheme=os.environ.get( - "STORAGE_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME", - "localhost:5432/texera_iceberg_catalog", - ), - postgres_username=os.environ.get( - "STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME", "postgres" - ), - postgres_password=os.environ.get( - "STORAGE_ICEBERG_CATALOG_POSTGRES_PASSWORD", "postgres" - ), - rest_catalog_uri="http://localhost:8181/catalog/", - rest_catalog_warehouse_name="texera", + postgres_uri_without_scheme="unused", + postgres_username="unused", + postgres_password="unused", + rest_catalog_uri="unused", + rest_catalog_warehouse_name="unused", table_result_namespace="operator-port-result", table_state_namespace="operator-port-state", - directory_path=tempfile.mkdtemp(prefix="texera-state-e2e-warehouse-"), + directory_path=_WAREHOUSE_DIR, commit_batch_size=4096, - s3_endpoint=os.environ.get("STORAGE_S3_ENDPOINT", "http://localhost:9000"), - s3_region=os.environ.get("STORAGE_S3_REGION", "us-west-2"), - s3_auth_username=os.environ.get("STORAGE_S3_AUTH_USERNAME", "texera_minio"), - s3_auth_password=os.environ.get("STORAGE_S3_AUTH_PASSWORD", "password"), - s3_large_binaries_base_uri=f"s3://{large_binaries_bucket}/objects/0/", + s3_endpoint="unused", + s3_region="unused", + s3_auth_username="unused", + s3_auth_password="unused", + s3_large_binaries_base_uri="s3://texera-large-binaries/objects/0/", ) - @pytest.fixture - def base_uri(self) -> str: - """A unique port-base URI per test so tables don't collide.""" - return VFSURIFactory.create_port_base_uri( - WorkflowIdentity(id=0), - ExecutionIdentity(id=0), - GlobalPortIdentity( - op_id=PhysicalOpIdentity( - logical_op_id=OperatorIdentity(id=f"e2e-{uuid.uuid4().hex}"), - layer_name="main", - ), - port_id=PortIdentity(id=0, internal=False), - input=False, + original_instance = IcebergCatalogInstance._instance + db_path = f"{_WAREHOUSE_DIR}/catalog.sqlite" + catalog = SqlCatalog( + "texera_iceberg_e2e", + **{ + "uri": f"sqlite:///{db_path}", + "warehouse": f"file://{_WAREHOUSE_DIR}", + }, + ) + # Adopt whatever namespaces StorageConfig already has -- those are + # the ones DocumentFactory will route into. + catalog.create_namespace_if_not_exists(StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE) + catalog.create_namespace_if_not_exists(StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE) + IcebergCatalogInstance.replace_instance(catalog) + try: + yield catalog + finally: + IcebergCatalogInstance.replace_instance(original_instance) + + +def _fresh_base_uri() -> str: + """A unique port-base URI per test so tables don't collide.""" + return VFSURIFactory.create_port_base_uri( + WorkflowIdentity(id=0), + ExecutionIdentity(id=0), + GlobalPortIdentity( + op_id=PhysicalOpIdentity( + logical_op_id=OperatorIdentity(id=f"e2e-{uuid.uuid4().hex}"), + layer_name="main", ), - ) + port_id=PortIdentity(id=0, internal=False), + input=False, + ), + ) - @pytest.fixture - def producer(self, base_uri): - """An OutputManager wired to the iceberg result + state documents - at `base_uri`. Closes its writer threads on teardown so cached - buffers are flushed even if a test errors out before - `close_port_storage_writers()`. - """ - # RegionExecutionCoordinator's responsibility in prod: provision - # result + state documents at the port base URI before any - # worker starts. We emulate that here. - DocumentFactory.create_document( - VFSURIFactory.result_uri(base_uri), State.SCHEMA - ) - DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), State.SCHEMA) - mgr = OutputManager(worker_id="Worker:WF0-test-producer-main-0") - mgr.add_output_port( - PortIdentity(id=0, internal=False), - schema=State.SCHEMA, - storage_uri_base=base_uri, - ) - try: - yield mgr - finally: - # close_port_storage_writers is idempotent — fine to call - # again here if the test already closed. - try: - mgr.close_port_storage_writers() - except Exception: - pass +def test_state_written_by_output_manager_is_replayed_by_reader(): + """Producer side writes a state via OutputManager; consumer side reads + it via InputPortMaterializationReaderRunnable. The state must arrive + on the consumer's input queue intact. + """ + base_uri = _fresh_base_uri() + port_id = PortIdentity(id=0, internal=False) + worker_schema_for_result = State.SCHEMA # producer-side: only state matters - def test_state_written_by_output_manager_is_replayed_by_reader( - self, base_uri, producer - ): - """Producer side writes a state via OutputManager; consumer side - reads it via InputPortMaterializationReaderRunnable. The state - must arrive on the consumer's input queue intact. - """ - # Drive a state through the producer-side path. - state = State({"flag": True, "loop_counter": 7, "name": "outer"}) - producer.save_state_to_storage_if_needed(state) + # 1. RegionExecutionCoordinator's responsibility: provision result + + # state documents at the port base URI before any worker starts. + # We emulate that here. + DocumentFactory.create_document( + VFSURIFactory.result_uri(base_uri), worker_schema_for_result + ) + DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), State.SCHEMA) - # Force the writer threads to flush + commit by closing them. - # Without this, the iceberg buffer holds the state in memory - # and nothing is durable yet. - producer.close_port_storage_writers() + # 2. Producer side: spin up an OutputManager, set up real state + + # result writer threads against the iceberg storage. + producer = OutputManager(worker_id="Worker:WF0-test-producer-main-0") + producer.add_output_port( + port_id, schema=worker_schema_for_result, storage_uri_base=base_uri + ) - # Consumer side: spin up the materialization reader against the - # same base URI. Each reader needs a partitioning even when no - # real downstream worker exists — supply a OneToOnePartitioning - # whose only receiver is the consumer worker itself. - consumer_worker = ActorVirtualIdentity(name="consumer-worker-0") - consumer_queue = InternalQueue() - partitioning = Partitioning( - one_to_one_partitioning=OneToOnePartitioning( - batch_size=400, - channels=[ - ChannelIdentity( - from_worker_id=ActorVirtualIdentity(name="producer-worker-0"), - to_worker_id=consumer_worker, - is_control=False, - ) - ], - ) - ) - reader = InputPortMaterializationReaderRunnable( - uri=base_uri, - queue=consumer_queue, - worker_actor_id=consumer_worker, - partitioning=partitioning, + # 3. Drive a state through the producer-side path. loop_counter rides + # alongside the State (not inside it) and is materialized as its own column. + state = State({"flag": True, "name": "outer"}) + producer.save_state_to_storage_if_needed(state, 7) + + # 4. Force the writer threads to flush + commit by closing them. + # Without this, the iceberg buffer holds the state in memory and + # nothing is durable yet. + producer.close_port_storage_writers() + + # 5. Consumer side: spin up the materialization reader against the + # same base URI. Each reader needs a partitioning even when no real + # downstream worker exists -- supply a OneToOnePartitioning whose + # only receiver is the consumer worker itself. + consumer_worker = ActorVirtualIdentity(name="consumer-worker-0") + consumer_queue = InternalQueue() + partitioning = Partitioning( + one_to_one_partitioning=OneToOnePartitioning( + batch_size=400, + channels=[ + ChannelIdentity( + from_worker_id=ActorVirtualIdentity(name="producer-worker-0"), + to_worker_id=consumer_worker, + is_control=False, + ) + ], ) + ) + reader = InputPortMaterializationReaderRunnable( + uri=base_uri, + queue=consumer_queue, + worker_actor_id=consumer_worker, + partitioning=partitioning, + ) - # Run the reader on a worker thread so we can time out cleanly - # if something goes wrong. - reader_thread = threading.Thread(target=reader.run, daemon=True) - reader_thread.start() - reader_thread.join(timeout=30) - assert not reader_thread.is_alive(), "reader did not finish within timeout" - assert reader.finished(), "reader exited but did not mark itself finished" + # Run the reader on a worker thread so we can time out cleanly if + # something goes wrong. + reader_thread = threading.Thread(target=reader.run, daemon=True) + reader_thread.start() + reader_thread.join(timeout=30) + assert not reader_thread.is_alive(), "reader did not finish within timeout" + assert reader.finished(), "reader exited but did not mark itself finished" - # Drain the consumer's queue and find the StateFrame(s). - state_frames: list[State] = [] - while not consumer_queue.is_empty(): - elem = consumer_queue.get() - if isinstance(elem, DataElement) and isinstance(elem.payload, StateFrame): - state_frames.append(elem.payload.frame) + # 6. Drain the consumer's queue and find the StateFrame(s). + state_frames: list[StateFrame] = [] + while not consumer_queue.is_empty(): + elem = consumer_queue.get() + if isinstance(elem, DataElement) and isinstance(elem.payload, StateFrame): + state_frames.append(elem.payload) - assert len(state_frames) == 1, ( - f"expected exactly one State to flow through writer→iceberg→reader; " - f"got {len(state_frames)}: {state_frames}" - ) - assert state_frames[0] == state, ( - f"replayed state did not match what was written; " - f"wrote={state}, read={state_frames[0]}" - ) + assert len(state_frames) == 1, ( + f"expected exactly one State to flow through writer→iceberg→reader; " + f"got {len(state_frames)}: {state_frames}" + ) + assert state_frames[0].frame == state, ( + f"replayed state did not match what was written; " + f"wrote={state}, read={state_frames[0].frame}" + ) + assert state_frames[0].loop_counter == 7, ( + f"loop_counter must round-trip through its own column; " + f"got {state_frames[0].loop_counter}" + ) - def test_state_table_persists_across_writer_close(self, base_uri, producer): - """Independently verify the iceberg state table contains the row. - If this passes but the reader test above fails, the bug is in - the reader / consumer wiring; if this fails, the bug is in the - writer / storage layer. - """ - state = State({"flag": False, "checkpoint": 42}) - producer.save_state_to_storage_if_needed(state) - producer.close_port_storage_writers() - # Read directly from the iceberg state document, bypassing the - # reader. - state_document, _ = DocumentFactory.open_document( - VFSURIFactory.state_uri(base_uri) - ) - rows = list(state_document.get()) - assert len(rows) == 1, ( - f"expected exactly one row in the iceberg state table after " - f"the writer was closed; got {len(rows)} rows" - ) - assert State.from_tuple(rows[0]) == state +def test_state_table_persists_across_writer_close(): + """Independently verify the iceberg state table contains the row. + If this passes but the reader test above fails, the bug is in the + reader / consumer wiring; if this fails, the bug is in the writer / + storage layer. + """ + base_uri = _fresh_base_uri() + port_id = PortIdentity(id=0, internal=False) + + DocumentFactory.create_document(VFSURIFactory.result_uri(base_uri), State.SCHEMA) + DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), State.SCHEMA) + + producer = OutputManager(worker_id="Worker:WF0-test-producer2-main-0") + producer.add_output_port(port_id, schema=State.SCHEMA, storage_uri_base=base_uri) + + state = State({"flag": False, "checkpoint": 42}) + producer.save_state_to_storage_if_needed(state, 0) + producer.close_port_storage_writers() + + # Read directly from the iceberg state document, bypassing the reader. + state_document, _ = DocumentFactory.open_document(VFSURIFactory.state_uri(base_uri)) + rows = list(state_document.get()) + assert len(rows) == 1, ( + f"expected exactly one row in the iceberg state table after the " + f"writer was closed; got {len(rows)} rows" + ) + assert State.from_tuple(rows[0]) == state + assert rows[0][State.LOOP_COUNTER] == 0 diff --git a/amber/src/test/python/core/models/test_state.py b/amber/src/test/python/core/models/test_state.py index aef2297130b..b51a01267bb 100644 --- a/amber/src/test/python/core/models/test_state.py +++ b/amber/src/test/python/core/models/test_state.py @@ -29,7 +29,17 @@ def test_state_subclasses_dict(self): def test_class_attributes(self): assert State.CONTENT == "content" - assert State.SCHEMA.get_attr_names() == ["content"] + assert State.LOOP_COUNTER == "loop_counter" + assert State.LOOP_START_ID == "loop_start_id" + assert State.LOOP_START_STATE_URI == "loop_start_state_uri" + # The loop-control columns are runtime-owned bookkeeping, sibling to + # content, not part of the user state JSON. + assert State.SCHEMA.get_attr_names() == [ + "content", + "loop_counter", + "loop_start_id", + "loop_start_state_uri", + ] def test_json_round_trip_primitives(self): original = State( @@ -83,14 +93,20 @@ class Custom: State({"bad": Custom()}).to_json() def test_tuple_round_trip(self): - original = State({"loop_counter": 3, "label": "outer", "blob": b"\x01\x02"}) + original = State({"i": 3, "label": "outer", "blob": b"\x01\x02"}) decoded = State.from_tuple(original.to_tuple()) assert decoded == original - def test_to_tuple_uses_state_schema(self): - tuple_ = State({"x": 1}).to_tuple() - # Single STRING column whose value is the JSON serialization. + def test_to_tuple_writes_content_and_loop_counter_columns(self): + tuple_ = State({"x": 1}).to_tuple(7) + # content holds the JSON serialization; loop_counter is its own column. assert tuple_[State.CONTENT] == '{"x":1}' + assert tuple_[State.LOOP_COUNTER] == 7 + # loop_counter must not leak into the content JSON. + assert "loop_counter" not in tuple_[State.CONTENT] + + def test_to_tuple_defaults_loop_counter_to_zero(self): + assert State({"x": 1}).to_tuple()[State.LOOP_COUNTER] == 0 def test_nested_dict_decodes_to_plain_dict(self): # Top-level returns a State; nested dicts come back as plain dict. diff --git a/amber/src/test/python/core/runnables/test_network_receiver.py b/amber/src/test/python/core/runnables/test_network_receiver.py index bf890e4a2f0..49ba2408efa 100644 --- a/amber/src/test/python/core/runnables/test_network_receiver.py +++ b/amber/src/test/python/core/runnables/test_network_receiver.py @@ -152,16 +152,19 @@ def test_network_receiver_can_receive_consecutive_state_messages( worker_id = ActorVirtualIdentity(name="test") channel_id = ChannelIdentity(worker_id, worker_id, False) + # loop_counter rides the StateFrame envelope (its own Arrow column), + # not the user content. Use a non-zero counter so the round-trip + # actually exercises the second column over the sender->receiver wire. input_queue.put( DataElement( tag=channel_id, - payload=StateFrame(State({"loop_counter": 0, "i": 1})), + payload=StateFrame(State({"i": 1}), loop_counter=0), ) ) input_queue.put( DataElement( tag=channel_id, - payload=StateFrame(State({"loop_counter": 1, "i": 2})), + payload=StateFrame(State({"i": 2}), loop_counter=5), ) ) @@ -169,11 +172,13 @@ def test_network_receiver_can_receive_consecutive_state_messages( second_element: DataElement = output_queue.get() assert isinstance(first_element.payload, StateFrame) - assert first_element.payload.frame == {"loop_counter": 0, "i": 1} + assert first_element.payload.frame == {"i": 1} + assert first_element.payload.loop_counter == 0 assert first_element.tag == channel_id assert isinstance(second_element.payload, StateFrame) - assert second_element.payload.frame == {"loop_counter": 1, "i": 2} + assert second_element.payload.frame == {"i": 2} + assert second_element.payload.loop_counter == 5 assert second_element.tag == channel_id @pytest.mark.timeout(10) diff --git a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py index 5016c2df2f1..59635d0e872 100644 --- a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py +++ b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py @@ -60,16 +60,27 @@ def runnable(self, me): return instance def test_state_rows_are_emitted_as_state_frames(self, runnable): - state_a = State({"loop_counter": 0}) - state_b = State({"loop_counter": 1}) + state_a = State({"i": 0}) + state_b = State({"i": 1}) - # The state document yields opaque tuples; from_tuple deserializes - # them. Patch from_tuple so we don't have to wire a real - # serialization. + # The state document yields opaque multi-column tuples. State.from_tuple + # (patched) deserializes the content column; the reader reads the + # loop-control columns directly off the row and carries them onto the + # emitted StateFrame envelope. + row_a = { + State.LOOP_COUNTER: 0, + State.LOOP_START_ID: "loop-a", + State.LOOP_START_STATE_URI: "vfs:///a", + } + row_b = { + State.LOOP_COUNTER: 1, + State.LOOP_START_ID: "loop-b", + State.LOOP_START_STATE_URI: "vfs:///b", + } result_doc = MagicMock() result_doc.get.return_value = iter([]) # No materialized tuples. state_doc = MagicMock() - state_doc.get.return_value = iter(["row-a", "row-b"]) + state_doc.get.return_value = iter([row_a, row_b]) with ( patch( @@ -96,4 +107,10 @@ def test_state_rows_are_emitted_as_state_frames(self, runnable): and isinstance(call.args[0].payload, StateFrame) ] assert [sf.payload.frame for sf in state_frames] == [state_a, state_b] + assert [sf.payload.loop_counter for sf in state_frames] == [0, 1] + assert [sf.payload.loop_start_id for sf in state_frames] == ["loop-a", "loop-b"] + assert [sf.payload.loop_start_state_uri for sf in state_frames] == [ + "vfs:///a", + "vfs:///b", + ] assert runnable._finished is True diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala index ba146f1d57c..92103477d1d 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala @@ -31,18 +31,35 @@ final case class State(values: Map[String, Any]) { def toJson: String = objectMapper.writeValueAsString(State.toJsonValue(values)) - def toTuple: Tuple = - Tuple.builder(State.schema).addSequentially(Array(toJson)).build() + def toTuple( + loopCounter: Long = 0L, + loopStartId: String = "", + loopStartStateUri: String = "" + ): Tuple = + Tuple + .builder(State.schema) + .addSequentially(Array(toJson, Long.box(loopCounter), loopStartId, loopStartStateUri)) + .build() } object State { private val Content = "content" + // loop-control bookkeeping owned by the (Python) worker runtime; not user + // state and never in the content JSON. Materialized as its own columns, + // parallel to content. Scala never originates loop state (loop operators are + // Python-only), so toTuple defaults these to the "no loop" values. + private val LoopCounter = "loop_counter" + private val LoopStartId = "loop_start_id" + private val LoopStartStateUri = "loop_start_state_uri" private val BytesTypeMarker = "__texera_type__" private val BytesValue = "bytes" private val PayloadMarker = "payload" val schema: Schema = new Schema( - new Attribute(Content, AttributeType.STRING) + new Attribute(Content, AttributeType.STRING), + new Attribute(LoopCounter, AttributeType.LONG), + new Attribute(LoopStartId, AttributeType.STRING), + new Attribute(LoopStartStateUri, AttributeType.STRING) ) def fromJson(payload: String): State = diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala index 976a585e31a..1e6a0dc0178 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala @@ -98,13 +98,13 @@ class StateSpec extends AnyFlatSpec { it should "tuple-round-trip" in { val original = State( Map( - "loop_counter" -> 3L, + "i" -> 3L, "label" -> "outer", "blob" -> Array[Byte](1, 2) ) ) - val decoded = State.fromTuple(original.toTuple) - assert(decoded.values("loop_counter") == 3L) + val decoded = State.fromTuple(original.toTuple()) + assert(decoded.values("i") == 3L) assert(decoded.values("label") == "outer") assert( decoded.values("blob").asInstanceOf[Array[Byte]].sameElements(Array[Byte](1, 2)) @@ -112,7 +112,7 @@ class StateSpec extends AnyFlatSpec { } it should "produce a tuple whose payload is the JSON serialization" in { - val tuple = State(Map("x" -> 1L)).toTuple + val tuple = State(Map("x" -> 1L)).toTuple() assert(tuple.getSchema == State.schema) assert(tuple.getField[String]("content") == """{"x":1}""") } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala index d21644f6e64..8f52b0eceec 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala @@ -211,7 +211,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]] val state = State( Map( - "loop_counter" -> 3, + "i" -> 3, "name" -> "outer-loop", "payload" -> Array[Byte](0, 1, 2, 3), "nested" -> Map("enabled" -> true, "values" -> List(1, 2, 3)) @@ -220,13 +220,20 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter val writer = stateDocument.writer(UUID.randomUUID().toString) writer.open() - writer.putOne(state.toTuple) + writer.putOne( + state.toTuple(loopCounter = 7L, loopStartId = "ls", loopStartStateUri = "vfs:///outer") + ) writer.close() val storedRows = stateDocument.get().toList assert(storedRows.length == 1) + // Loop bookkeeping is materialized as its own columns, not in the content JSON. + assert(storedRows.head.getField[java.lang.Long]("loop_counter").toLong == 7L) + assert(storedRows.head.getField[String]("loop_start_id") == "ls") + assert(storedRows.head.getField[String]("loop_start_state_uri") == "vfs:///outer") + // User state round-trips through the content column (fromTuple reads only content). val deserialized = State.fromTuple(storedRows.head).values - assert(deserialized("loop_counter") == 3L) + assert(deserialized("i") == 3L) assert(deserialized("name") == "outer-loop") assert(deserialized("payload").asInstanceOf[Array[Byte]].sameElements(Array[Byte](0, 1, 2, 3))) assert(deserialized("nested").asInstanceOf[Map[String, Any]]("enabled") == true) @@ -238,45 +245,45 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter DocumentFactory.createDocument(stateUri, State.schema) val stateDocument = DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]] - val states: List[State] = List( - State(Map("loop_counter" -> 0, "i" -> 1, "payload" -> Array[Byte](1, 2, 3))), - State( - Map( - "loop_counter" -> 1, - "i" -> 2, - "payload" -> Array[Byte](4, 5, 6), - "nested" -> Map("values" -> List(3, 4)) - ) + // (user state, loopCounter) -- the counter is written to its own column. + val states: List[(State, Long)] = List( + (State(Map("i" -> 1, "payload" -> Array[Byte](1, 2, 3))), 0L), + ( + State( + Map( + "i" -> 2, + "payload" -> Array[Byte](4, 5, 6), + "nested" -> Map("values" -> List(3, 4)) + ) + ), + 1L ) ) val writer = stateDocument.writer(UUID.randomUUID().toString) writer.open() - states.foreach(state => writer.putOne(state.toTuple)) + states.foreach { case (state, loopCounter) => writer.putOne(state.toTuple(loopCounter)) } writer.close() - val deserializedStates = - stateDocument - .get() - .toList - .map(State.fromTuple) - .sortBy(_.values("loop_counter").asInstanceOf[Long]) - assert(deserializedStates.length == states.length) - deserializedStates.zip(states).foreach { - case (actual, expected) => - assert( - actual.values("loop_counter") == expected.values("loop_counter").asInstanceOf[Int].toLong - ) - assert(actual.values("i") == expected.values("i").asInstanceOf[Int].toLong) + val storedRows = + stateDocument.get().toList.sortBy(_.getField[java.lang.Long]("loop_counter").toLong) + assert(storedRows.length == states.length) + storedRows.zip(states).foreach { + case (row, (expectedState, expectedLoopCounter)) => + // loop_counter is its own column... + assert(row.getField[java.lang.Long]("loop_counter").toLong == expectedLoopCounter) + // ...and the user state round-trips through the content column. + val actual = State.fromTuple(row).values + assert(actual("i") == expectedState.values("i").asInstanceOf[Int].toLong) assert( - actual - .values("payload") + actual("payload") .asInstanceOf[Array[Byte]] - .sameElements(expected.values("payload").asInstanceOf[Array[Byte]]) + .sameElements(expectedState.values("payload").asInstanceOf[Array[Byte]]) ) } assert( - deserializedStates(1) + State + .fromTuple(storedRows(1)) .values("nested") .asInstanceOf[Map[String, Any]]("values") == List(3L, 4L) ) diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala index 62b10a66864..486deefbcc2 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala @@ -19,8 +19,11 @@ package org.apache.texera.amber.util +import org.apache.arrow.memory.RootAllocator +import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit} import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType} +import org.apache.texera.amber.core.state.State import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} import org.apache.texera.amber.core.tuple.AttributeTypeUtils.AttributeTypeException import org.scalatest.flatspec.AnyFlatSpec @@ -282,4 +285,48 @@ class ArrowUtilsSpec extends AnyFlatSpec with Matchers { any.getMetadata.get("texera_type") shouldBe "ANY" Option(name.getMetadata).map(_.containsKey("texera_type")).getOrElse(false) shouldBe false } + + // ----- Tuple <-> Arrow data round-trip (the State wire-hop contract) ----- + + "tuple round-trip through Arrow vectors" should "preserve every column of a multi-column State tuple" in { + // The Python<->Scala state wire hop goes Tuple -> setTexeraTuple -> Arrow + // (PythonProxyClient.writeArrowStream) on one side and + // Arrow -> getTexeraTuple -> Tuple (PythonProxyServer) on the other. + // The schema-only round-trip tests above don't exercise the per-row data + // encode/decode, so a column dropped or mistyped there would slip through. + // Pin that the full multi-column State tuple (content STRING + the + // loop-control columns loop_counter LONG, loop_start_id / loop_start_state_uri + // STRING) survives a real setTexeraTuple -> Arrow vectors -> getTexeraTuple + // round-trip with every column intact -- the property the wire hop relies on. + val original = + State(Map("i" -> 5L, "label" -> "outer")).toTuple(3L, "outer-loop", "vfs:///outer") + + val allocator = new RootAllocator() + val root = VectorSchemaRoot.create(ArrowUtils.fromTexeraSchema(original.getSchema), allocator) + try { + root.allocateNew() + ArrowUtils.setTexeraTuple(original, 0, root) + root.setRowCount(1) + + val recovered = ArrowUtils.getTexeraTuple(0, root) + + // Every column survives the encode/decode, with names and types intact. + recovered.getSchema.getAttributes.toList.map(a => (a.getName, a.getType)) shouldBe + List( + ("content", AttributeType.STRING), + ("loop_counter", AttributeType.LONG), + ("loop_start_id", AttributeType.STRING), + ("loop_start_state_uri", AttributeType.STRING) + ) + // content (the user State JSON) round-trips... + State.fromTuple(recovered).values shouldBe Map("i" -> 5L, "label" -> "outer") + // ...and so do the loop-control columns. + recovered.getField[java.lang.Long]("loop_counter").toLong shouldBe 3L + recovered.getField[String]("loop_start_id") shouldBe "outer-loop" + recovered.getField[String]("loop_start_state_uri") shouldBe "vfs:///outer" + } finally { + root.close() + allocator.close() + } + } }