diff --git a/config/frac_version.go b/config/frac_version.go index d3ff1b14..07427f23 100644 --- a/config/frac_version.go +++ b/config/frac_version.go @@ -9,6 +9,10 @@ const ( BinaryDataV1 // BinaryDataV2 - MIDs stored in nanoseconds BinaryDataV2 + // BinaryDataV3 - bitpack for LIDs/MIDs + BinaryDataV3 + // BinaryDataV4 - LID blocks have firstLID/lastLID encoded in ext1, isContinued is not used, no legacy TID adjusting + BinaryDataV4 ) -const CurrentFracVersion = BinaryDataV2 +const CurrentFracVersion = BinaryDataV4 diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 3f0994e6..21050606 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -97,7 +97,7 @@ func (s *FractionTestSuite) SetupTestCommon() { DocsPositionsZstdLevel: 1, TokenTableZstdLevel: 1, DocBlocksZstdLevel: 1, - LIDBlockSize: 512, + LIDBlockSize: 256, DocBlockSize: 128 * int(units.KiB), } @@ -1326,6 +1326,43 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { fromTime: fromTime, toTime: toTime, }, + // block skipping scenarios + { + name: "service:gateway AND trace_id:trace-2026", + query: "service:gateway AND trace_id:trace-2026", + filter: func(doc *testDoc) bool { + return doc.service == gateway && doc.traceId == "trace-2026" + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "service:gateway AND (trace_id:trace-0 OR trace_id:trace-2500 OR trace_id:trace-4999)", + query: "service:gateway AND (trace_id:trace-0 OR trace_id:trace-2500 OR trace_id:trace-4999)", + filter: func(doc *testDoc) bool { + return doc.service == gateway && (doc.traceId == "trace-0" || doc.traceId == "trace-2500" || doc.traceId == "trace-4999") + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "service:gateway AND pod:pod-5", + query: "service:gateway AND pod:pod-5", + filter: func(doc *testDoc) bool { + return doc.service == gateway && doc.pod == "pod-5" + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "service:gateway AND pod:pod-5 AND message:failed", + query: "service:gateway AND pod:pod-5 AND message:failed", + filter: func(doc *testDoc) bool { + return doc.service == gateway && doc.pod == "pod-5" && strings.Contains(doc.message, "failed") + }, + fromTime: fromTime, + toTime: toTime, + }, { name: "service:gateway AND message:processing AND message:retry AND level:5", query: "service:gateway AND message:processing AND message:retry AND level:5", @@ -1337,6 +1374,17 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { toTime: toTime, }, // OR operator queries + { + name: "(service OR) AND (trace_id OR)", + query: "(service:bus OR service:kafka) AND (trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000)", + filter: func(doc *testDoc) bool { + return (doc.service == bus || doc.service == kafka) && (doc.traceId == "trace-1000" || + doc.traceId == "trace-1500" || + doc.traceId == "trace-2000") + }, + fromTime: fromTime, + toTime: toTime, + }, { name: "trace_id OR", query: "trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000 OR trace_id:trace-2500 OR trace_id:trace-3000", diff --git a/frac/sealed/lids/iterator_asc.go b/frac/sealed/lids/iterator_asc.go index a2fd0c5a..16715ac1 100644 --- a/frac/sealed/lids/iterator_asc.go +++ b/frac/sealed/lids/iterator_asc.go @@ -81,13 +81,14 @@ func (it *IteratorAsc) NextGeq(nextID node.LID) node.LID { return node.NullLID() } + it.blockIndex = it.table.SeekBlockLeq(it.blockIndex, it.tid, nextID.Unpack()) + it.loadNextLIDsBlock() it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock) it.counter.AddLIDsCount(len(it.lids)) } // fast path: smallest remaining > nextID => skip entire block - // TODO(cheb0): We could also pass LID into narrowLIDsRange to perform block skipping once we add something like MinLID to LID block header if it.lids[0] > nextID.Unpack() { it.lids = it.lids[:0] continue diff --git a/frac/sealed/lids/iterator_desc.go b/frac/sealed/lids/iterator_desc.go index 5c4e08d9..6c0691e2 100644 --- a/frac/sealed/lids/iterator_desc.go +++ b/frac/sealed/lids/iterator_desc.go @@ -80,13 +80,14 @@ func (it *IteratorDesc) NextGeq(nextID node.LID) node.LID { return node.NullLID() } + it.blockIndex = it.table.SeekBlockGeq(it.blockIndex, it.tid, nextID.Unpack()) + it.loadNextLIDsBlock() // last chunk in block but not last for tid; need load next block it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock) it.counter.AddLIDsCount(len(it.lids)) // inc loaded LIDs count } // fast path: last LID < nextID => skip the entire block - // TODO(cheb0): We could also pass LID into narrowLIDsRange to perform block skipping once we add something like MinLID to LID block header if nextID.Unpack() > it.lids[len(it.lids)-1] { it.lids = it.lids[:0] continue diff --git a/frac/sealed/lids/table.go b/frac/sealed/lids/table.go index 5a04acdd..cdb5548d 100644 --- a/frac/sealed/lids/table.go +++ b/frac/sealed/lids/table.go @@ -3,6 +3,7 @@ package lids import ( "sort" + "github.com/ozontech/seq-db/config" "go.uber.org/zap" "github.com/ozontech/seq-db/logger" @@ -12,24 +13,35 @@ type Table struct { StartBlockIndex uint32 MaxTIDs []uint32 // defines last tid for each block MinTIDs []uint32 // defines first not continued tid for each block + FirstLIDs []uint32 + LastLIDs []uint32 - // TODO: We need fix MinTID issue that we have to compensate with DiskBlock.getAdjustedMinTID() - // TODO: After that we do not need store IsContinued flag, and able calc it as MaxTIDs[i] == MinTIDs[i+1] - IsContinued []bool + FracVer config.BinaryDataVersion + IsContinued []bool // legacy field, only used in BinaryDataV0-BinaryDataV3 (inclusive) } -func NewTable(startOfLIDsBlockIndex uint32, minTIDs, maxTIDs []uint32, isContinued []bool) *Table { +func NewTable( + fracVer config.BinaryDataVersion, + startOfLIDsBlockIndex uint32, + minTIDs, maxTIDs []uint32, + firstLIDs, lastLIDs []uint32, + isContinued []bool) *Table { return &Table{ StartBlockIndex: startOfLIDsBlockIndex, MinTIDs: minTIDs, MaxTIDs: maxTIDs, + FirstLIDs: firstLIDs, + LastLIDs: lastLIDs, IsContinued: isContinued, + FracVer: fracVer, } } func (t *Table) GetAdjustedMinTID(blockIndex uint32) uint32 { - if t.IsContinued[blockIndex] { - return t.MinTIDs[blockIndex] - 1 + if t.FracVer < config.BinaryDataV4 { + if t.IsContinued[blockIndex] { + return t.MinTIDs[blockIndex] - 1 + } } return t.MinTIDs[blockIndex] } @@ -75,6 +87,46 @@ func (t *Table) GetLastBlockIndexForTID(tid uint32) uint32 { return uint32(index) } +// SeekBlockGeq finds next block for provided TID which contains +// lid greater or equal to provided LID starting from provided index (inclusive). +// - index: an index of block which is already suits and contains next portion of LIDs. Safe to return for old fractions. +func (t *Table) SeekBlockGeq(index uint32, tid uint32, nextLID uint32) uint32 { + if t.FracVer < config.BinaryDataV4 { + // not supported for old frac versions + return index + } + + res := index + for i := index + 1; i < uint32(len(t.MinTIDs)); i++ { + if t.MinTIDs[i] == tid && nextLID >= t.FirstLIDs[i] { + res = i + continue + } + break + } + return res +} + +// SeekBlockLeq finds next block with lowest index for provided TID which contains LIDs +// less or equal to provided LID starting from provided index (inclusive). +// - index: an index of block which is already suits and contains next portion of LIDs. Safe to return for old fractions. +func (t *Table) SeekBlockLeq(index uint32, tid uint32, nextLID uint32) uint32 { + if t.FracVer < config.BinaryDataV4 { + // not supported for old frac versions + return index + } + + res := index + for i := int(index) - 1; i >= 0; i-- { + if t.MaxTIDs[i] == tid && nextLID <= t.LastLIDs[i] { + res = uint32(i) + continue + } + break + } + return res +} + func (t *Table) HasTIDInPrevBlock(blockIndex, tid uint32) bool { if blockIndex == 0 { // it is no prev block return false diff --git a/frac/sealed/sealing/blocks_builder.go b/frac/sealed/sealing/blocks_builder.go index 14a5cac7..7eee7b1f 100644 --- a/frac/sealed/sealing/blocks_builder.go +++ b/frac/sealed/sealing/blocks_builder.go @@ -25,9 +25,10 @@ type tokensSealBlock struct { // lidsExt represents the range and continuation status of LID blocks. type lidsExt struct { - minTID uint32 // First token ID in the LID block - maxTID uint32 // Last token ID in the LID block - isContinued bool // Whether LID sequence continues in next block + minTID uint32 // First token ID in the LID block + maxTID uint32 // Last token ID in the LID block + firstLID uint32 // First LID in the LID block + lastLID uint32 // Last LID in the LID block } // lidsSealBlock represents a sealed block containing LID (Local ID) data. @@ -169,7 +170,6 @@ func (bb *blocksBuilder) BuildLIDsBlocks(tokenLIDs iter.Seq[[]uint32], blockCapa currentTID uint32 // Current TID being processed currentBlock lidsSealBlock // Current block under construction isEndOfToken bool // Flag for end of current token's LIDs - isContinued bool // Flag for block continuation ) // Initialize first block @@ -186,8 +186,8 @@ func (bb *blocksBuilder) BuildLIDsBlocks(tokenLIDs iter.Seq[[]uint32], blockCapa currentBlock.payload.Offsets = append(currentBlock.payload.Offsets, uint32(len(currentBlock.payload.LIDs))) } currentBlock.payload.IsLastLID = isEndOfToken // TODO(eguguchkin): Remove legacy field - currentBlock.ext.isContinued = isContinued // TODO(eguguchkin): Remove legacy field - isContinued = !isEndOfToken + currentBlock.ext.firstLID = currentBlock.payload.LIDs[0] + currentBlock.ext.lastLID = currentBlock.payload.LIDs[len(currentBlock.payload.LIDs)-1] return yield(currentBlock) } diff --git a/frac/sealed/sealing/blocks_builder_test.go b/frac/sealed/sealing/blocks_builder_test.go index 80892ca2..fd62ba32 100644 --- a/frac/sealed/sealing/blocks_builder_test.go +++ b/frac/sealed/sealing/blocks_builder_test.go @@ -319,9 +319,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) { expected := []lidsSealBlock{{ ext: lidsExt{ - minTID: 1, - maxTID: 1, - isContinued: false, + minTID: 1, + maxTID: 1, + firstLID: 10, + lastLID: 30, }, payload: lids.Block{ LIDs: []uint32{10, 20, 30}, @@ -330,20 +331,21 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) { }, }, { ext: lidsExt{ - minTID: 1, - maxTID: 2, - isContinued: true, + minTID: 1, + maxTID: 2, + firstLID: 40, + lastLID: 21, }, payload: lids.Block{ - LIDs: []uint32{40, 11, 21}, - Offsets: []uint32{0, 1, 3}, - IsLastLID: false, + LIDs: []uint32{40, 11, 21}, + Offsets: []uint32{0, 1, 3}, }, }, { ext: lidsExt{ - minTID: 2, - maxTID: 3, - isContinued: true, + minTID: 2, + maxTID: 3, + firstLID: 31, + lastLID: 10, }, payload: lids.Block{ LIDs: []uint32{31, 41, 10}, @@ -352,9 +354,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) { }, }, { ext: lidsExt{ - minTID: 3, - maxTID: 3, - isContinued: true, + minTID: 3, + maxTID: 3, + firstLID: 11, + lastLID: 21, }, payload: lids.Block{ LIDs: []uint32{11, 20, 21}, @@ -363,9 +366,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) { }, }, { ext: lidsExt{ - minTID: 4, - maxTID: 4, - isContinued: false, + minTID: 4, + maxTID: 4, + firstLID: 30, + lastLID: 50, }, payload: lids.Block{ LIDs: []uint32{30, 40, 50}, @@ -374,9 +378,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) { }, }, { ext: lidsExt{ - minTID: 4, - maxTID: 4, - isContinued: true, + minTID: 4, + maxTID: 4, + firstLID: 60, + lastLID: 60, }, payload: lids.Block{ LIDs: []uint32{60}, diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go index 48cf8302..3b4a6b9e 100644 --- a/frac/sealed/sealing/index.go +++ b/frac/sealed/sealing/index.go @@ -8,6 +8,7 @@ import ( "time" "github.com/alecthomas/units" + "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/bytespool" "github.com/ozontech/seq-db/consts" @@ -414,22 +415,18 @@ func (s *IndexSealer) packPosBlock(block idsSealBlock) indexBlock { // packLIDsBlock packs Local IDs (LIDs) into a compressed index block. // Also updates LIDs table for preloaded data access. func (s *IndexSealer) packLIDsBlock(block lidsSealBlock) indexBlock { - var ext1 uint64 - if block.ext.isContinued { // todo: Legacy continuation flag - ext1 = 1 - block.ext.minTID++ // Adjust for legacy format - } - // Update LIDs table for PreloadedData s.lidsTable.MinTIDs = append(s.lidsTable.MinTIDs, block.ext.minTID) s.lidsTable.MaxTIDs = append(s.lidsTable.MaxTIDs, block.ext.maxTID) - s.lidsTable.IsContinued = append(s.lidsTable.IsContinued, block.ext.isContinued) + s.lidsTable.FirstLIDs = append(s.lidsTable.FirstLIDs, block.ext.firstLID) + s.lidsTable.LastLIDs = append(s.lidsTable.LastLIDs, block.ext.lastLID) + s.lidsTable.FracVer = config.CurrentFracVersion // Packing block s.buf1 = block.payload.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.LIDsZstdLevel) - b.ext1 = ext1 // Legacy continuation flag - b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range + b.ext1 = uint64(block.ext.lastLID)<<32 | uint64(block.ext.firstLID) // LID range + b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range return b } diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index ae639862..419d334e 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -36,7 +36,7 @@ func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, indexRea logger.Fatal("load ids error", zap.Error(err)) } - if blocksData.LIDsTable, err = l.loadLIDsBlocksTable(); err != nil { + if blocksData.LIDsTable, err = l.loadLIDsBlocksTable(info.BinaryDataVer); err != nil { logger.Fatal("load lids error", zap.Error(err)) } @@ -134,9 +134,11 @@ func (l *Loader) skipTokens() { } } -func (l *Loader) loadLIDsBlocksTable() (*lids.Table, error) { +func (l *Loader) loadLIDsBlocksTable(fracVer config.BinaryDataVersion) (*lids.Table, error) { maxTIDs := make([]uint32, 0) minTIDs := make([]uint32, 0) + firstLIDs := make([]uint32, 0) + lastLIDs := make([]uint32, 0) isContinued := make([]bool, 0) startIndex := l.blockIndex @@ -152,8 +154,13 @@ func (l *Loader) loadLIDsBlocksTable() (*lids.Table, error) { maxTIDs = append(maxTIDs, uint32(ext2>>32)) minTIDs = append(minTIDs, uint32(ext2&0xFFFFFFFF)) - isContinued = append(isContinued, ext1 == 1) + if fracVer >= config.BinaryDataV4 { + lastLIDs = append(lastLIDs, uint32(ext1>>32)) + firstLIDs = append(firstLIDs, uint32(ext1&0xFFFFFFFF)) + } else { + isContinued = append(isContinued, ext1 == 1) + } } - return lids.NewTable(startIndex, minTIDs, maxTIDs, isContinued), nil + return lids.NewTable(fracVer, startIndex, minTIDs, maxTIDs, firstLIDs, lastLIDs, isContinued), nil }