diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala index a6f22085b69..c4f649e719e 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala @@ -45,6 +45,7 @@ import scala.jdk.CollectionConverters.CollectionHasAsScala import scala.language.implicitConversions object ArrowUtils extends LazyLogging { + private val TexeraTypeMetadataKey = "texera_type" // Create a single allocator for the entire utility private val allocator: BufferAllocator = new RootAllocator() @@ -94,7 +95,8 @@ object ArrowUtils extends LazyLogging { /** * Converts an Arrow Schema into Texera Schema. - * Checks field metadata to detect LARGE_BINARY types. + * Checks field metadata to recover types that share an Arrow representation + * (LARGE_BINARY and ANY both ride on Utf8). * * @param arrowSchema The Arrow Schema to be converted. * @return A Texera Schema. @@ -102,11 +104,14 @@ object ArrowUtils extends LazyLogging { def toTexeraSchema(arrowSchema: org.apache.arrow.vector.types.pojo.Schema): Schema = Schema( arrowSchema.getFields.asScala.map { field => - val isLargeBinary = Option(field.getMetadata) - .exists(m => m.containsKey("texera_type") && m.get("texera_type") == "LARGE_BINARY") + val taggedType = Option(field.getMetadata) + .flatMap(m => Option(m.get(TexeraTypeMetadataKey))) + .collect { + case "LARGE_BINARY" => AttributeType.LARGE_BINARY + case "ANY" => AttributeType.ANY + } - val attributeType = - if (isLargeBinary) AttributeType.LARGE_BINARY else toAttributeType(field.getType) + val attributeType = taggedType.getOrElse(toAttributeType(field.getType)) new Attribute(field.getName, attributeType) }.toList ) @@ -232,16 +237,22 @@ object ArrowUtils extends LazyLogging { /** * Converts an Amber schema into Arrow schema. - * Stores AttributeType in field metadata to preserve LARGE_BINARY type information. + * Stores AttributeType in field metadata to preserve LARGE_BINARY and ANY, + * which both collapse onto Utf8 in Arrow. * * @param schema The Texera Schema. * @return An Arrow Schema. */ def fromTexeraSchema(schema: Schema): org.apache.arrow.vector.types.pojo.Schema = { val arrowFields = schema.getAttributes.map { attribute => - val metadata = if (attribute.getType == AttributeType.LARGE_BINARY) { + val metadataTag = attribute.getType match { + case AttributeType.LARGE_BINARY => "LARGE_BINARY" + case AttributeType.ANY => "ANY" + case _ => null + } + val metadata = if (metadataTag != null) { val map = new util.HashMap[String, String]() - map.put("texera_type", "LARGE_BINARY") + map.put(TexeraTypeMetadataKey, metadataTag) map } else null diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala index 69323acbbaa..212be040b12 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala @@ -258,14 +258,26 @@ class ArrowUtilsSpec extends AnyFlatSpec with Matchers { ) } - it should "lose the ANY distinction (round-trips as STRING)" in { - // Pin: ANY fromAttributeType produces Utf8 with no metadata. toAttributeType - // then can only see Utf8, so the recovered type is STRING. Documenting this - // information loss so a future fix that round-trips ANY can break the spec. + it should "preserve ANY through the metadata-based path" in { val original = Schema(List(new Attribute("v", AttributeType.ANY))) val recovered = ArrowUtils.toTexeraSchema(ArrowUtils.fromTexeraSchema(original)) recovered.getAttributes.toList.map(a => (a.getName, a.getType)) shouldBe List( - ("v", AttributeType.STRING) + ("v", AttributeType.ANY) ) } + + it should "attach texera_type=ANY metadata to ANY fields and only those" in { + val schema = Schema( + List( + new Attribute("v", AttributeType.ANY), + new Attribute("name", AttributeType.STRING) + ) + ) + val arrow = ArrowUtils.fromTexeraSchema(schema) + val fields = arrow.getFields.asScala.toList + val any = fields.find(_.getName == "v").get + val name = fields.find(_.getName == "name").get + any.getMetadata.get("texera_type") shouldBe "ANY" + Option(name.getMetadata).map(_.containsKey("texera_type")).getOrElse(false) shouldBe false + } }