From 0fa1d0ee692538bae96a44d8f16e699473b0db4a Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Thu, 2 Apr 2026 17:23:00 +0400 Subject: [PATCH 1/2] pushdown predicate for wildcards --- frac/active_token_list.go | 33 +++++++++++++++ frac/fraction_test.go | 57 ++++++++++++++++++++++++- frac/sealed/token/block_loader.go | 26 ++++++++++++ frac/sealed/token/provider.go | 70 +++++++++++++++++++++++++++++++ pattern/pattern.go | 62 ++++++++++++++------------- pattern/pattern_test.go | 35 +++++++++++++++- 6 files changed, 251 insertions(+), 32 deletions(-) diff --git a/frac/active_token_list.go b/frac/active_token_list.go index adf94ffd..072d858a 100644 --- a/frac/active_token_list.go +++ b/frac/active_token_list.go @@ -1,6 +1,7 @@ package frac import ( + "bytes" "context" "fmt" "hash/crc32" @@ -38,6 +39,38 @@ func (tp *activeTokenProvider) GetToken(tid uint32) []byte { return tp.tidToVal[id] } +// FindContains finds tids of tokens which contain a provided needle. From and to indices are specified inclusive. +func (tp *activeTokenProvider) FindContains(firstTID uint32, lastTID uint32, needle []byte) ([]uint32, error) { + if len(needle) == 0 { + return nil, nil + } + var tids []uint32 + for tid := firstTID; tid <= lastTID; tid++ { + if bytes.Contains(tp.GetToken(tid), needle) { + tids = append(tids, tid) + } + } + return tids, nil +} + +// FindToken finds tids of tokens which suffice a provided searcher (predicate). +func (tp *activeTokenProvider) FindToken(searcher pattern.Searcher) ([]uint32, error) { + firstTID := searcher.FirstTID() + lastTID := searcher.LastTID() + var tids []uint32 + for tid := firstTID; tid <= lastTID; tid++ { + match, err := searcher.Check(tp.GetToken(tid)) + if err != nil { + return nil, err + } + + if match { + tids = append(tids, tid) + } + } + return tids, nil +} + func (tp *activeTokenProvider) FirstTID() uint32 { return 1 } diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 8e44f566..adad00ae 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1390,7 +1390,7 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { toTime: midTime, }, - // other queries + // wildcards { name: "trace_id:trace-4*", query: "trace_id:trace-4*", @@ -1398,6 +1398,61 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { fromTime: fromTime, toTime: toTime, }, + { + name: "id:*1* OR id:*2*", + query: "id:*1* OR id:*2*", + filter: func(doc *testDoc) bool { + return strings.Contains(doc.id, "1") || strings.Contains(doc.id, "2") + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "id:*1* AND id:*2*", + query: "id:*1* AND id:*2*", + filter: func(doc *testDoc) bool { + return strings.Contains(doc.id, "1") && strings.Contains(doc.id, "2") + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "id:*1 OR id:*2 OR id:*3", + query: "id:*1 OR id:*2 OR id:*3", + filter: func(doc *testDoc) bool { + return strings.HasSuffix(doc.id, "1") || strings.HasSuffix(doc.id, "2") || strings.HasSuffix(doc.id, "3") + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "message:*re*", + query: "message:*re*", + filter: func(doc *testDoc) bool { + return strings.Contains(doc.message, "re") + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "message:*uest OR id:*1", + query: "message:*uest OR id:*1", + filter: func(doc *testDoc) bool { + // the only message token which suffices is 'request' + return strings.Contains(doc.message, "request") || strings.HasSuffix(doc.id, "1") + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "service:*a*", + query: "service:*a*", + filter: func(doc *testDoc) bool { + return strings.Contains(doc.service, "a") + }, + fromTime: fromTime, + toTime: toTime, + }, } for _, tc := range searchTestCases { diff --git a/frac/sealed/token/block_loader.go b/frac/sealed/token/block_loader.go index 3a60b9ba..edb4f1f1 100644 --- a/frac/sealed/token/block_loader.go +++ b/frac/sealed/token/block_loader.go @@ -1,6 +1,7 @@ package token import ( + "bytes" "encoding/binary" "fmt" "math" @@ -10,6 +11,7 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/pattern" "github.com/ozontech/seq-db/storage" ) @@ -60,6 +62,30 @@ func (b *Block) GetToken(index int) []byte { return b.Payload[offset : offset+l] } +func (b *Block) FindContains(from, to int, needle []byte) ([]int, error) { + indices := make([]int, 0) + for i := from; i <= to; i++ { + if bytes.Contains(b.GetToken(i), needle) { + indices = append(indices, i) + } + } + return indices, nil +} + +func (b *Block) FindToken(from, to int, searcher pattern.Searcher) ([]int, error) { + indices := make([]int, 0) + for i := from; i <= to; i++ { + ok, err := searcher.Check(b.GetToken(i)) + if err != nil { + return nil, err + } + if ok { + indices = append(indices, i) + } + } + return indices, nil +} + // BlockLoader is responsible for Reading from disk, unpacking and caching tokens blocks. // NOT THREAD SAFE. Do not use concurrently. // Use your own BlockLoader instance for each search query diff --git a/frac/sealed/token/provider.go b/frac/sealed/token/provider.go index 6d18ff68..b6affa51 100644 --- a/frac/sealed/token/provider.go +++ b/frac/sealed/token/provider.go @@ -2,6 +2,8 @@ package token import ( "sort" + + "github.com/ozontech/seq-db/pattern" ) type Provider struct { @@ -55,3 +57,71 @@ func (tp *Provider) GetToken(tid uint32) []byte { block := tp.findBlock(entry.BlockIndex) return block.GetToken(entry.GetIndexInTokensBlock(tid)) } + +func (tp *Provider) FindContains(firstTID uint32, lastTID uint32, needle []byte) ([]uint32, error) { + return tp.findInBlocks(firstTID, lastTID, func(b *Block, firstIndex, lastIndex int) ([]int, error) { + return b.FindContains(firstIndex, lastIndex, needle) + }) +} + +func (tp *Provider) FindToken(searcher pattern.Searcher) ([]uint32, error) { + return tp.findInBlocks(searcher.FirstTID(), searcher.LastTID(), func(b *Block, firstIndex, lastIndex int) ([]int, error) { + return b.FindToken(firstIndex, lastIndex, searcher) + }) +} + +func (tp *Provider) findInBlocks(firstTID, lastTID uint32, search func(*Block, int, int) ([]int, error)) ([]uint32, error) { + entries := tp.narrowEntries(firstTID, lastTID) + if len(entries) == 0 { + return nil, nil + } + + var tids []uint32 + + for _, entry := range entries { + block := tp.findBlock(entry.BlockIndex) + firstIndex, lastIndex := tp.narrowTIDs(entry, firstTID, lastTID) + indices, err := search(block, firstIndex, lastIndex) + if err != nil { + return nil, err + } + for _, idx := range indices { + tid := entry.StartTID + uint32(idx-int(entry.StartIndex)) + tids = append(tids, tid) + } + } + return tids, nil +} + +func (tp *Provider) narrowTIDs(entry *TableEntry, firstTID uint32, fromTID uint32) (int, int) { + tidStart := firstTID + if entry.StartTID > tidStart { + tidStart = entry.StartTID + } + tidEnd := fromTID + if lastTID := entry.getLastTID(); lastTID < tidEnd { + tidEnd = lastTID + } + + firstIndex := entry.GetIndexInTokensBlock(tidStart) + lastIndex := entry.GetIndexInTokensBlock(tidEnd) + return firstIndex, lastIndex +} + +func (tp *Provider) narrowEntries(firstTID uint32, lastTID uint32) []*TableEntry { + firstIdx := sort.Search(len(tp.entries), func(i int) bool { + return tp.entries[i].getLastTID() >= firstTID + }) + if firstIdx >= len(tp.entries) { + return nil + } + lastIdx := sort.Search(len(tp.entries), func(i int) bool { + return tp.entries[i].StartTID > lastTID + }) + lastIdx-- + if lastIdx < firstIdx { + return nil + } + entries := tp.entries[firstIdx : lastIdx+1] + return entries +} diff --git a/pattern/pattern.go b/pattern/pattern.go index ab7c879a..3dfff829 100644 --- a/pattern/pattern.go +++ b/pattern/pattern.go @@ -17,6 +17,8 @@ import ( type tokenProvider interface { GetToken(uint32) []byte + FindContains(firstTID uint32, lastTID uint32, needle []byte) ([]uint32, error) + FindToken(searcher Searcher) ([]uint32, error) FirstTID() uint32 LastTID() uint32 Ordered() bool @@ -27,11 +29,11 @@ type baseSearch struct { last int } -func (s *baseSearch) firstTID() uint32 { +func (s *baseSearch) FirstTID() uint32 { return uint32(s.first) } -func (s *baseSearch) lastTID() uint32 { +func (s *baseSearch) LastTID() uint32 { return uint32(s.last) } @@ -67,7 +69,7 @@ func (s *literalSearch) Narrow(tp tokenProvider) { s.last = s.first - 1 // begin > end: will be considered empty } -func (s *literalSearch) check(val []byte) (bool, error) { +func (s *literalSearch) Check(val []byte) (bool, error) { if s.narrowed { return len(s.value) == len(val), nil } @@ -165,7 +167,7 @@ func findSequence(haystack []byte, needles [][]byte) int { return len(needles) } -func (s *wildcardSearch) check(val []byte) (bool, error) { +func (s *wildcardSearch) Check(val []byte) (bool, error) { return s.checkPrefix(val) && s.checkSuffix(val) && s.checkMiddle(val), nil } @@ -181,7 +183,7 @@ func newRangeTextSearch(base baseSearch, token *parser.Range) *rangeTextSearch { } } -func (s *rangeTextSearch) check(val []byte) (bool, error) { +func (s *rangeTextSearch) Check(val []byte) (bool, error) { valStr := string(val) if s.token.From.Kind != parser.TermSymbol { if s.token.IncludeFrom { @@ -244,7 +246,7 @@ func newRangeNumberSearch(base baseSearch, token *parser.Range) *rangeNumberSear return s } -func (s *rangeNumberSearch) check(rawVal []byte) (bool, error) { +func (s *rangeNumberSearch) Check(rawVal []byte) (bool, error) { val, err := strconv.ParseFloat(string(rawVal), 64) if err != nil || isNaNOrInf(val) { return false, nil @@ -301,7 +303,7 @@ func newRangeIPSearch(base baseSearch, token *parser.IPRange) *rangeIpSearch { return s } -func (s *rangeIpSearch) check(rawVal []byte) (bool, error) { +func (s *rangeIpSearch) Check(rawVal []byte) (bool, error) { val, err := netip.ParseAddr(string(rawVal)) if err != nil { return false, nil @@ -324,7 +326,7 @@ func newReSearch(base baseSearch, token *parser.Re) *reSearch { return &reSearch{baseSearch: base, r: token.CompiledExpression} } -func (s *reSearch) check(rawVal []byte) (bool, error) { +func (s *reSearch) Check(rawVal []byte) (bool, error) { if config.MaxRegexTokensCheck > 0 && s.checked >= config.MaxRegexTokensCheck { return false, errors.New( "'re' filter exceeded token limit: " + @@ -335,13 +337,13 @@ func (s *reSearch) check(rawVal []byte) (bool, error) { return s.r.Match(rawVal), nil } -type searcher interface { - firstTID() uint32 - lastTID() uint32 - check(val []byte) (bool, error) +type Searcher interface { + FirstTID() uint32 + LastTID() uint32 + Check(val []byte) (bool, error) } -func newSearcher(token parser.Token, tp tokenProvider) searcher { +func newSearcher(token parser.Token, tp tokenProvider) Searcher { base := baseSearch{ first: int(tp.FirstTID()), last: int(tp.LastTID()), @@ -390,22 +392,24 @@ func isNaNOrInf(f float64) bool { return math.IsNaN(f) || math.IsInf(f, 0) } -func Search(ctx context.Context, t parser.Token, tp tokenProvider) ([]uint32, error) { - tids := []uint32{} - s := newSearcher(t, tp) - for tid := s.firstTID(); tid <= s.lastTID(); tid++ { - if tid&1023 == 0 && util.IsCancelled(ctx) { - return nil, ctx.Err() - } - - match, err := s.check(tp.GetToken(tid)) - if err != nil { - return nil, err - } +func isSimpleWildcardContains(token parser.Token) (needle []byte, ok bool) { + lit, ok := token.(*parser.Literal) + if !ok || len(lit.Terms) != 3 { + return nil, false + } + if !lit.Terms[0].IsWildcard() || lit.Terms[1].Kind != parser.TermText || !lit.Terms[2].IsWildcard() { + return nil, false + } + return []byte(lit.Terms[1].Data), true +} - if match { - tids = append(tids, tid) - } +func Search(ctx context.Context, t parser.Token, tp tokenProvider) ([]uint32, error) { + if util.IsCancelled(ctx) { + return nil, ctx.Err() + } + if needle, ok := isSimpleWildcardContains(t); ok { + return tp.FindContains(tp.FirstTID(), tp.LastTID(), needle) } - return tids, nil + s := newSearcher(t, tp) + return tp.FindToken(s) } diff --git a/pattern/pattern_test.go b/pattern/pattern_test.go index a723a650..49a297bd 100644 --- a/pattern/pattern_test.go +++ b/pattern/pattern_test.go @@ -1,6 +1,7 @@ package pattern import ( + "bytes" "errors" "math" "math/rand" @@ -99,6 +100,36 @@ func (tp *simpleTokenProvider) Ordered() bool { return tp.ordered } +func (tp *simpleTokenProvider) FindContains(firstTID uint32, lastTID uint32, needle []byte) ([]uint32, error) { + if len(needle) == 0 { + return nil, nil + } + var tids []uint32 + for t := firstTID; t <= lastTID; t++ { + if bytes.Contains(tp.GetToken(t), needle) { + tids = append(tids, t) + } + } + return tids, nil +} + +func (tp *simpleTokenProvider) FindToken(searcher Searcher) ([]uint32, error) { + firstTID := searcher.FirstTID() + lastTID := searcher.LastTID() + var tids []uint32 + for t := firstTID; t <= lastTID; t++ { + match, err := searcher.Check(tp.GetToken(t)) + if err != nil { + return nil, err + } + + if match { + tids = append(tids, t) + } + } + return tids, nil +} + func searchAll(t *testing.T, tp testTokenProvider, req string, expect []string) { sort.Strings(expect) assert.False(t, tp.shuffled.Ordered(), "data is sorted") @@ -133,10 +164,10 @@ func search(t *testing.T, tp *simpleTokenProvider, req string, expect []string) s := newSearcher(token, tp) res := []string{} - for i := s.firstTID(); i <= s.lastTID(); i++ { + for i := s.FirstTID(); i <= s.LastTID(); i++ { val := tp.GetToken(i) - match, err := s.check(val) + match, err := s.Check(val) if err != nil { t.Fatal(err) } From cee0a606e97544334db0b42ee02f4d548d0613d9 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Fri, 3 Apr 2026 10:48:25 +0400 Subject: [PATCH 2/2] lint fixes --- frac/active_token_list.go | 2 +- frac/sealed/token/provider.go | 6 +++--- pattern/pattern_test.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/frac/active_token_list.go b/frac/active_token_list.go index 072d858a..472f6d1a 100644 --- a/frac/active_token_list.go +++ b/frac/active_token_list.go @@ -40,7 +40,7 @@ func (tp *activeTokenProvider) GetToken(tid uint32) []byte { } // FindContains finds tids of tokens which contain a provided needle. From and to indices are specified inclusive. -func (tp *activeTokenProvider) FindContains(firstTID uint32, lastTID uint32, needle []byte) ([]uint32, error) { +func (tp *activeTokenProvider) FindContains(firstTID, lastTID uint32, needle []byte) ([]uint32, error) { if len(needle) == 0 { return nil, nil } diff --git a/frac/sealed/token/provider.go b/frac/sealed/token/provider.go index b6affa51..b84fb8df 100644 --- a/frac/sealed/token/provider.go +++ b/frac/sealed/token/provider.go @@ -58,7 +58,7 @@ func (tp *Provider) GetToken(tid uint32) []byte { return block.GetToken(entry.GetIndexInTokensBlock(tid)) } -func (tp *Provider) FindContains(firstTID uint32, lastTID uint32, needle []byte) ([]uint32, error) { +func (tp *Provider) FindContains(firstTID, lastTID uint32, needle []byte) ([]uint32, error) { return tp.findInBlocks(firstTID, lastTID, func(b *Block, firstIndex, lastIndex int) ([]int, error) { return b.FindContains(firstIndex, lastIndex, needle) }) @@ -93,7 +93,7 @@ func (tp *Provider) findInBlocks(firstTID, lastTID uint32, search func(*Block, i return tids, nil } -func (tp *Provider) narrowTIDs(entry *TableEntry, firstTID uint32, fromTID uint32) (int, int) { +func (tp *Provider) narrowTIDs(entry *TableEntry, firstTID, fromTID uint32) (int, int) { tidStart := firstTID if entry.StartTID > tidStart { tidStart = entry.StartTID @@ -108,7 +108,7 @@ func (tp *Provider) narrowTIDs(entry *TableEntry, firstTID uint32, fromTID uint3 return firstIndex, lastIndex } -func (tp *Provider) narrowEntries(firstTID uint32, lastTID uint32) []*TableEntry { +func (tp *Provider) narrowEntries(firstTID, lastTID uint32) []*TableEntry { firstIdx := sort.Search(len(tp.entries), func(i int) bool { return tp.entries[i].getLastTID() >= firstTID }) diff --git a/pattern/pattern_test.go b/pattern/pattern_test.go index 49a297bd..2ef521ab 100644 --- a/pattern/pattern_test.go +++ b/pattern/pattern_test.go @@ -100,7 +100,7 @@ func (tp *simpleTokenProvider) Ordered() bool { return tp.ordered } -func (tp *simpleTokenProvider) FindContains(firstTID uint32, lastTID uint32, needle []byte) ([]uint32, error) { +func (tp *simpleTokenProvider) FindContains(firstTID, lastTID uint32, needle []byte) ([]uint32, error) { if len(needle) == 0 { return nil, nil }