Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/storeapi/store_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ message FetchRequest {
bool explain = 3;
repeated IdWithHint ids_with_hints = 4;
FieldsFilter fields_filter = 5;
// if true, skip masks will be evaluated
// set to false in fetch after search since ids are already skipped
bool evalSkipMasks = 6;
}

message StatusRequest {}
Expand Down
4 changes: 2 additions & 2 deletions frac/active.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,15 +369,15 @@ func (f *Active) String() string {
return fracToString(f, "active")
}

func (f *Active) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
func (f *Active) Fetch(ctx context.Context, ids []seq.ID, evalSkipMasks bool) ([][]byte, error) {
if f.Info().DocsTotal == 0 { // it is empty active fraction state
return nil, nil
}

dp := f.createDataProvider(ctx)
defer dp.release()

return dp.Fetch(ids)
return dp.Fetch(ids, evalSkipMasks)
}

func (f *Active) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) {
Expand Down
10 changes: 7 additions & 3 deletions frac/active_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (dp *activeDataProvider) getTokenIndex() *activeTokenIndex {
}
}

func (dp *activeDataProvider) Fetch(ids []seq.ID) ([][]byte, error) {
func (dp *activeDataProvider) Fetch(ids []seq.ID, evalSkipMasks bool) ([][]byte, error) {
sw := stopwatch.New()

defer sw.Export(
Expand All @@ -88,7 +88,7 @@ func (dp *activeDataProvider) Fetch(ids []seq.ID) ([][]byte, error) {
}}

for _, fi := range indexes {
if err := processor.IndexFetch(ids, sw, &fi, res); err != nil {
if err := processor.IndexFetch(ids, evalSkipMasks, sw, &fi, res); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -271,12 +271,16 @@ func (di *activeFetchIndex) GetBlocksOffsets(num uint32) uint64 {
return di.blocksOffsets[num]
}

func (di *activeFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) {
func (di *activeFetchIndex) GetDocPos(ids []seq.ID, evalSkipMasks bool) ([]seq.DocPos, error) {
docsPos := make([]seq.DocPos, len(ids))
for i, id := range ids {
docsPos[i] = di.docsPositions.GetSync(id)
}

if !evalSkipMasks {
return docsPos, nil
}

minLID, maxLID := uint32(0), uint32(math.MaxUint32)
skipLIDsIterator, has, err := di.skipMaskProvider.GetIDsIteratorByFrac(di.fracName, minLID, maxLID, false)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion frac/fraction.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Fraction interface {
Info() *common.Info
IsIntersecting(from seq.MID, to seq.MID) bool
Contains(mid seq.MID) bool
Fetch(context.Context, []seq.ID) ([][]byte, error)
Fetch(context.Context, []seq.ID, bool) ([][]byte, error)
Search(context.Context, processor.SearchParams) (*seq.QPR, error)
FindLIDs(context.Context, []seq.ID) ([]seq.LID, error)
}
Expand Down
2 changes: 1 addition & 1 deletion frac/fraction_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs
return fmt.Errorf("search failed: %w", err)
}

fetchedResult, err := fraction.Fetch(ctx, qpr.IDs.IDs())
fetchedResult, err := fraction.Fetch(ctx, qpr.IDs.IDs(), false)
if err != nil {
return fmt.Errorf("fetch failed: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1656,7 +1656,7 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
qprIDs := qpr.IDs.IDs()
totalIDsScrolled += len(qprIDs)

docs, err := s.fraction.Fetch(context.Background(), qprIDs)
docs, err := s.fraction.Fetch(context.Background(), qprIDs, false)
s.Require().NoError(err, "fetch failed for order=%v", order)

for j, doc := range docs {
Expand Down Expand Up @@ -1965,7 +1965,7 @@ func (s *FractionTestSuite) AssertSearchWithSearchParams(
s.Require().NoError(err, "search failed for query with order=%v", order)
s.Require().Equal(len(expectedIndexes), qpr.IDs.Len(), "doc count doesn't match")

docs, err := s.fraction.Fetch(context.Background(), qpr.IDs.IDs())
docs, err := s.fraction.Fetch(context.Background(), qpr.IDs.IDs(), false)
s.Require().NoError(err, "failed to fetch docs")

if order.IsReverse() {
Expand Down
6 changes: 3 additions & 3 deletions frac/processor/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (

type fetchIndex interface {
GetBlocksOffsets(uint32) uint64
GetDocPos([]seq.ID) ([]seq.DocPos, error)
GetDocPos([]seq.ID, bool) ([]seq.DocPos, error)
ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error)
}

func IndexFetch(ids []seq.ID, sw *stopwatch.Stopwatch, fetchIndex fetchIndex, res [][]byte) error {
func IndexFetch(ids []seq.ID, evalSkipMasks bool, sw *stopwatch.Stopwatch, fetchIndex fetchIndex, res [][]byte) error {
m := sw.Start("get_docs_pos")
docsPos, err := fetchIndex.GetDocPos(ids)
docsPos, err := fetchIndex.GetDocPos(ids, evalSkipMasks)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions frac/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,14 @@ func (f *Remote) Contains(mid seq.MID) bool {
return f.info.IsIntersecting(mid, mid)
}

func (f *Remote) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
func (f *Remote) Fetch(ctx context.Context, ids []seq.ID, evalSkipMasks bool) ([][]byte, error) {
dp, err := f.createDataProvider(ctx)
if err != nil {
return nil, err
}
defer dp.release()

return dp.Fetch(ids)
return dp.Fetch(ids, evalSkipMasks)
}

func (f *Remote) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) {
Expand Down
4 changes: 2 additions & 2 deletions frac/sealed.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,11 @@ func (f *Sealed) String() string {
return fracToString(f, "sealed")
}

func (f *Sealed) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
func (f *Sealed) Fetch(ctx context.Context, ids []seq.ID, evalSkipMasks bool) ([][]byte, error) {
dp := f.createDataProvider(ctx)
defer dp.release()

return dp.Fetch(ids)
return dp.Fetch(ids, evalSkipMasks)
}

func (f *Sealed) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) {
Expand Down
10 changes: 7 additions & 3 deletions frac/sealed_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (dp *sealedDataProvider) release() {
dp.idsProvider.Release()
}

func (dp *sealedDataProvider) Fetch(ids []seq.ID) ([][]byte, error) {
func (dp *sealedDataProvider) Fetch(ids []seq.ID, evalSkipMasks bool) ([][]byte, error) {
sw := stopwatch.New()

defer sw.Export(
Expand All @@ -100,7 +100,7 @@ func (dp *sealedDataProvider) Fetch(ids []seq.ID) ([][]byte, error) {
)

res := make([][]byte, len(ids))
if err := processor.IndexFetch(ids, sw, dp.getFetchIndex(), res); err != nil {
if err := processor.IndexFetch(ids, evalSkipMasks, sw, dp.getFetchIndex(), res); err != nil {
return nil, err
}

Expand Down Expand Up @@ -280,9 +280,13 @@ func (fi *sealedFetchIndex) GetBlocksOffsets(num uint32) uint64 {
return fi.blocksOffsets[num]
}

func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) {
func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID, evalSkipMasks bool) ([]seq.DocPos, error) {
allLids := fi.findLIDs(ids)

if !evalSkipMasks {
return fi.getDocPosByLIDs(allLids), nil
}

minLID, maxLID := uint32(0), uint32(math.MaxUint32)
if len(allLids) > 0 {
// allLids can be not sorted
Expand Down
12 changes: 6 additions & 6 deletions fracmanager/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewFetcher(maxWorkersNum int) *Fetcher {
}
}

func (f *Fetcher) FetchDocs(ctx context.Context, fracs List, ids []seq.IDSource) ([][]byte, error) {
func (f *Fetcher) FetchDocs(ctx context.Context, fracs List, ids []seq.IDSource, evalSkipMasks bool) ([][]byte, error) {
sw := stopwatch.New()

m := sw.Start("fill_revers_pos")
Expand All @@ -44,7 +44,7 @@ func (f *Fetcher) FetchDocs(ctx context.Context, fracs List, ids []seq.IDSource)
m.Stop()

m = sw.Start("fetch_async")
docsByFracs, err := f.fetchDocsAsync(ctx, fracs, idsByFrac)
docsByFracs, err := f.fetchDocsAsync(ctx, fracs, idsByFrac, evalSkipMasks)
m.Stop()

// arrange the result in the original order of ids
Expand All @@ -64,7 +64,7 @@ func (f *Fetcher) FetchDocs(ctx context.Context, fracs List, ids []seq.IDSource)
return result, err
}

func (f *Fetcher) fetchDocsAsync(ctx context.Context, fracs []frac.Fraction, idsByFrac [][]seq.ID) ([][][]byte, error) {
func (f *Fetcher) fetchDocsAsync(ctx context.Context, fracs []frac.Fraction, idsByFrac [][]seq.ID, evalSkipMasks bool) ([][][]byte, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -84,7 +84,7 @@ loop:
wg.Add(1)
go func() {
var fracErr error
if docs[i], fracErr = fracFetch(ctx, frac, idsByFrac[i]); fracErr != nil {
if docs[i], fracErr = fracFetch(ctx, frac, idsByFrac[i], evalSkipMasks); fracErr != nil {
once.Do(func() {
err = fracErr
cancel()
Expand All @@ -105,13 +105,13 @@ loop:
return docs, nil
}

func fracFetch(ctx context.Context, f frac.Fraction, ids []seq.ID) (_ [][]byte, err error) {
func fracFetch(ctx context.Context, f frac.Fraction, ids []seq.ID, evalSkipMasks bool) (_ [][]byte, err error) {
defer func() {
if panicData := util.RecoverToError(recover(), metric.StorePanics); panicData != nil {
err = fmt.Errorf("internal error: fetch panicked on fraction %s, error=%w", f.Info().Name(), panicData)
}
}()
return f.Fetch(ctx, ids)
return f.Fetch(ctx, ids, evalSkipMasks)
}

func sortIDs(idsOrig seq.IDSources) (seq.IDSources, seq.MID, seq.MID) {
Expand Down
14 changes: 7 additions & 7 deletions fracmanager/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestFetcher_ShouldFetchMultiFrac(t *testing.T) {
{ID: seq.SimpleID(30)},
}

docs, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2, frac3}, fetchIDs)
docs, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2, frac3}, fetchIDs, false)

assert.NoError(t, err)
assert.Equal(t, [][]byte{[]byte("doc1"), []byte("doc2"), []byte("doc4"), []byte("doc3")}, docs)
Expand All @@ -51,7 +51,7 @@ func TestFetcher_DocNotFound(t *testing.T) {
{ID: seq.SimpleID(20)},
}

docs, err := fetcher.FetchDocs(context.Background(), List{frac1}, fetchIDs)
docs, err := fetcher.FetchDocs(context.Background(), List{frac1}, fetchIDs, false)

assert.NoError(t, err)
assert.Len(t, docs, 2)
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestFetcher_ShouldUseHints(t *testing.T) {
{ID: seq.SimpleID(10), Hint: frac1.Info().Name()},
}

docs, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2, frac3}, fetchIDs)
docs, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2, frac3}, fetchIDs, false)

assert.NoError(t, err)
assert.Equal(t, [][]byte{[]byte("apple"), []byte("pineapple"), []byte("orange")}, docs)
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestFetcher_ShouldUseHints_MixedScenario(t *testing.T) {
{ID: seq.SimpleID(50)},
}

docs, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2, frac3}, fetchIDs)
docs, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2, frac3}, fetchIDs, false)

assert.NoError(t, err)
assert.Equal(t, [][]byte{[]byte("apple"), []byte("pineapple"), []byte("orange"), []byte("mango")}, docs)
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestFetcher_OutOfRangeFractions(t *testing.T) {
{ID: seq.SimpleID(20)},
}

docs, err := fetcher.FetchDocs(context.Background(), List{frac1}, fetchIDs)
docs, err := fetcher.FetchDocs(context.Background(), List{frac1}, fetchIDs, false)

assert.NoError(t, err)
assert.Equal(t, [][]byte{[]byte("apple"), nil, []byte("banana"), nil}, docs)
Expand All @@ -170,7 +170,7 @@ func TestFetcher_FetchError(t *testing.T) {

fetchIDs := []seq.IDSource{{ID: seq.SimpleID(20)}}

_, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2}, fetchIDs)
_, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2}, fetchIDs, false)

assert.ErrorContains(t, err, "fetch failed")
}
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestFetcher_ContextCancellation(t *testing.T) {
{ID: seq.SimpleID(20)},
}

_, err := fetcher.FetchDocs(ctx, List{frac1, frac2, frac3, frac4}, ids)
_, err := fetcher.FetchDocs(ctx, List{frac1, frac2, frac3, frac4}, ids, false)

assert.Error(t, err)
assert.ErrorIs(t, context.Canceled, err)
Expand Down
6 changes: 3 additions & 3 deletions fracmanager/proxy_frac.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func (p *fractionProxy) Contains(mid seq.MID) bool {
return p.impl.Contains(mid)
}

func (p *fractionProxy) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
func (p *fractionProxy) Fetch(ctx context.Context, ids []seq.ID, evalSkipMasks bool) ([][]byte, error) {
p.mu.RLock()
defer p.mu.RUnlock()
return p.impl.Fetch(ctx, ids)
return p.impl.Fetch(ctx, ids, evalSkipMasks)
}

func (p *fractionProxy) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) {
Expand Down Expand Up @@ -187,7 +187,7 @@ func (emptyFraction) Contains(mid seq.MID) bool {
return false
}

func (emptyFraction) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
func (emptyFraction) Fetch(ctx context.Context, ids []seq.ID, evalSkipMasks bool) ([][]byte, error) {
return nil, nil
}

Expand Down
2 changes: 1 addition & 1 deletion fracmanager/searcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (f *testFakeFrac) Contains(mid seq.MID) bool {
return f.info.IsIntersecting(mid, mid)
}

func (f *testFakeFrac) Fetch(_ context.Context, ids []seq.ID) ([][]byte, error) {
func (f *testFakeFrac) Fetch(_ context.Context, ids []seq.ID, _ bool) ([][]byte, error) {
f.fetchCount++
if f.fetchError != nil {
return nil, f.fetchError
Expand Down
Loading
Loading