Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion config/frac_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ const (
BinaryDataV1
// BinaryDataV2 - MIDs stored in nanoseconds
BinaryDataV2
// BinaryDataV3 - bitpack for LIDs/MIDs
BinaryDataV3
// BinaryDataV4 - LID blocks have firstLID/lastLID encoded in ext1, isContinued is not used, no legacy TID adjusting
BinaryDataV4
)

const CurrentFracVersion = BinaryDataV2
const CurrentFracVersion = BinaryDataV4
50 changes: 49 additions & 1 deletion frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *FractionTestSuite) SetupTestCommon() {
DocsPositionsZstdLevel: 1,
TokenTableZstdLevel: 1,
DocBlocksZstdLevel: 1,
LIDBlockSize: 512,
LIDBlockSize: 256,
DocBlockSize: 128 * int(units.KiB),
}

Expand Down Expand Up @@ -1326,6 +1326,43 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
fromTime: fromTime,
toTime: toTime,
},
// block skipping scenarios
{
name: "service:gateway AND trace_id:trace-2026",
query: "service:gateway AND trace_id:trace-2026",
filter: func(doc *testDoc) bool {
return doc.service == gateway && doc.traceId == "trace-2026"
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND (trace_id:trace-0 OR trace_id:trace-2500 OR trace_id:trace-4999)",
query: "service:gateway AND (trace_id:trace-0 OR trace_id:trace-2500 OR trace_id:trace-4999)",
filter: func(doc *testDoc) bool {
return doc.service == gateway && (doc.traceId == "trace-0" || doc.traceId == "trace-2500" || doc.traceId == "trace-4999")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND pod:pod-5",
query: "service:gateway AND pod:pod-5",
filter: func(doc *testDoc) bool {
return doc.service == gateway && doc.pod == "pod-5"
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND pod:pod-5 AND message:failed",
query: "service:gateway AND pod:pod-5 AND message:failed",
filter: func(doc *testDoc) bool {
return doc.service == gateway && doc.pod == "pod-5" && strings.Contains(doc.message, "failed")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND message:processing AND message:retry AND level:5",
query: "service:gateway AND message:processing AND message:retry AND level:5",
Expand All @@ -1337,6 +1374,17 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
toTime: toTime,
},
// OR operator queries
{
name: "(service OR) AND (trace_id OR)",
query: "(service:bus OR service:kafka) AND (trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000)",
filter: func(doc *testDoc) bool {
return (doc.service == bus || doc.service == kafka) && (doc.traceId == "trace-1000" ||
doc.traceId == "trace-1500" ||
doc.traceId == "trace-2000")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "trace_id OR",
query: "trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000 OR trace_id:trace-2500 OR trace_id:trace-3000",
Expand Down
3 changes: 2 additions & 1 deletion frac/sealed/lids/iterator_asc.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,14 @@ func (it *IteratorAsc) NextGeq(nextID node.LID) node.LID {
return node.NullLID()
}

it.blockIndex = it.table.SeekBlockLeq(it.blockIndex, it.tid, nextID.Unpack())

it.loadNextLIDsBlock()
it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock)
it.counter.AddLIDsCount(len(it.lids))
}

// fast path: smallest remaining > nextID => skip entire block
// TODO(cheb0): We could also pass LID into narrowLIDsRange to perform block skipping once we add something like MinLID to LID block header
if it.lids[0] > nextID.Unpack() {
it.lids = it.lids[:0]
continue
Expand Down
3 changes: 2 additions & 1 deletion frac/sealed/lids/iterator_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,14 @@ func (it *IteratorDesc) NextGeq(nextID node.LID) node.LID {
return node.NullLID()
}

it.blockIndex = it.table.SeekBlockGeq(it.blockIndex, it.tid, nextID.Unpack())

it.loadNextLIDsBlock() // last chunk in block but not last for tid; need load next block
it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock)
it.counter.AddLIDsCount(len(it.lids)) // inc loaded LIDs count
}

// fast path: last LID < nextID => skip the entire block
// TODO(cheb0): We could also pass LID into narrowLIDsRange to perform block skipping once we add something like MinLID to LID block header
if nextID.Unpack() > it.lids[len(it.lids)-1] {
it.lids = it.lids[:0]
continue
Expand Down
64 changes: 58 additions & 6 deletions frac/sealed/lids/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package lids
import (
"sort"

"github.com/ozontech/seq-db/config"
"go.uber.org/zap"

"github.com/ozontech/seq-db/logger"
Expand All @@ -12,24 +13,35 @@ type Table struct {
StartBlockIndex uint32
MaxTIDs []uint32 // defines last tid for each block
MinTIDs []uint32 // defines first not continued tid for each block
FirstLIDs []uint32
LastLIDs []uint32

// TODO: We need fix MinTID issue that we have to compensate with DiskBlock.getAdjustedMinTID()
// TODO: After that we do not need store IsContinued flag, and able calc it as MaxTIDs[i] == MinTIDs[i+1]
IsContinued []bool
FracVer config.BinaryDataVersion
IsContinued []bool // legacy field, only used in BinaryDataV0-BinaryDataV3 (inclusive)
}

func NewTable(startOfLIDsBlockIndex uint32, minTIDs, maxTIDs []uint32, isContinued []bool) *Table {
func NewTable(
fracVer config.BinaryDataVersion,
startOfLIDsBlockIndex uint32,
minTIDs, maxTIDs []uint32,
firstLIDs, lastLIDs []uint32,
isContinued []bool) *Table {
return &Table{
StartBlockIndex: startOfLIDsBlockIndex,
MinTIDs: minTIDs,
MaxTIDs: maxTIDs,
FirstLIDs: firstLIDs,
LastLIDs: lastLIDs,
IsContinued: isContinued,
FracVer: fracVer,
}
}

func (t *Table) GetAdjustedMinTID(blockIndex uint32) uint32 {
if t.IsContinued[blockIndex] {
return t.MinTIDs[blockIndex] - 1
if t.FracVer < config.BinaryDataV4 {
if t.IsContinued[blockIndex] {
return t.MinTIDs[blockIndex] - 1
}
}
return t.MinTIDs[blockIndex]
}
Expand Down Expand Up @@ -75,6 +87,46 @@ func (t *Table) GetLastBlockIndexForTID(tid uint32) uint32 {
return uint32(index)
}

// SeekBlockGeq finds next block for provided TID which contains
// lid greater or equal to provided LID starting from provided index (inclusive).
// - index: an index of block which is already suits and contains next portion of LIDs. Safe to return for old fractions.
func (t *Table) SeekBlockGeq(index uint32, tid uint32, nextLID uint32) uint32 {
if t.FracVer < config.BinaryDataV4 {
// not supported for old frac versions
return index
}

res := index
for i := index + 1; i < uint32(len(t.MinTIDs)); i++ {
if t.MinTIDs[i] == tid && nextLID >= t.FirstLIDs[i] {
res = i
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: here i is int, but in SeekBlockLeq() it is uint32. let's make it consistent, for example:

for i := int(index) + 1; i <len(t.MinTIDs); i++ {
	if t.MinTIDs[i] == tid && nextLID >= t.FirstLIDs[i] {
		res = uint32(i)
		continue
	}
	break
}

continue
}
break
}
return res
}

// SeekBlockLeq finds next block with lowest index for provided TID which contains LIDs
// less or equal to provided LID starting from provided index (inclusive).
// - index: an index of block which is already suits and contains next portion of LIDs. Safe to return for old fractions.
func (t *Table) SeekBlockLeq(index uint32, tid uint32, nextLID uint32) uint32 {
if t.FracVer < config.BinaryDataV4 {
// not supported for old frac versions
return index
}

res := index
for i := int(index) - 1; i >= 0; i-- {
if t.MaxTIDs[i] == tid && nextLID <= t.LastLIDs[i] {
res = uint32(i)
continue
}
break
}
return res
}

func (t *Table) HasTIDInPrevBlock(blockIndex, tid uint32) bool {
if blockIndex == 0 { // it is no prev block
return false
Expand Down
12 changes: 6 additions & 6 deletions frac/sealed/sealing/blocks_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ type tokensSealBlock struct {

// lidsExt represents the range and continuation status of LID blocks.
type lidsExt struct {
minTID uint32 // First token ID in the LID block
maxTID uint32 // Last token ID in the LID block
isContinued bool // Whether LID sequence continues in next block
minTID uint32 // First token ID in the LID block
maxTID uint32 // Last token ID in the LID block
firstLID uint32 // First LID in the LID block
lastLID uint32 // Last LID in the LID block
}

// lidsSealBlock represents a sealed block containing LID (Local ID) data.
Expand Down Expand Up @@ -169,7 +170,6 @@ func (bb *blocksBuilder) BuildLIDsBlocks(tokenLIDs iter.Seq[[]uint32], blockCapa
currentTID uint32 // Current TID being processed
currentBlock lidsSealBlock // Current block under construction
isEndOfToken bool // Flag for end of current token's LIDs
isContinued bool // Flag for block continuation
)

// Initialize first block
Expand All @@ -186,8 +186,8 @@ func (bb *blocksBuilder) BuildLIDsBlocks(tokenLIDs iter.Seq[[]uint32], blockCapa
currentBlock.payload.Offsets = append(currentBlock.payload.Offsets, uint32(len(currentBlock.payload.LIDs)))
}
currentBlock.payload.IsLastLID = isEndOfToken // TODO(eguguchkin): Remove legacy field
currentBlock.ext.isContinued = isContinued // TODO(eguguchkin): Remove legacy field
isContinued = !isEndOfToken
currentBlock.ext.firstLID = currentBlock.payload.LIDs[0]
currentBlock.ext.lastLID = currentBlock.payload.LIDs[len(currentBlock.payload.LIDs)-1]
return yield(currentBlock)
}

Expand Down
47 changes: 26 additions & 21 deletions frac/sealed/sealing/blocks_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) {

expected := []lidsSealBlock{{
ext: lidsExt{
minTID: 1,
maxTID: 1,
isContinued: false,
minTID: 1,
maxTID: 1,
firstLID: 10,
lastLID: 30,
},
payload: lids.Block{
LIDs: []uint32{10, 20, 30},
Expand All @@ -330,20 +331,21 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) {
},
}, {
ext: lidsExt{
minTID: 1,
maxTID: 2,
isContinued: true,
minTID: 1,
maxTID: 2,
firstLID: 40,
lastLID: 21,
},
payload: lids.Block{
LIDs: []uint32{40, 11, 21},
Offsets: []uint32{0, 1, 3},
IsLastLID: false,
LIDs: []uint32{40, 11, 21},
Offsets: []uint32{0, 1, 3},
},
}, {
ext: lidsExt{
minTID: 2,
maxTID: 3,
isContinued: true,
minTID: 2,
maxTID: 3,
firstLID: 31,
lastLID: 10,
},
payload: lids.Block{
LIDs: []uint32{31, 41, 10},
Expand All @@ -352,9 +354,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) {
},
}, {
ext: lidsExt{
minTID: 3,
maxTID: 3,
isContinued: true,
minTID: 3,
maxTID: 3,
firstLID: 11,
lastLID: 21,
},
payload: lids.Block{
LIDs: []uint32{11, 20, 21},
Expand All @@ -363,9 +366,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) {
},
}, {
ext: lidsExt{
minTID: 4,
maxTID: 4,
isContinued: false,
minTID: 4,
maxTID: 4,
firstLID: 30,
lastLID: 50,
},
payload: lids.Block{
LIDs: []uint32{30, 40, 50},
Expand All @@ -374,9 +378,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) {
},
}, {
ext: lidsExt{
minTID: 4,
maxTID: 4,
isContinued: true,
minTID: 4,
maxTID: 4,
firstLID: 60,
lastLID: 60,
},
payload: lids.Block{
LIDs: []uint32{60},
Expand Down
15 changes: 6 additions & 9 deletions frac/sealed/sealing/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/alecthomas/units"
"github.com/ozontech/seq-db/config"

"github.com/ozontech/seq-db/bytespool"
"github.com/ozontech/seq-db/consts"
Expand Down Expand Up @@ -414,22 +415,18 @@ func (s *IndexSealer) packPosBlock(block idsSealBlock) indexBlock {
// packLIDsBlock packs Local IDs (LIDs) into a compressed index block.
// Also updates LIDs table for preloaded data access.
func (s *IndexSealer) packLIDsBlock(block lidsSealBlock) indexBlock {
var ext1 uint64
if block.ext.isContinued { // todo: Legacy continuation flag
ext1 = 1
block.ext.minTID++ // Adjust for legacy format
}

// Update LIDs table for PreloadedData
s.lidsTable.MinTIDs = append(s.lidsTable.MinTIDs, block.ext.minTID)
s.lidsTable.MaxTIDs = append(s.lidsTable.MaxTIDs, block.ext.maxTID)
s.lidsTable.IsContinued = append(s.lidsTable.IsContinued, block.ext.isContinued)
s.lidsTable.FirstLIDs = append(s.lidsTable.FirstLIDs, block.ext.firstLID)
s.lidsTable.LastLIDs = append(s.lidsTable.LastLIDs, block.ext.lastLID)
s.lidsTable.FracVer = config.CurrentFracVersion

// Packing block
s.buf1 = block.payload.Pack(s.buf1[:0])
b := s.newIndexBlockZSTD(s.buf1, s.params.LIDsZstdLevel)
b.ext1 = ext1 // Legacy continuation flag
b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range
b.ext1 = uint64(block.ext.lastLID)<<32 | uint64(block.ext.firstLID) // LID range
b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range
return b
}

Expand Down
Loading