Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions frac/active_token_list.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package frac

import (
"bytes"
"context"
"fmt"
"hash/crc32"
Expand Down Expand Up @@ -38,6 +39,38 @@ func (tp *activeTokenProvider) GetToken(tid uint32) []byte {
return tp.tidToVal[id]
}

// FindContains finds tids of tokens which contain a provided needle. From and to indices are specified inclusive.
func (tp *activeTokenProvider) FindContains(firstTID, lastTID uint32, needle []byte) ([]uint32, error) {
if len(needle) == 0 {
return nil, nil
}
var tids []uint32
for tid := firstTID; tid <= lastTID; tid++ {
if bytes.Contains(tp.GetToken(tid), needle) {
tids = append(tids, tid)
}
}
return tids, nil
}

// FindToken finds tids of tokens which suffice a provided searcher (predicate).
func (tp *activeTokenProvider) FindToken(searcher pattern.Searcher) ([]uint32, error) {
firstTID := searcher.FirstTID()
lastTID := searcher.LastTID()
var tids []uint32
for tid := firstTID; tid <= lastTID; tid++ {
match, err := searcher.Check(tp.GetToken(tid))
if err != nil {
return nil, err
}

if match {
tids = append(tids, tid)
}
}
return tids, nil
}

func (tp *activeTokenProvider) FirstTID() uint32 {
return 1
}
Expand Down
57 changes: 56 additions & 1 deletion frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1390,14 +1390,69 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
toTime: midTime,
},

// other queries
// wildcards
{
name: "trace_id:trace-4*",
query: "trace_id:trace-4*",
filter: func(doc *testDoc) bool { return strings.Contains(doc.traceId, "trace-4") },
fromTime: fromTime,
toTime: toTime,
},
{
name: "id:*1* OR id:*2*",
query: "id:*1* OR id:*2*",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.id, "1") || strings.Contains(doc.id, "2")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "id:*1* AND id:*2*",
query: "id:*1* AND id:*2*",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.id, "1") && strings.Contains(doc.id, "2")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "id:*1 OR id:*2 OR id:*3",
query: "id:*1 OR id:*2 OR id:*3",
filter: func(doc *testDoc) bool {
return strings.HasSuffix(doc.id, "1") || strings.HasSuffix(doc.id, "2") || strings.HasSuffix(doc.id, "3")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "message:*re*",
query: "message:*re*",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.message, "re")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "message:*uest OR id:*1",
query: "message:*uest OR id:*1",
filter: func(doc *testDoc) bool {
// the only message token which suffices is 'request'
return strings.Contains(doc.message, "request") || strings.HasSuffix(doc.id, "1")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:*a*",
query: "service:*a*",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.service, "a")
},
fromTime: fromTime,
toTime: toTime,
},
}

for _, tc := range searchTestCases {
Expand Down
26 changes: 26 additions & 0 deletions frac/sealed/token/block_loader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package token

import (
"bytes"
"encoding/binary"
"fmt"
"math"
Expand All @@ -10,6 +11,7 @@ import (

"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/pattern"
"github.com/ozontech/seq-db/storage"
)

Expand Down Expand Up @@ -60,6 +62,30 @@ func (b *Block) GetToken(index int) []byte {
return b.Payload[offset : offset+l]
}

func (b *Block) FindContains(from, to int, needle []byte) ([]int, error) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've discussed that you can perform bytes.Contains on the block payload before checking each token individually. Have you measured performance of such optimization?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've discussed that you can perform bytes.Contains on the block payload before checking each token individually.

Yes, I tried calling bytes.Index on entire payload. It boosts even further comparing to this PR:
message:foobar
35 ms => 9 ms

However, this means that when bytes.Index returns and if we have some proper index returned, then we need to do a bin search on Offsets to find an index and then check for false positive. It also comes with neat property that we can avoid call Unpack (build offsets) lazily which boosts cold query performance (somewhat around extra 20%).

I put a task to the backlog, decided that it's too much for a single PR.

indices := make([]int, 0)
Copy link
Copy Markdown
Member

@dkharms dkharms Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you could pass here slice of needles as well to handle queries like message:*foo*bar* with multiple needles. Or there is something that blocks such improvement?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I think it's doable. Maybe will do

for i := from; i <= to; i++ {
if bytes.Contains(b.GetToken(i), needle) {
indices = append(indices, i)
}
}
return indices, nil
}

func (b *Block) FindToken(from, to int, searcher pattern.Searcher) ([]int, error) {
indices := make([]int, 0)
for i := from; i <= to; i++ {
ok, err := searcher.Check(b.GetToken(i))
if err != nil {
return nil, err
}
if ok {
indices = append(indices, i)
}
}
return indices, nil
}

// BlockLoader is responsible for Reading from disk, unpacking and caching tokens blocks.
// NOT THREAD SAFE. Do not use concurrently.
// Use your own BlockLoader instance for each search query
Expand Down
70 changes: 70 additions & 0 deletions frac/sealed/token/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package token

import (
"sort"

"github.com/ozontech/seq-db/pattern"
)

type Provider struct {
Expand Down Expand Up @@ -55,3 +57,71 @@ func (tp *Provider) GetToken(tid uint32) []byte {
block := tp.findBlock(entry.BlockIndex)
return block.GetToken(entry.GetIndexInTokensBlock(tid))
}

func (tp *Provider) FindContains(firstTID, lastTID uint32, needle []byte) ([]uint32, error) {
return tp.findInBlocks(firstTID, lastTID, func(b *Block, firstIndex, lastIndex int) ([]int, error) {
return b.FindContains(firstIndex, lastIndex, needle)
})
}

func (tp *Provider) FindToken(searcher pattern.Searcher) ([]uint32, error) {
return tp.findInBlocks(searcher.FirstTID(), searcher.LastTID(), func(b *Block, firstIndex, lastIndex int) ([]int, error) {
return b.FindToken(firstIndex, lastIndex, searcher)
})
}

func (tp *Provider) findInBlocks(firstTID, lastTID uint32, search func(*Block, int, int) ([]int, error)) ([]uint32, error) {
entries := tp.narrowEntries(firstTID, lastTID)
if len(entries) == 0 {
return nil, nil
}

var tids []uint32

for _, entry := range entries {
block := tp.findBlock(entry.BlockIndex)
firstIndex, lastIndex := tp.narrowTIDs(entry, firstTID, lastTID)
indices, err := search(block, firstIndex, lastIndex)
if err != nil {
return nil, err
}
for _, idx := range indices {
tid := entry.StartTID + uint32(idx-int(entry.StartIndex))
tids = append(tids, tid)
}
}
return tids, nil
}

func (tp *Provider) narrowTIDs(entry *TableEntry, firstTID, fromTID uint32) (int, int) {
tidStart := firstTID
if entry.StartTID > tidStart {
tidStart = entry.StartTID
}
tidEnd := fromTID
if lastTID := entry.getLastTID(); lastTID < tidEnd {
tidEnd = lastTID
}

firstIndex := entry.GetIndexInTokensBlock(tidStart)
lastIndex := entry.GetIndexInTokensBlock(tidEnd)
return firstIndex, lastIndex
}

func (tp *Provider) narrowEntries(firstTID, lastTID uint32) []*TableEntry {
firstIdx := sort.Search(len(tp.entries), func(i int) bool {
return tp.entries[i].getLastTID() >= firstTID
})
if firstIdx >= len(tp.entries) {
return nil
}
lastIdx := sort.Search(len(tp.entries), func(i int) bool {
return tp.entries[i].StartTID > lastTID
})
lastIdx--
if lastIdx < firstIdx {
return nil
}
entries := tp.entries[firstIdx : lastIdx+1]
return entries
}
62 changes: 33 additions & 29 deletions pattern/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (

type tokenProvider interface {
GetToken(uint32) []byte
FindContains(firstTID uint32, lastTID uint32, needle []byte) ([]uint32, error)
FindToken(searcher Searcher) ([]uint32, error)
FirstTID() uint32
LastTID() uint32
Ordered() bool
Expand All @@ -27,11 +29,11 @@ type baseSearch struct {
last int
}

func (s *baseSearch) firstTID() uint32 {
func (s *baseSearch) FirstTID() uint32 {
return uint32(s.first)
}

func (s *baseSearch) lastTID() uint32 {
func (s *baseSearch) LastTID() uint32 {
return uint32(s.last)
}

Expand Down Expand Up @@ -67,7 +69,7 @@ func (s *literalSearch) Narrow(tp tokenProvider) {
s.last = s.first - 1 // begin > end: will be considered empty
}

func (s *literalSearch) check(val []byte) (bool, error) {
func (s *literalSearch) Check(val []byte) (bool, error) {
if s.narrowed {
return len(s.value) == len(val), nil
}
Expand Down Expand Up @@ -165,7 +167,7 @@ func findSequence(haystack []byte, needles [][]byte) int {
return len(needles)
}

func (s *wildcardSearch) check(val []byte) (bool, error) {
func (s *wildcardSearch) Check(val []byte) (bool, error) {
return s.checkPrefix(val) && s.checkSuffix(val) && s.checkMiddle(val), nil
}

Expand All @@ -181,7 +183,7 @@ func newRangeTextSearch(base baseSearch, token *parser.Range) *rangeTextSearch {
}
}

func (s *rangeTextSearch) check(val []byte) (bool, error) {
func (s *rangeTextSearch) Check(val []byte) (bool, error) {
valStr := string(val)
if s.token.From.Kind != parser.TermSymbol {
if s.token.IncludeFrom {
Expand Down Expand Up @@ -244,7 +246,7 @@ func newRangeNumberSearch(base baseSearch, token *parser.Range) *rangeNumberSear
return s
}

func (s *rangeNumberSearch) check(rawVal []byte) (bool, error) {
func (s *rangeNumberSearch) Check(rawVal []byte) (bool, error) {
val, err := strconv.ParseFloat(string(rawVal), 64)
if err != nil || isNaNOrInf(val) {
return false, nil
Expand Down Expand Up @@ -301,7 +303,7 @@ func newRangeIPSearch(base baseSearch, token *parser.IPRange) *rangeIpSearch {
return s
}

func (s *rangeIpSearch) check(rawVal []byte) (bool, error) {
func (s *rangeIpSearch) Check(rawVal []byte) (bool, error) {
val, err := netip.ParseAddr(string(rawVal))
if err != nil {
return false, nil
Expand All @@ -324,7 +326,7 @@ func newReSearch(base baseSearch, token *parser.Re) *reSearch {
return &reSearch{baseSearch: base, r: token.CompiledExpression}
}

func (s *reSearch) check(rawVal []byte) (bool, error) {
func (s *reSearch) Check(rawVal []byte) (bool, error) {
if config.MaxRegexTokensCheck > 0 && s.checked >= config.MaxRegexTokensCheck {
return false, errors.New(
"'re' filter exceeded token limit: " +
Expand All @@ -335,13 +337,13 @@ func (s *reSearch) check(rawVal []byte) (bool, error) {
return s.r.Match(rawVal), nil
}

type searcher interface {
firstTID() uint32
lastTID() uint32
check(val []byte) (bool, error)
type Searcher interface {
FirstTID() uint32
LastTID() uint32
Check(val []byte) (bool, error)
}

func newSearcher(token parser.Token, tp tokenProvider) searcher {
func newSearcher(token parser.Token, tp tokenProvider) Searcher {
base := baseSearch{
first: int(tp.FirstTID()),
last: int(tp.LastTID()),
Expand Down Expand Up @@ -390,22 +392,24 @@ func isNaNOrInf(f float64) bool {
return math.IsNaN(f) || math.IsInf(f, 0)
}

func Search(ctx context.Context, t parser.Token, tp tokenProvider) ([]uint32, error) {
tids := []uint32{}
s := newSearcher(t, tp)
for tid := s.firstTID(); tid <= s.lastTID(); tid++ {
if tid&1023 == 0 && util.IsCancelled(ctx) {
return nil, ctx.Err()
}

match, err := s.check(tp.GetToken(tid))
if err != nil {
return nil, err
}
func isSimpleWildcardContains(token parser.Token) (needle []byte, ok bool) {
lit, ok := token.(*parser.Literal)
if !ok || len(lit.Terms) != 3 {
return nil, false
}
if !lit.Terms[0].IsWildcard() || lit.Terms[1].Kind != parser.TermText || !lit.Terms[2].IsWildcard() {
return nil, false
}
return []byte(lit.Terms[1].Data), true
}

if match {
tids = append(tids, tid)
}
func Search(ctx context.Context, t parser.Token, tp tokenProvider) ([]uint32, error) {
if util.IsCancelled(ctx) {
return nil, ctx.Err()
}
if needle, ok := isSimpleWildcardContains(t); ok {
return tp.FindContains(tp.FirstTID(), tp.LastTID(), needle)
}
return tids, nil
s := newSearcher(t, tp)
return tp.FindToken(s)
}
Loading
Loading