Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.language.implicitConversions

object ArrowUtils extends LazyLogging {
private val TexeraTypeMetadataKey = "texera_type"

// Create a single allocator for the entire utility
private val allocator: BufferAllocator = new RootAllocator()
Expand Down Expand Up @@ -94,19 +95,23 @@ object ArrowUtils extends LazyLogging {

/**
* Converts an Arrow Schema into Texera Schema.
* Checks field metadata to detect LARGE_BINARY types.
* Checks field metadata to recover types that share an Arrow representation
* (LARGE_BINARY and ANY both ride on Utf8).
*
* @param arrowSchema The Arrow Schema to be converted.
* @return A Texera Schema.
*/
def toTexeraSchema(arrowSchema: org.apache.arrow.vector.types.pojo.Schema): Schema =
Schema(
arrowSchema.getFields.asScala.map { field =>
val isLargeBinary = Option(field.getMetadata)
.exists(m => m.containsKey("texera_type") && m.get("texera_type") == "LARGE_BINARY")
val taggedType = Option(field.getMetadata)
.flatMap(m => Option(m.get(TexeraTypeMetadataKey)))
.collect {
case "LARGE_BINARY" => AttributeType.LARGE_BINARY
case "ANY" => AttributeType.ANY
}

val attributeType =
if (isLargeBinary) AttributeType.LARGE_BINARY else toAttributeType(field.getType)
val attributeType = taggedType.getOrElse(toAttributeType(field.getType))
new Attribute(field.getName, attributeType)
}.toList
)
Expand Down Expand Up @@ -232,16 +237,22 @@ object ArrowUtils extends LazyLogging {

/**
* Converts an Amber schema into Arrow schema.
* Stores AttributeType in field metadata to preserve LARGE_BINARY type information.
* Stores AttributeType in field metadata to preserve LARGE_BINARY and ANY,
* which both collapse onto Utf8 in Arrow.
*
* @param schema The Texera Schema.
* @return An Arrow Schema.
*/
def fromTexeraSchema(schema: Schema): org.apache.arrow.vector.types.pojo.Schema = {
val arrowFields = schema.getAttributes.map { attribute =>
val metadata = if (attribute.getType == AttributeType.LARGE_BINARY) {
val metadataTag = attribute.getType match {
case AttributeType.LARGE_BINARY => "LARGE_BINARY"
case AttributeType.ANY => "ANY"
case _ => null
}
val metadata = if (metadataTag != null) {
val map = new util.HashMap[String, String]()
map.put("texera_type", "LARGE_BINARY")
map.put(TexeraTypeMetadataKey, metadataTag)
map
} else null

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,26 @@ class ArrowUtilsSpec extends AnyFlatSpec with Matchers {
)
}

it should "lose the ANY distinction (round-trips as STRING)" in {
// Pin: ANY fromAttributeType produces Utf8 with no metadata. toAttributeType
// then can only see Utf8, so the recovered type is STRING. Documenting this
// information loss so a future fix that round-trips ANY can break the spec.
it should "preserve ANY through the metadata-based path" in {
val original = Schema(List(new Attribute("v", AttributeType.ANY)))
val recovered = ArrowUtils.toTexeraSchema(ArrowUtils.fromTexeraSchema(original))
recovered.getAttributes.toList.map(a => (a.getName, a.getType)) shouldBe List(
("v", AttributeType.STRING)
("v", AttributeType.ANY)
)
}

it should "attach texera_type=ANY metadata to ANY fields and only those" in {
val schema = Schema(
List(
new Attribute("v", AttributeType.ANY),
new Attribute("name", AttributeType.STRING)
)
)
val arrow = ArrowUtils.fromTexeraSchema(schema)
val fields = arrow.getFields.asScala.toList
val any = fields.find(_.getName == "v").get
val name = fields.find(_.getName == "name").get
any.getMetadata.get("texera_type") shouldBe "ANY"
Option(name.getMetadata).map(_.containsKey("texera_type")).getOrElse(false) shouldBe false
}
}
Loading