diff --git a/ci/docker/python-wheel-windows-vs2022-base.dockerfile b/ci/docker/python-wheel-windows-vs2022-base.dockerfile index f1bc314d013a..56177c65eab5 100644 --- a/ci/docker/python-wheel-windows-vs2022-base.dockerfile +++ b/ci/docker/python-wheel-windows-vs2022-base.dockerfile @@ -128,6 +128,7 @@ ENV CMAKE_BUILD_TYPE=${build_type} ` VCPKG_DEFAULT_TRIPLET=amd64-windows-static-md-${build_type} ` VCPKG_FEATURE_FLAGS="manifests" COPY ci/vcpkg/vcpkg.json arrow/ci/vcpkg/ + # cannot use the S3 feature here because while aws-sdk-cpp=1.9.160 contains # ssl related fixes as well as we can patch the vcpkg portfile to support # arm machines it hits ARROW-15141 where we would need to fall back to 1.8.186 diff --git a/ci/vcpkg/vcpkg.patch b/ci/vcpkg/vcpkg.patch index a4c8d5209785..5a7d7b279554 100644 --- a/ci/vcpkg/vcpkg.patch +++ b/ci/vcpkg/vcpkg.patch @@ -1,3 +1,14 @@ +diff --git a/scripts/toolchains/windows.cmake b/scripts/toolchains/windows.cmake +index 3cc90cc..36af495 100644 +--- a/scripts/toolchains/windows.cmake ++++ b/scripts/toolchains/windows.cmake +@@ -88,4 +88,4 @@ if(NOT _VCPKG_WINDOWS_TOOLCHAIN) + set(CMAKE_C_FLAGS_DEBUG "${VCPKG_CRT_LINK_FLAG_PREFIX}d /Z7 /Ob0 /Od /RTC1 ${VCPKG_C_FLAGS_DEBUG}" CACHE STRING "") +- set(CMAKE_CXX_FLAGS_RELEASE "${VCPKG_CRT_LINK_FLAG_PREFIX} /O2 /Oi /Gy /DNDEBUG /Z7 ${VCPKG_CXX_FLAGS_RELEASE}" CACHE STRING "") +- set(CMAKE_C_FLAGS_RELEASE "${VCPKG_CRT_LINK_FLAG_PREFIX} /O2 /Oi /Gy /DNDEBUG /Z7 ${VCPKG_C_FLAGS_RELEASE}" CACHE STRING "") ++ set(CMAKE_CXX_FLAGS_RELEASE "${VCPKG_CRT_LINK_FLAG_PREFIX} /O2 /Oi /Gy /DNDEBUG ${VCPKG_CXX_FLAGS_RELEASE}" CACHE STRING "") ++ set(CMAKE_C_FLAGS_RELEASE "${VCPKG_CRT_LINK_FLAG_PREFIX} /O2 /Oi /Gy /DNDEBUG ${VCPKG_C_FLAGS_RELEASE}" CACHE STRING "") + diff --git a/scripts/cmake/vcpkg_execute_build_process.cmake b/scripts/cmake/vcpkg_execute_build_process.cmake index 60fd5b587a..c8dc021af8 100644 --- a/scripts/cmake/vcpkg_execute_build_process.cmake diff --git 
a/cpp/src/arrow/compute/kernels/scalar_cast_nested.cc b/cpp/src/arrow/compute/kernels/scalar_cast_nested.cc index 3ab42d89b6ee..392fd9fbb705 100644 --- a/cpp/src/arrow/compute/kernels/scalar_cast_nested.cc +++ b/cpp/src/arrow/compute/kernels/scalar_cast_nested.cc @@ -346,11 +346,13 @@ struct CastStruct { for (int out_field_index = 0; out_field_index < out_field_count; ++out_field_index) { const auto& out_field = out_type.field(out_field_index); - // Take the first field with matching name, if any. Extract it from the map so it - // can't be reused. - auto maybe_in_field_index = in_fields.extract(out_field->name()); - if (!maybe_in_field_index.empty()) { - fields_to_select[out_field_index] = maybe_in_field_index.mapped(); + // Take the first field with matching name, if any. Erase it from the map so it + // can't be reused. Use lower_bound (which guarantees first-match) instead of + // find/extract (which do not guarantee first for duplicate keys). + auto it = in_fields.lower_bound(out_field->name()); + if (it != in_fields.end() && it->first == out_field->name()) { + fields_to_select[out_field_index] = it->second; + in_fields.erase(it); } else if (out_field->nullable()) { fields_to_select[out_field_index] = kFillNullSentinel; } else { diff --git a/cpp/src/arrow/extension/fixed_shape_tensor.cc b/cpp/src/arrow/extension/fixed_shape_tensor.cc index 5be855ffcb1d..544616988746 100644 --- a/cpp/src/arrow/extension/fixed_shape_tensor.cc +++ b/cpp/src/arrow/extension/fixed_shape_tensor.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+#include #include #include @@ -109,8 +110,8 @@ Result> FixedShapeTensorType::Deserialize( return Status::Invalid("Expected FixedSizeList storage type, got ", storage_type->ToString()); } - auto value_type = - internal::checked_pointer_cast(storage_type)->value_type(); + auto fsl_type = internal::checked_pointer_cast(storage_type); + auto value_type = fsl_type->value_type(); rj::Document document; if (document.Parse(serialized_data.data(), serialized_data.length()).HasParseError() || !document.IsObject() || !document.HasMember("shape") || @@ -119,21 +120,45 @@ Result> FixedShapeTensorType::Deserialize( } std::vector shape; - for (auto& x : document["shape"].GetArray()) { + for (const auto& x : document["shape"].GetArray()) { + if (!x.IsInt64()) { + return Status::Invalid("shape must contain integers, got ", + internal::JsonTypeName(x)); + } shape.emplace_back(x.GetInt64()); } + std::vector permutation; if (document.HasMember("permutation")) { - for (auto& x : document["permutation"].GetArray()) { + const auto& json_permutation = document["permutation"]; + if (!json_permutation.IsArray()) { + return Status::Invalid("permutation must be an array, got ", + internal::JsonTypeName(json_permutation)); + } + for (const auto& x : json_permutation.GetArray()) { + if (!x.IsInt64()) { + return Status::Invalid("permutation must contain integers, got ", + internal::JsonTypeName(x)); + } permutation.emplace_back(x.GetInt64()); } if (shape.size() != permutation.size()) { return Status::Invalid("Invalid permutation"); } + RETURN_NOT_OK(internal::IsPermutationValid(permutation)); } std::vector dim_names; if (document.HasMember("dim_names")) { - for (auto& x : document["dim_names"].GetArray()) { + const auto& json_dim_names = document["dim_names"]; + if (!json_dim_names.IsArray()) { + return Status::Invalid("dim_names must be an array, got ", + internal::JsonTypeName(json_dim_names)); + } + for (const auto& x : json_dim_names.GetArray()) { + if (!x.IsString()) { + return 
Status::Invalid("dim_names must contain strings, got ", + internal::JsonTypeName(x)); + } dim_names.emplace_back(x.GetString()); } if (shape.size() != dim_names.size()) { @@ -141,7 +166,20 @@ Result> FixedShapeTensorType::Deserialize( } } - return fixed_shape_tensor(value_type, shape, permutation, dim_names); + // Validate product of shape dimensions matches storage type list_size. + // This check is intentionally after field parsing so that metadata-level errors + // (type mismatches, size mismatches) are reported first. + ARROW_ASSIGN_OR_RAISE(auto ext_type, FixedShapeTensorType::Make( + value_type, shape, permutation, dim_names)); + const auto& fst_type = internal::checked_cast(*ext_type); + ARROW_ASSIGN_OR_RAISE(const int64_t expected_size, + internal::ComputeShapeProduct(fst_type.shape())); + if (expected_size != fsl_type->list_size()) { + return Status::Invalid("Product of shape dimensions (", expected_size, + ") does not match FixedSizeList size (", fsl_type->list_size(), + ")"); + } + return ext_type; } std::shared_ptr FixedShapeTensorType::MakeArray( @@ -310,8 +348,7 @@ const Result> FixedShapeTensorArray::ToTensor() const { } std::vector shape = ext_type.shape(); - auto cell_size = std::accumulate(shape.begin(), shape.end(), static_cast(1), - std::multiplies<>()); + ARROW_ASSIGN_OR_RAISE(const int64_t cell_size, internal::ComputeShapeProduct(shape)); shape.insert(shape.begin(), 1, this->length()); internal::Permute(permutation, &shape); @@ -330,6 +367,11 @@ Result> FixedShapeTensorType::Make( const std::shared_ptr& value_type, const std::vector& shape, const std::vector& permutation, const std::vector& dim_names) { const size_t ndim = shape.size(); + for (auto dim : shape) { + if (dim < 0) { + return Status::Invalid("shape must have non-negative values, got ", dim); + } + } if (!permutation.empty() && ndim != permutation.size()) { return Status::Invalid("permutation size must match shape size. 
Expected: ", ndim, " Got: ", permutation.size()); @@ -342,8 +384,12 @@ Result> FixedShapeTensorType::Make( RETURN_NOT_OK(internal::IsPermutationValid(permutation)); } - const int64_t size = std::accumulate(shape.begin(), shape.end(), - static_cast(1), std::multiplies<>()); + ARROW_ASSIGN_OR_RAISE(const int64_t size, internal::ComputeShapeProduct(shape)); + if (size > std::numeric_limits::max()) { + return Status::Invalid("Product of shape dimensions (", size, + ") exceeds maximum FixedSizeList size (", + std::numeric_limits::max(), ")"); + } return std::make_shared(value_type, static_cast(size), shape, permutation, dim_names); } diff --git a/cpp/src/arrow/extension/tensor_extension_array_test.cc b/cpp/src/arrow/extension/tensor_extension_array_test.cc index 5c6dbe216281..531fc3c01cf5 100644 --- a/cpp/src/arrow/extension/tensor_extension_array_test.cc +++ b/cpp/src/arrow/extension/tensor_extension_array_test.cc @@ -219,6 +219,73 @@ TEST_F(TestFixedShapeTensorType, MetadataSerializationRoundtrip) { CheckDeserializationRaises(ext_type_, storage_type, R"({"shape":[3],"dim_names":["x","y"]})", "Invalid dim_names"); + + // Validate shape values must be integers. Error message should include the + // JSON type name of the offending value. 
+ CheckDeserializationRaises(ext_type_, storage_type, R"({"shape":[3.5,4]})", + "shape must contain integers, got Number"); + CheckDeserializationRaises(ext_type_, storage_type, R"({"shape":["3","4"]})", + "shape must contain integers, got String"); + CheckDeserializationRaises(ext_type_, storage_type, R"({"shape":[null]})", + "shape must contain integers, got Null"); + CheckDeserializationRaises(ext_type_, storage_type, R"({"shape":[true]})", + "shape must contain integers, got True"); + CheckDeserializationRaises(ext_type_, storage_type, R"({"shape":[false]})", + "shape must contain integers, got False"); + + // Validate shape values must be non-negative + CheckDeserializationRaises(ext_type_, fixed_size_list(int64(), 1), R"({"shape":[-1]})", + "shape must have non-negative values"); + + // Validate product of shape matches storage list_size + CheckDeserializationRaises(ext_type_, storage_type, R"({"shape":[3,3]})", + "Product of shape dimensions"); + + // Validate permutation member must be an array with integer values + CheckDeserializationRaises(ext_type_, storage_type, + R"({"shape":[3,4],"permutation":"invalid"})", + "permutation must be an array, got String"); + CheckDeserializationRaises(ext_type_, storage_type, + R"({"shape":[3,4],"permutation":{"a":1}})", + "permutation must be an array, got Object"); + CheckDeserializationRaises(ext_type_, storage_type, + R"({"shape":[3,4],"permutation":[1.5,0.5]})", + "permutation must contain integers, got Number"); + CheckDeserializationRaises(ext_type_, storage_type, + R"({"shape":[3,4],"permutation":["a","b"]})", + "permutation must contain integers, got String"); + + // Validate permutation values must be unique integers in [0, N-1] + CheckDeserializationRaises(ext_type_, storage_type, + R"({"shape":[3,4],"permutation":[0,0]})", + "Permutation indices"); + CheckDeserializationRaises(ext_type_, storage_type, + R"({"shape":[3,4],"permutation":[0,5]})", + "Permutation indices"); + 
CheckDeserializationRaises(ext_type_, storage_type, + R"({"shape":[3,4],"permutation":[-1,0]})", + "Permutation indices"); + + // Validate dim_names member must be an array with string values + CheckDeserializationRaises(ext_type_, storage_type, + R"({"shape":[3,4],"dim_names":"invalid"})", + "dim_names must be an array, got String"); + CheckDeserializationRaises(ext_type_, storage_type, + R"({"shape":[3,4],"dim_names":[1,2]})", + "dim_names must contain strings, got Number"); + CheckDeserializationRaises(ext_type_, storage_type, + R"({"shape":[3,4],"dim_names":[null,null]})", + "dim_names must contain strings, got Null"); +} + +TEST_F(TestFixedShapeTensorType, MakeValidatesShape) { + // Negative shape values should be rejected + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, testing::HasSubstr("shape must have non-negative values"), + FixedShapeTensorType::Make(value_type_, {-1})); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, testing::HasSubstr("shape must have non-negative values"), + FixedShapeTensorType::Make(value_type_, {3, -1, 4})); } TEST_F(TestFixedShapeTensorType, RoundtripBatch) { @@ -794,6 +861,32 @@ TEST_F(TestVariableShapeTensorType, MetadataSerializationRoundtrip) { "Invalid: permutation"); CheckDeserializationRaises(ext_type_, storage_type, R"({"dim_names":["x","y"]})", "Invalid: dim_names"); + + // Validate permutation member must be an array with integer values. Error + // message should include the JSON type name of the offending value. 
+ CheckDeserializationRaises(ext_type_, storage_type, R"({"permutation":"invalid"})", + "permutation must be an array, got String"); + CheckDeserializationRaises(ext_type_, storage_type, R"({"permutation":[1.5,0.5,2.5]})", + "permutation must contain integers, got Number"); + CheckDeserializationRaises(ext_type_, storage_type, + R"({"permutation":[null,null,null]})", + "permutation must contain integers, got Null"); + + // Validate dim_names member must be an array with string values + CheckDeserializationRaises(ext_type_, storage_type, R"({"dim_names":"invalid"})", + "dim_names must be an array, got String"); + CheckDeserializationRaises(ext_type_, storage_type, R"({"dim_names":[1,2,3]})", + "dim_names must contain strings, got Number"); + + // Validate uniform_shape member must be an array with integer-or-null values + CheckDeserializationRaises(ext_type_, storage_type, R"({"uniform_shape":"invalid"})", + "uniform_shape must be an array, got String"); + CheckDeserializationRaises(ext_type_, storage_type, + R"({"uniform_shape":[1.5,null,null]})", + "uniform_shape must contain integers or nulls, got Number"); + CheckDeserializationRaises(ext_type_, storage_type, + R"({"uniform_shape":["x",null,null]})", + "uniform_shape must contain integers or nulls, got String"); } TEST_F(TestVariableShapeTensorType, RoundtripBatch) { diff --git a/cpp/src/arrow/extension/tensor_internal.cc b/cpp/src/arrow/extension/tensor_internal.cc index 37862b7689f1..e94ea9a1d181 100644 --- a/cpp/src/arrow/extension/tensor_internal.cc +++ b/cpp/src/arrow/extension/tensor_internal.cc @@ -30,6 +30,31 @@ namespace arrow::internal { +namespace { + +// Names indexed by rapidjson::Type enum value: +// kNullType=0, kFalseType=1, kTrueType=2, kObjectType=3, +// kArrayType=4, kStringType=5, kNumberType=6. 
+constexpr const char* kJsonTypeNames[] = {"Null", "False", "True", "Object", + "Array", "String", "Number"}; + +} // namespace + +const char* JsonTypeName(const ::arrow::rapidjson::Value& v) { + return kJsonTypeNames[v.GetType()]; +} + +Result ComputeShapeProduct(std::span shape) { + int64_t product = 1; + for (const auto dim : shape) { + if (MultiplyWithOverflow(product, dim, &product)) { + return Status::Invalid( + "Product of tensor shape dimensions would not fit in 64-bit integer"); + } + } + return product; +} + bool IsPermutationTrivial(std::span permutation) { for (size_t i = 1; i < permutation.size(); ++i) { if (permutation[i - 1] + 1 != permutation[i]) { @@ -105,12 +130,7 @@ Result> SliceTensorBuffer(const Array& data_array, const DataType& value_type, std::span shape) { const int64_t byte_width = value_type.byte_width(); - int64_t size = 1; - for (const auto dim : shape) { - if (MultiplyWithOverflow(size, dim, &size)) { - return Status::Invalid("Tensor size would not fit in 64-bit integer"); - } - } + ARROW_ASSIGN_OR_RAISE(const int64_t size, ComputeShapeProduct(shape)); if (size != data_array.length()) { return Status::Invalid("Expected data array of length ", size, ", got ", data_array.length()); diff --git a/cpp/src/arrow/extension/tensor_internal.h b/cpp/src/arrow/extension/tensor_internal.h index b5ed5ebe1197..19665bf2cd4c 100644 --- a/cpp/src/arrow/extension/tensor_internal.h +++ b/cpp/src/arrow/extension/tensor_internal.h @@ -21,11 +21,25 @@ #include #include +#include "arrow/json/rapidjson_defs.h" // IWYU pragma: keep #include "arrow/result.h" #include "arrow/type_fwd.h" +#include + namespace arrow::internal { +/// \brief Return the name of a RapidJSON value's type (e.g., "Null", "Array", "Number"). +ARROW_EXPORT +const char* JsonTypeName(const ::arrow::rapidjson::Value& v); + +/// \brief Compute the product of the given shape dimensions. +/// +/// Returns Status::Invalid if the product would overflow int64_t. 
+/// An empty shape returns 1 (the multiplicative identity). +ARROW_EXPORT +Result ComputeShapeProduct(std::span shape); + ARROW_EXPORT bool IsPermutationTrivial(std::span permutation); diff --git a/cpp/src/arrow/extension/variable_shape_tensor.cc b/cpp/src/arrow/extension/variable_shape_tensor.cc index 7e27bbdb749f..b1b12583d7fe 100644 --- a/cpp/src/arrow/extension/variable_shape_tensor.cc +++ b/cpp/src/arrow/extension/variable_shape_tensor.cc @@ -159,26 +159,31 @@ Result> VariableShapeTensorType::Deserialize( if (document.HasMember("permutation")) { const auto& json_permutation = document["permutation"]; if (!json_permutation.IsArray()) { - return Status::Invalid("permutation must be an array"); + return Status::Invalid("permutation must be an array, got ", + internal::JsonTypeName(json_permutation)); } permutation.reserve(ndim); for (const auto& x : json_permutation.GetArray()) { if (!x.IsInt64()) { - return Status::Invalid("permutation must contain integers"); + return Status::Invalid("permutation must contain integers, got ", + internal::JsonTypeName(x)); } permutation.emplace_back(x.GetInt64()); } + RETURN_NOT_OK(internal::IsPermutationValid(permutation)); } std::vector dim_names; if (document.HasMember("dim_names")) { const auto& json_dim_names = document["dim_names"]; if (!json_dim_names.IsArray()) { - return Status::Invalid("dim_names must be an array"); + return Status::Invalid("dim_names must be an array, got ", + internal::JsonTypeName(json_dim_names)); } dim_names.reserve(ndim); for (const auto& x : json_dim_names.GetArray()) { if (!x.IsString()) { - return Status::Invalid("dim_names must contain strings"); + return Status::Invalid("dim_names must contain strings, got ", + internal::JsonTypeName(x)); } dim_names.emplace_back(x.GetString()); } @@ -188,7 +193,8 @@ Result> VariableShapeTensorType::Deserialize( if (document.HasMember("uniform_shape")) { const auto& json_uniform_shape = document["uniform_shape"]; if (!json_uniform_shape.IsArray()) { - 
return Status::Invalid("uniform_shape must be an array"); + return Status::Invalid("uniform_shape must be an array, got ", + internal::JsonTypeName(json_uniform_shape)); } uniform_shape.reserve(ndim); for (const auto& x : json_uniform_shape.GetArray()) { @@ -197,7 +203,8 @@ Result> VariableShapeTensorType::Deserialize( } else if (x.IsInt64()) { uniform_shape.emplace_back(x.GetInt64()); } else { - return Status::Invalid("uniform_shape must contain integers or nulls"); + return Status::Invalid("uniform_shape must contain integers or nulls, got ", + internal::JsonTypeName(x)); } } } diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index c21eb913c389..84ee62fe9e8d 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -423,10 +423,12 @@ static Result> ReadMessageInternal( body, file->ReadAt(offset + metadata_length, decoder.next_required_size())); } - if (body->size() < decoder.next_required_size()) { - return Status::IOError("Expected to be able to read ", - decoder.next_required_size(), - " bytes for message body, got ", body->size()); + if (body->size() != decoder.next_required_size()) { + // The streaming decoder got out of sync with the actual advertised + // metadata and body size, which signals an invalid IPC file. 
+ return Status::IOError("Invalid IPC file: advertised body size is ", body->size(), + ", but message decoder expects to read ", + decoder.next_required_size(), " bytes instead"); } RETURN_NOT_OK(decoder.Consume(body)); return result; diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index b79fbf6dd712..580081384308 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -122,14 +122,14 @@ Status InvalidMessageType(MessageType expected, MessageType actual) { /// \brief Structure to keep common arguments to be passed struct IpcReadContext { - IpcReadContext(DictionaryMemo* memo, const IpcReadOptions& option, bool swap, + IpcReadContext(DictionaryMemo* memo, const IpcReadOptions& option, bool swap_endian, MetadataVersion version = MetadataVersion::V5, Compression::type kind = Compression::UNCOMPRESSED) : dictionary_memo(memo), options(option), metadata_version(version), compression(kind), - swap_endian(swap) {} + swap_endian(swap_endian) {} DictionaryMemo* dictionary_memo; @@ -589,6 +589,7 @@ Status DecompressBuffers(Compression::type compression, const IpcReadOptions& op } AppendFrom(field->child_data); } + // Dictionary buffers are decompressed separately (see ReadDictionary). } BufferPtrVector Get(const ArrayDataVector& fields) && { @@ -613,16 +614,91 @@ Status DecompressBuffers(Compression::type compression, const IpcReadOptions& op }); } +// Helper class to run post-ArrayLoader steps: +// buffer decompression, dictionary resolution, buffer re-alignment. 
+struct RecordBatchLoader { + Result> CreateRecordBatch(ArrayDataVector columns) { + ARROW_ASSIGN_OR_RAISE(auto filtered_columns, CreateColumns(std::move(columns))); + + std::shared_ptr filtered_schema; + if (!inclusion_mask_.empty()) { + FieldVector filtered_fields; + for (int i = 0; i < schema_->num_fields(); ++i) { + if (inclusion_mask_[i]) { + filtered_fields.push_back(schema_->field(i)); + } + } + filtered_schema = schema(std::move(filtered_fields), schema_->metadata()); + } else { + filtered_schema = schema_; + } + + return RecordBatch::Make(std::move(filtered_schema), batch_length_, + std::move(filtered_columns)); + } + + Result CreateColumns(ArrayDataVector columns, + bool resolve_dictionaries = true) { + if (resolve_dictionaries) { + // Dictionary resolution needs to happen on the unfiltered columns, + // because fields are mapped structurally (by path in the original schema). + RETURN_NOT_OK(ResolveDictionaries(columns, *context_.dictionary_memo, + context_.options.memory_pool)); + } + + ArrayDataVector filtered_columns; + if (!inclusion_mask_.empty()) { + FieldVector filtered_fields; + for (int i = 0; i < schema_->num_fields(); ++i) { + if (inclusion_mask_[i]) { + DCHECK_NE(columns[i], nullptr); + filtered_columns.push_back(std::move(columns[i])); + } + } + columns.clear(); + } else { + filtered_columns = std::move(columns); + } + + if (context_.compression != Compression::UNCOMPRESSED) { + RETURN_NOT_OK( + DecompressBuffers(context_.compression, context_.options, &filtered_columns)); + } + + // Swap endian if necessary + if (context_.swap_endian) { + for (auto& column : filtered_columns) { + ARROW_ASSIGN_OR_RAISE( + column, arrow::internal::SwapEndianArrayData(std::move(column), + context_.options.memory_pool)); + } + } + if (context_.options.ensure_alignment != Alignment::kAnyAlignment) { + for (auto& column : filtered_columns) { + ARROW_ASSIGN_OR_RAISE( + column, + util::EnsureAlignment( + std::move(column), + // The numerical value of the enum is 
taken literally as byte alignment + static_cast(context_.options.ensure_alignment), + context_.options.memory_pool)); + } + } + return filtered_columns; + } + + IpcReadContext context_; + std::shared_ptr schema_; + int64_t batch_length_; + std::vector inclusion_mask_; +}; + Result> LoadRecordBatchSubset( const flatbuf::RecordBatch* metadata, const std::shared_ptr& schema, const std::vector* inclusion_mask, const IpcReadContext& context, io::RandomAccessFile* file) { ArrayLoader loader(metadata, context.metadata_version, context.options, file); - ArrayDataVector columns(schema->num_fields()); - ArrayDataVector filtered_columns; - FieldVector filtered_fields; - std::shared_ptr filtered_schema; for (int i = 0; i < schema->num_fields(); ++i) { const Field& field = *schema->field(i); @@ -634,10 +710,6 @@ Result> LoadRecordBatchSubset( return Status::IOError("Array length did not match record batch length"); } columns[i] = std::move(column); - if (inclusion_mask) { - filtered_columns.push_back(columns[i]); - filtered_fields.push_back(schema->field(i)); - } } else { // Skip field. This logic must be executed to advance the state of the // loader to the next field @@ -645,41 +717,9 @@ Result> LoadRecordBatchSubset( } } - // Dictionary resolution needs to happen on the unfiltered columns, - // because fields are mapped structurally (by path in the original schema). 
- RETURN_NOT_OK(ResolveDictionaries(columns, *context.dictionary_memo, - context.options.memory_pool)); - - if (inclusion_mask) { - filtered_schema = ::arrow::schema(std::move(filtered_fields), schema->metadata()); - columns.clear(); - } else { - filtered_schema = schema; - filtered_columns = std::move(columns); - } - if (context.compression != Compression::UNCOMPRESSED) { - RETURN_NOT_OK( - DecompressBuffers(context.compression, context.options, &filtered_columns)); - } - - // swap endian in a set of ArrayData if necessary (swap_endian == true) - if (context.swap_endian) { - for (auto& filtered_column : filtered_columns) { - ARROW_ASSIGN_OR_RAISE(filtered_column, - arrow::internal::SwapEndianArrayData(filtered_column)); - } - } - auto batch = RecordBatch::Make(std::move(filtered_schema), metadata->length(), - std::move(filtered_columns)); - - if (ARROW_PREDICT_FALSE(context.options.ensure_alignment != Alignment::kAnyAlignment)) { - return util::EnsureAlignment(batch, - // the numerical value of ensure_alignment enum is taken - // literally as byte alignment - static_cast(context.options.ensure_alignment), - context.options.memory_pool); - } - return batch; + RecordBatchLoader batch_loader{context, schema, metadata->length(), + inclusion_mask ? 
*inclusion_mask : std::vector{}}; + return batch_loader.CreateRecordBatch(std::move(columns)); } Result> LoadRecordBatch( @@ -845,7 +885,7 @@ Status UnpackSchemaMessage(const Message& message, const IpcReadOptions& options out_schema, field_inclusion_mask, swap_endian); } -Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context, +Status ReadDictionary(const Buffer& metadata, IpcReadContext context, DictionaryKind* kind, io::RandomAccessFile* file) { const flatbuf::Message* message = nullptr; RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &message)); @@ -860,13 +900,12 @@ Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context, CHECK_FLATBUFFERS_NOT_NULL(batch_meta, "DictionaryBatch.data"); - Compression::type compression; - RETURN_NOT_OK(GetCompression(batch_meta, &compression)); - if (compression == Compression::UNCOMPRESSED && + RETURN_NOT_OK(GetCompression(batch_meta, &context.compression)); + if (context.compression == Compression::UNCOMPRESSED && message->version() == flatbuf::MetadataVersion::MetadataVersion_V4) { // Possibly obtain codec information from experimental serialization format // in 0.17.x - RETURN_NOT_OK(GetCompressionExperimental(message, &compression)); + RETURN_NOT_OK(GetCompressionExperimental(message, &context.compression)); } const int64_t id = dictionary_batch->id(); @@ -882,16 +921,14 @@ Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context, const Field dummy_field("", value_type); RETURN_NOT_OK(loader.Load(&dummy_field, dict_data.get())); - if (compression != Compression::UNCOMPRESSED) { - ArrayDataVector dict_fields{dict_data}; - RETURN_NOT_OK(DecompressBuffers(compression, context.options, &dict_fields)); - } - - // swap endian in dict_data if necessary (swap_endian == true) - if (context.swap_endian) { - ARROW_ASSIGN_OR_RAISE(dict_data, ::arrow::internal::SwapEndianArrayData( - dict_data, context.options.memory_pool)); - } + // Run post-load steps: 
buffer decompression, etc. + RecordBatchLoader batch_loader{context, /*schema=*/nullptr, batch_meta->length(), + /*inclusion_mask=*/std::vector{}}; + ARROW_ASSIGN_OR_RAISE( + auto dict_columns, + batch_loader.CreateColumns({dict_data}, /*resolve_dictionaries=*/false)); + DCHECK_EQ(dict_columns.size(), 1); + dict_data = dict_columns[0]; if (dictionary_batch->isDelta()) { if (kind != nullptr) { @@ -1756,10 +1793,9 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { std::shared_ptr out_schema; RETURN_NOT_OK(GetInclusionMaskAndOutSchema(schema, context.options.included_fields, &inclusion_mask, &out_schema)); - for (int i = 0; i < schema->num_fields(); ++i) { const Field& field = *schema->field(i); - if (inclusion_mask.size() == 0 || inclusion_mask[i]) { + if (inclusion_mask.empty() || inclusion_mask[i]) { // Read field auto column = std::make_shared(); RETURN_NOT_OK(loader.Load(&field, column.get())); @@ -1767,21 +1803,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return Status::IOError("Array length did not match record batch length"); } columns[i] = std::move(column); - if (inclusion_mask.size() > 0) { - filtered_columns.push_back(columns[i]); - filtered_fields.push_back(schema->field(i)); - } } else { // Skip field. This logic must be executed to advance the state of the // loader to the next field RETURN_NOT_OK(loader.SkipField(&field)); } } - if (inclusion_mask.size() > 0) { - filtered_schema = ::arrow::schema(std::move(filtered_fields), schema->metadata()); - } else { - filtered_schema = schema; - } return Status::OK(); } @@ -1798,31 +1825,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { } loader.read_request().FulfillRequest(buffers); - // Dictionary resolution needs to happen on the unfiltered columns, - // because fields are mapped structurally (by path in the original schema). 
- RETURN_NOT_OK(ResolveDictionaries(columns, *context.dictionary_memo, - context.options.memory_pool)); - if (inclusion_mask.size() > 0) { - columns.clear(); - } else { - filtered_columns = std::move(columns); - } - - if (context.compression != Compression::UNCOMPRESSED) { - RETURN_NOT_OK( - DecompressBuffers(context.compression, context.options, &filtered_columns)); - } - - // swap endian in a set of ArrayData if necessary (swap_endian == true) - if (context.swap_endian) { - for (int i = 0; i < static_cast(filtered_columns.size()); ++i) { - ARROW_ASSIGN_OR_RAISE(filtered_columns[i], - arrow::internal::SwapEndianArrayData( - filtered_columns[i], context.options.memory_pool)); - } - } - return RecordBatch::Make(std::move(filtered_schema), length, - std::move(filtered_columns)); + RecordBatchLoader batch_loader{context, schema, length, std::move(inclusion_mask)}; + return batch_loader.CreateRecordBatch(std::move(columns)); } std::shared_ptr schema; @@ -1834,9 +1838,6 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { ArrayDataVector columns; io::internal::ReadRangeCache cache; int64_t length; - ArrayDataVector filtered_columns; - FieldVector filtered_fields; - std::shared_ptr filtered_schema; std::vector inclusion_mask; }; diff --git a/dev/tasks/python-wheels/github.windows.yml b/dev/tasks/python-wheels/github.windows.yml index 77e2a04e3a0a..96605614e321 100644 --- a/dev/tasks/python-wheels/github.windows.yml +++ b/dev/tasks/python-wheels/github.windows.yml @@ -57,6 +57,28 @@ jobs: esac echo "TEST_IMAGE_PREFIX=${test_image_prefix}" >> ${GITHUB_ENV} + - name: Configure Docker data-root + shell: powershell + run: | + # The D: drive on windows-2022 GH Actions runners has ~44GB free vs ~14GB on C:. + # Moving Docker's data-root to D: prevents hcsshim::ImportLayer failures when + # building large Windows container layers (e.g. the vcpkg install layer). 
GH-49676 + Stop-Service docker + $daemonJson = "C:\ProgramData\Docker\config\daemon.json" + New-Item -ItemType Directory -Force -Path (Split-Path $daemonJson) | Out-Null + if (Test-Path $daemonJson) { + $json = Get-Content $daemonJson | ConvertFrom-Json + $json | Add-Member -Force -NotePropertyName "data-root" -NotePropertyValue "D:\docker" + $json | ConvertTo-Json -Depth 10 | Set-Content $daemonJson + } else { + Set-Content $daemonJson -Value '{"data-root":"D:\\docker"}' + } + Start-Service docker + Write-Host "=== daemon.json ===" + Get-Content $daemonJson + Write-Host "=== docker info ===" + docker info + - name: Build wheel shell: cmd run: | diff --git a/testing b/testing index a871ddc17a4d..249079a810ca 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit a871ddc17a4dd936b7aa43898d59f86a11c3a2b5 +Subproject commit 249079a810caedda6898464003c7ef8a47efeeae