Skip to content

Commit ad6aeae

Browse files
authored
[25.2] Impl methods to detect BulkUpsert with DEFAULT columns and a flag to disable (#27596) (#28871)
2 parents a5a7124 + de5c8d5 commit ad6aeae

File tree

6 files changed

+39
-3
lines changed

6 files changed

+39
-3
lines changed

ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -782,12 +782,15 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
782782
TKikimrRunner kikimr(TKikimrSettings()
783783
.SetUseRealThreads(false)
784784
.SetEnableAddColumsWithDefaults(true)
785+
.SetDisableMissingDefaultColumnsInBulkUpsert(true)
785786
.SetWithSampleTables(false));
786787

787788
auto db = kikimr.RunCall([&] { return kikimr.GetQueryClient(); } );
788789
auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); } );
789790
auto querySession = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); } );
790791

792+
auto tableClient = kikimr.RunCall([&] { return kikimr.GetTableClient(); } );
793+
791794
auto& runtime = *kikimr.GetTestServer().GetRuntime();
792795

793796
{
@@ -860,7 +863,7 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
860863

861864
auto alterQuery = R"(
862865
ALTER TABLE `/Root/AddNonColumnDoesnotReturnInternalError`
863-
ADD COLUMN Value3 Int32 NOT NULL DEFAULT 7;
866+
ADD COLUMN Value3 Int32 DEFAULT 7;
864867
)";
865868

866869
auto alterFuture = kikimr.RunInThreadPool([&] { return session.ExecuteQuery(alterQuery, TTxControl::NoTx()).GetValueSync(); });
@@ -924,8 +927,8 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
924927

925928
auto result = runtime.WaitFuture(alterFuture);
926929
fCompareTable(R"([
927-
[1u;"Changed";"Updated";7];
928-
[2u;"New";"text";7]
930+
[1u;"Changed";"Updated";[7]];
931+
[2u;"New";"text";[7]]
929932
])");
930933
}
931934

ydb/core/protos/feature_flags.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,4 +213,5 @@ message TFeatureFlags {
213213
optional bool EnableTopicCompactificationByKey = 196 [default = true];
214214
optional bool EnableCompactionOverloadDetection = 197 [default = true];
215215
reserved 198; // DisableColumnShardBulkUpsertRequireAllColumns
216+
optional bool DisableMissingDefaultColumnsInBulkUpsert = 215 [default = false];
216217
}

ydb/core/testlib/basics/feature_flags.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class TTestFeatureFlagsHolder {
7878
FEATURE_FLAG_SETTER(EnablePermissionsExport)
7979
FEATURE_FLAG_SETTER(EnableLocalDBBtreeIndex)
8080
FEATURE_FLAG_SETTER(EnableSharedMetadataAccessorCache)
81+
FEATURE_FLAG_SETTER(DisableMissingDefaultColumnsInBulkUpsert)
8182

8283
#undef FEATURE_FLAG_SETTER
8384
};

ydb/core/tx/tx_proxy/upload_rows_common_impl.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
407407
THashMap<TString, ui32> columnByName;
408408
THashSet<TString> keyColumnsLeft;
409409
THashSet<TString> notNullColumnsLeft = entry.NotNullColumns;
410+
THashSet<TString> defaultColumnsLeft;
410411
SrcColumns.reserve(entry.Columns.size());
411412
THashSet<TString> HasInternalConversion;
412413

@@ -426,6 +427,10 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
426427
keyColumnIds[keyOrder] = id;
427428
keyColumnsLeft.insert(name);
428429
}
430+
431+
if (colInfo.IsDefaultFromLiteral()) {
432+
defaultColumnsLeft.insert(name);
433+
}
429434
}
430435

431436
if (entry.ColumnTableInfo) {
@@ -524,6 +529,10 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
524529
NotNullColumns.emplace(ci.Name);
525530
}
526531

532+
if (defaultColumnsLeft.contains(ci.Name)) {
533+
defaultColumnsLeft.erase(ci.Name);
534+
}
535+
527536
if (ci.KeyOrder != -1) {
528537
KeyColumnPositions[ci.KeyOrder] = TFieldDescription{ci.Id, ci.Name, (ui32)pos, ci.PType, pgTypeMod, notNull};
529538
keyColumnsLeft.erase(ci.Name);
@@ -608,6 +617,21 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
608617
return TConclusionStatus::Fail(Sprintf("Missing not null columns: %s", JoinSeq(", ", notNullColumnsLeft).c_str()));
609618
}
610619

620+
if (!defaultColumnsLeft.empty() && UpsertIfExists) {
621+
// some default columns are not specified in the request, but upsert will only update existing rows
622+
// and only the columns specified in the request will be updated; unspecified default columns will not be changed.
623+
defaultColumnsLeft.clear();
624+
}
625+
626+
if (!defaultColumnsLeft.empty()) {
627+
if (AppData(ctx)->FeatureFlags.GetDisableMissingDefaultColumnsInBulkUpsert()) {
628+
return TConclusionStatus::Fail(Sprintf("Missing default columns: %s", JoinSeq(", ", defaultColumnsLeft).c_str()));
629+
}
630+
631+
UploadCounters.OnMissingDefaultColumns();
632+
LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Missing default columns: " << JoinSeq(", ", defaultColumnsLeft).c_str());
633+
}
634+
611635
TConclusionStatus res = TConclusionStatus::Success();
612636
if (isColumnTable && HasAppData() && AppDataVerified().ColumnShardConfig.GetBulkUpsertRequireAllColumns()) {
613637
res = CheckRequiredColumns(entry, *reqColumns);

ydb/core/tx/tx_proxy/upload_rows_counters.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,5 +99,6 @@ TUploadCounters::TUploadCounters()
9999
WrittenBytes = TBase::GetDeriviative("Replies/WrittenBytes");
100100
FailedBytes = TBase::GetDeriviative("Replies/FailedBytes");
101101
RequestsBytes = TBase::GetDeriviative("Requests/Bytes");
102+
MissingDefaultColumnsCount = TBase::GetDeriviative("MissingDefaultColumns/Count");
102103
}
103104
}

ydb/core/tx/tx_proxy/upload_rows_counters.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ class TUploadCounters: public NColumnShard::TCommonCountersOwner {
7676
NMonitoring::TDynamicCounters::TCounterPtr FailedBytes;
7777
NMonitoring::TDynamicCounters::TCounterPtr RequestsBytes;
7878

79+
NMonitoring::TDynamicCounters::TCounterPtr MissingDefaultColumnsCount;
80+
7981
THashMap<TUploadStatus, NMonitoring::TDynamicCounters::TCounterPtr, TUploadStatus::THasher> CodesCount;
8082

8183
NMonitoring::TDynamicCounters::TCounterPtr GetCodeCounter(const TUploadStatus& status);
@@ -142,6 +144,10 @@ class TUploadCounters: public NColumnShard::TCommonCountersOwner {
142144
PackageSizeCountByRecords->Collect(rowsCount);
143145
RequestsBytes->Add(requestBytes);
144146
}
147+
148+
void OnMissingDefaultColumns() {
149+
MissingDefaultColumnsCount->Inc();
150+
}
145151
};
146152

147153
} // namespace NKikimr

0 commit comments

Comments
 (0)