Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 58 additions & 55 deletions src/duckdb/extension/parquet/decoder/delta_byte_array_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ void DeltaByteArrayDecoder::ReadDbpData(Allocator &allocator, ResizeableBuffer &
}

void DeltaByteArrayDecoder::InitializePage() {
if (reader.Type().InternalType() != PhysicalType::VARCHAR) {
throw std::runtime_error("Delta Byte Array encoding is only supported for string/blob data");
}
auto &block = *reader.block;
auto &allocator = reader.reader.allocator;
idx_t prefix_count, suffix_count;
Expand All @@ -33,71 +30,77 @@ void DeltaByteArrayDecoder::InitializePage() {
if (prefix_count != suffix_count) {
throw std::runtime_error("DELTA_BYTE_ARRAY - prefix and suffix counts are different - corrupt file?");
}

auto prefix_data = reinterpret_cast<uint32_t *>(prefix_buffer.ptr);
auto suffix_data = reinterpret_cast<uint32_t *>(suffix_buffer.ptr);

// Allocate the plain data buffer
if (!plain_data) {
plain_data = make_shared_ptr<ResizeableBuffer>();
}
plain_data->reset();

if (prefix_count == 0) {
// no values
byte_array_data = make_uniq<Vector>(LogicalType::VARCHAR, nullptr);
plain_data->resize(allocator, 0);
return;
}
auto prefix_data = reinterpret_cast<uint32_t *>(prefix_buffer.ptr);
auto suffix_data = reinterpret_cast<uint32_t *>(suffix_buffer.ptr);
byte_array_data = make_uniq<Vector>(LogicalType::VARCHAR, prefix_count);
byte_array_count = prefix_count;
delta_offset = 0;
auto string_data = FlatVector::GetData<string_t>(*byte_array_data);

// Decode DELTA_BYTE_ARRAY into plain Parquet page format
// Plain format for BYTE_ARRAY: [4-byte length][data] repeated
// Plain format for FIXED_LEN_BYTE_ARRAY: [data] repeated (no length prefix)
auto &schema = reader.Schema();
bool is_fixed_len = (schema.parquet_type == duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY);
idx_t fixed_len = is_fixed_len ? schema.type_length : 0;

// Calculate total buffer size and max value length in one pass
idx_t total_size = 0;
idx_t max_len = 0;
for (idx_t i = 0; i < prefix_count; i++) {
auto str_len = prefix_data[i] + suffix_data[i];
block.available(suffix_data[i]);
string_data[i] = StringVector::EmptyString(*byte_array_data, str_len);
auto result_data = string_data[i].GetDataWriteable();
if (prefix_data[i] > 0) {
if (i == 0 || prefix_data[i] > string_data[i - 1].GetSize()) {
throw std::runtime_error("DELTA_BYTE_ARRAY - prefix is out of range - corrupt file?");
}
memcpy(result_data, string_data[i - 1].GetData(), prefix_data[i]);
idx_t len = prefix_data[i] + suffix_data[i];
if (is_fixed_len && len != fixed_len) {
throw std::runtime_error(
"DELTA_BYTE_ARRAY on FIXED_LEN_BYTE_ARRAY: decoded length does not match type length");
}
memcpy(result_data + prefix_data[i], block.ptr, suffix_data[i]);
block.inc(suffix_data[i]);
string_data[i].Finalize();
total_size += len + (is_fixed_len ? 0 : sizeof(uint32_t));
max_len = MaxValue(max_len, len);
}
}

void DeltaByteArrayDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset) {
if (!byte_array_data) {
throw std::runtime_error("Internal error - DeltaByteArray called but there was no byte_array_data set");
}
auto result_ptr = FlatVector::GetData<string_t>(result);
auto &result_mask = FlatVector::Validity(result);
auto string_data = FlatVector::GetData<string_t>(*byte_array_data);
for (idx_t row_idx = 0; row_idx < read_count; row_idx++) {
if (defines && defines[row_idx + result_offset] != reader.MaxDefine()) {
result_mask.SetInvalid(row_idx + result_offset);
continue;
plain_data->resize(allocator, total_size);
unsafe_vector<uint8_t> prev_value(max_len);
idx_t prev_len = 0;

auto output = plain_data->ptr;
for (idx_t i = 0; i < prefix_count; i++) {
auto prefix_len = prefix_data[i];
auto suffix_len = suffix_data[i];
auto value_len = prefix_len + suffix_len;

if (prefix_len > prev_len) {
throw std::runtime_error("DELTA_BYTE_ARRAY - prefix is out of range - corrupt file?");
}
if (delta_offset >= byte_array_count) {
throw IOException("DELTA_BYTE_ARRAY - length mismatch between values and byte array lengths (attempted "
"read of %d from %d entries) - corrupt file?",
delta_offset + 1, byte_array_count);

if (!is_fixed_len) {
Store<uint32_t>(static_cast<uint32_t>(value_len), output);
output += sizeof(uint32_t);
}
result_ptr[row_idx + result_offset] = string_data[delta_offset++];

memcpy(output, prev_value.data(), prefix_len);
block.available(suffix_len);
memcpy(output + prefix_len, block.ptr, suffix_len);
block.inc(suffix_len);

memcpy(prev_value.data(), output, value_len);
prev_len = value_len;
output += value_len;
}
StringVector::AddHeapReference(result, *byte_array_data);
}

void DeltaByteArrayDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset) {
reader.Plain(plain_data, defines, read_count, result_offset, result);
}

void DeltaByteArrayDecoder::Skip(uint8_t *defines, idx_t skip_count) {
if (!byte_array_data) {
throw std::runtime_error("Internal error - DeltaByteArray called but there was no byte_array_data set");
}
for (idx_t row_idx = 0; row_idx < skip_count; row_idx++) {
if (defines && defines[row_idx] != reader.MaxDefine()) {
continue;
}
if (delta_offset >= byte_array_count) {
throw IOException("DELTA_BYTE_ARRAY - length mismatch between values and byte array lengths (attempted "
"read of %d from %d entries) - corrupt file?",
delta_offset + 1, byte_array_count);
}
delta_offset++;
}
reader.PlainSkip(*plain_data, defines, skip_count);
}

} // namespace duckdb
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ class DeltaByteArrayDecoder {

private:
ColumnReader &reader;
unique_ptr<Vector> byte_array_data;
idx_t byte_array_count = 0;
idx_t delta_offset = 0;

//! Decoded data in plain Parquet page format
shared_ptr<ResizeableBuffer> plain_data;
};

} // namespace duckdb
22 changes: 22 additions & 0 deletions src/duckdb/src/common/enum_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
#include "duckdb/main/extension.hpp"
#include "duckdb/main/extension_helper.hpp"
#include "duckdb/main/extension_install_info.hpp"
#include "duckdb/main/profiling_info.hpp"
#include "duckdb/main/query_parameters.hpp"
#include "duckdb/main/query_profiler.hpp"
#include "duckdb/main/query_result.hpp"
Expand Down Expand Up @@ -3806,6 +3807,27 @@ ProfilingCoverage EnumUtil::FromString<ProfilingCoverage>(const char *value) {
return static_cast<ProfilingCoverage>(StringUtil::StringToEnum(GetProfilingCoverageValues(), 2, "ProfilingCoverage", value));
}

const StringUtil::EnumStringLiteral *GetProfilingParameterNamesValues() {
static constexpr StringUtil::EnumStringLiteral values[] {
{ static_cast<uint32_t>(ProfilingParameterNames::FORMAT), "FORMAT" },
{ static_cast<uint32_t>(ProfilingParameterNames::COVERAGE), "COVERAGE" },
{ static_cast<uint32_t>(ProfilingParameterNames::SAVE_LOCATION), "SAVE_LOCATION" },
{ static_cast<uint32_t>(ProfilingParameterNames::MODE), "MODE" },
{ static_cast<uint32_t>(ProfilingParameterNames::METRICS), "METRICS" }
};
return values;
}

template<>
const char* EnumUtil::ToChars<ProfilingParameterNames>(ProfilingParameterNames value) {
return StringUtil::EnumToString(GetProfilingParameterNamesValues(), 5, "ProfilingParameterNames", static_cast<uint32_t>(value));
}

template<>
ProfilingParameterNames EnumUtil::FromString<ProfilingParameterNames>(const char *value) {
return static_cast<ProfilingParameterNames>(StringUtil::StringToEnum(GetProfilingParameterNamesValues(), 5, "ProfilingParameterNames", value));
}

const StringUtil::EnumStringLiteral *GetPushdownExtractSupportValues() {
static constexpr StringUtil::EnumStringLiteral values[] {
{ static_cast<uint32_t>(PushdownExtractSupport::UNCHECKED), "UNCHECKED" },
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/common/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ bool FileSystem::IsPathAbsolute(const string &path) {

string FileSystem::NormalizeAbsolutePath(const string &path) {
D_ASSERT(IsPathAbsolute(path));
auto result = StringUtil::Lower(FileSystem::ConvertSeparators(path));
auto result = FileSystem::ConvertSeparators(path);
if (StartsWithSingleBackslash(result)) {
// Path starts with a single backslash or forward slash
// prepend drive letter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,18 @@ void ExpressionExecutor::Execute(const BoundOperatorExpression &expr, Expression
}
} else if (expression_type == ExpressionType::OPERATOR_TRY) {
auto &child_state = *state->child_states[0];
Vector try_result(result.GetType());
try {
Execute(*expr.children[0], &child_state, sel, count, result);
Execute(*expr.children[0], &child_state, sel, count, try_result);
if (try_result.GetVectorType() == VectorType::CONSTANT_VECTOR) {
result.Reference(try_result);
return;
}
if (sel) {
VectorOperations::Copy(try_result, result, *sel, count, 0, 0, count);
} else {
VectorOperations::Copy(try_result, result, count, 0, 0);
}
return;
} catch (std::exception &ex) {
ErrorData error(ex);
Expand Down
10 changes: 7 additions & 3 deletions src/duckdb/src/execution/physical_plan/plan_set_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ static vector<unique_ptr<Expression>> CreatePartitionedRowNumExpression(const ve
return res;
}

static JoinCondition CreateNotDistinctComparison(const LogicalType &type, idx_t i) {
static JoinCondition CreateNotDistinctComparison(ClientContext &context, const LogicalType &type, idx_t i) {
JoinCondition cond;
cond.left = make_uniq<BoundReferenceExpression>(type, i);
cond.right = make_uniq<BoundReferenceExpression>(type, i);
cond.comparison = ExpressionType::COMPARE_NOT_DISTINCT_FROM;

ExpressionBinder::PushCollation(context, cond.left, type);
ExpressionBinder::PushCollation(context, cond.right, type);

return cond;
}

Expand Down Expand Up @@ -59,7 +63,7 @@ PhysicalOperator &PhysicalPlanGenerator::CreatePlan(LogicalSetOperation &op) {
vector<JoinCondition> conditions;
// create equality condition for all columns
for (idx_t i = 0; i < types.size(); i++) {
conditions.push_back(CreateNotDistinctComparison(types[i], i));
conditions.push_back(CreateNotDistinctComparison(context, types[i], i));
}
// For EXCEPT ALL / INTERSECT ALL we push a window operator with a ROW_NUMBER into the scans and join to get bag
// semantics.
Expand All @@ -80,7 +84,7 @@ PhysicalOperator &PhysicalPlanGenerator::CreatePlan(LogicalSetOperation &op) {
right = right_window;

// add window expression result to join condition
conditions.push_back(CreateNotDistinctComparison(LogicalType::BIGINT, types.size()));
conditions.push_back(CreateNotDistinctComparison(context, LogicalType::BIGINT, types.size()));
// join (created below) now includes the row number result column
op.types.push_back(LogicalType::BIGINT);
}
Expand Down
Loading