From edc311be108df9ca894ce755cf9d0713e37c85ed Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Fri, 8 May 2026 18:59:41 -0700 Subject: [PATCH 1/2] preserve ANY through schema round-trip via texera_type metadata --- .../apache/texera/amber/util/ArrowUtils.scala | 26 +++++++++++++------ .../texera/amber/util/ArrowUtilsSpec.scala | 22 ++++++++++++---- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala index a6f22085b69..6cc1d8b668c 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala @@ -94,7 +94,8 @@ object ArrowUtils extends LazyLogging { /** * Converts an Arrow Schema into Texera Schema. - * Checks field metadata to detect LARGE_BINARY types. + * Checks field metadata to recover types that share an Arrow representation + * (LARGE_BINARY and ANY both ride on Utf8). * * @param arrowSchema The Arrow Schema to be converted. * @return A Texera Schema. @@ -102,11 +103,14 @@ object ArrowUtils extends LazyLogging { def toTexeraSchema(arrowSchema: org.apache.arrow.vector.types.pojo.Schema): Schema = Schema( arrowSchema.getFields.asScala.map { field => - val isLargeBinary = Option(field.getMetadata) - .exists(m => m.containsKey("texera_type") && m.get("texera_type") == "LARGE_BINARY") + val taggedType = Option(field.getMetadata) + .flatMap(m => Option(m.get("texera_type"))) + .collect { + case "LARGE_BINARY" => AttributeType.LARGE_BINARY + case "ANY" => AttributeType.ANY + } - val attributeType = - if (isLargeBinary) AttributeType.LARGE_BINARY else toAttributeType(field.getType) + val attributeType = taggedType.getOrElse(toAttributeType(field.getType)) new Attribute(field.getName, attributeType) }.toList ) @@ -232,16 +236,22 @@ object ArrowUtils extends LazyLogging { /** * Converts an Amber schema into Arrow schema. - * Stores AttributeType in field metadata to preserve LARGE_BINARY type information. + * Stores AttributeType in field metadata to preserve LARGE_BINARY and ANY, + * which both collapse onto Utf8 in Arrow. * * @param schema The Texera Schema. * @return An Arrow Schema. */ def fromTexeraSchema(schema: Schema): org.apache.arrow.vector.types.pojo.Schema = { val arrowFields = schema.getAttributes.map { attribute => - val metadata = if (attribute.getType == AttributeType.LARGE_BINARY) { + val metadataTag = attribute.getType match { + case AttributeType.LARGE_BINARY => "LARGE_BINARY" + case AttributeType.ANY => "ANY" + case _ => null + } + val metadata = if (metadataTag != null) { val map = new util.HashMap[String, String]() - map.put("texera_type", "LARGE_BINARY") + map.put("texera_type", metadataTag) map } else null diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala index 69323acbbaa..212be040b12 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala @@ -258,14 +258,26 @@ class ArrowUtilsSpec extends AnyFlatSpec with Matchers { ) } - it should "lose the ANY distinction (round-trips as STRING)" in { - // Pin: ANY fromAttributeType produces Utf8 with no metadata. toAttributeType - // then can only see Utf8, so the recovered type is STRING. Documenting this - // information loss so a future fix that round-trips ANY can break the spec. + it should "preserve ANY through the metadata-based path" in { val original = Schema(List(new Attribute("v", AttributeType.ANY))) val recovered = ArrowUtils.toTexeraSchema(ArrowUtils.fromTexeraSchema(original)) recovered.getAttributes.toList.map(a => (a.getName, a.getType)) shouldBe List( - ("v", AttributeType.STRING) + ("v", AttributeType.ANY) ) } + + it should "attach texera_type=ANY metadata to ANY fields and only those" in { + val schema = Schema( + List( + new Attribute("v", AttributeType.ANY), + new Attribute("name", AttributeType.STRING) + ) + ) + val arrow = ArrowUtils.fromTexeraSchema(schema) + val fields = arrow.getFields.asScala.toList + val any = fields.find(_.getName == "v").get + val name = fields.find(_.getName == "name").get + any.getMetadata.get("texera_type") shouldBe "ANY" + Option(name.getMetadata).map(_.containsKey("texera_type")).getOrElse(false) shouldBe false + } } From ade3f2a6f269d91497e750700a074ac77a45370c Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Tue, 12 May 2026 12:34:29 -0700 Subject: [PATCH 2/2] Make constant for texera_type --- .../main/scala/org/apache/texera/amber/util/ArrowUtils.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala index 6cc1d8b668c..c4f649e719e 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala @@ -45,6 +45,7 @@ import scala.jdk.CollectionConverters.CollectionHasAsScala import scala.language.implicitConversions object ArrowUtils extends LazyLogging { + private val TexeraTypeMetadataKey = "texera_type" // Create a single allocator for the entire utility private val allocator: BufferAllocator = new RootAllocator() @@ -104,7 +105,7 @@ object ArrowUtils extends LazyLogging { Schema( arrowSchema.getFields.asScala.map { field => val taggedType = Option(field.getMetadata) - .flatMap(m => Option(m.get("texera_type"))) + .flatMap(m => Option(m.get(TexeraTypeMetadataKey))) .collect { case "LARGE_BINARY" => AttributeType.LARGE_BINARY case "ANY" => AttributeType.ANY @@ -251,7 +252,7 @@ object ArrowUtils extends LazyLogging { } val metadata = if (metadataTag != null) { val map = new util.HashMap[String, String]() - map.put("texera_type", metadataTag) + map.put(TexeraTypeMetadataKey, metadataTag) map } else null