diff --git a/indexer/compress.go b/indexer/compress.go
index ee706e4c..88a4aea4 100644
--- a/indexer/compress.go
+++ b/indexer/compress.go
@@ -36,13 +36,21 @@ func PutDocMetasCompressor(c *DocsMetasCompressor) {
 
 // CompressDocsAndMetas prepare docs and meta blocks for bulk insert.
 func (c *DocsMetasCompressor) CompressDocsAndMetas(docs, meta []byte) {
-	c.docsBuf = initBuf(c.docsBuf, len(docs))
-	c.metaBuf = initBuf(c.metaBuf, len(meta))
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+	go func() { // compress docs block
+		c.docsBuf = initBuf(c.docsBuf, len(docs))
+		c.docsBuf = storage.CompressDocBlock(docs, c.docsBuf, c.docsCompressLevel)
+		wg.Done()
+	}()
 
-	// Compress docs block.
-	c.docsBuf = storage.CompressDocBlock(docs, c.docsBuf, c.docsCompressLevel)
-	// Compress metas block.
-	c.metaBuf = storage.CompressWalBlock(meta, c.metaBuf, c.metaCompressLevel)
+	go func() { // compress metas block
+		c.metaBuf = initBuf(c.metaBuf, len(meta))
+		c.metaBuf = storage.CompressWalBlock(meta, c.metaBuf, c.metaCompressLevel)
+		wg.Done()
+	}()
+
+	wg.Wait()
 
 	bulkSizeAfterCompression.Observe(float64(len(c.docsBuf) + len(c.metaBuf)))
 }
diff --git a/proxy/bulk/ingestor.go b/proxy/bulk/ingestor.go
index 3f7439e9..46ef009e 100644
--- a/proxy/bulk/ingestor.go
+++ b/proxy/bulk/ingestor.go
@@ -10,6 +10,7 @@ import (
 
 	"go.uber.org/zap"
 
+	"github.com/alecthomas/units"
 	"github.com/ozontech/seq-db/bytespool"
 	"github.com/ozontech/seq-db/consts"
 	"github.com/ozontech/seq-db/indexer"
@@ -168,14 +169,10 @@ func (i *Ingestor) ProcessDocuments(ctx context.Context, requestTime time.Time,
 
 var (
 	binaryDocsPool = sync.Pool{
-		New: func() any {
-			return new(bytespool.Buffer)
-		},
+		New: func() any { return &bytespool.Buffer{B: make([]byte, 0, units.MiB)} },
 	}
 	binaryMetasPool = sync.Pool{
-		New: func() any {
-			return new(bytespool.Buffer)
-		},
+		New: func() any { return &bytespool.Buffer{B: make([]byte, 0, units.MiB)} },
 	}
 )
 
diff --git a/seq/qpr.go b/seq/qpr.go
index e35abf4a..cc5251c4 100644
--- a/seq/qpr.go
+++ b/seq/qpr.go
@@ -569,7 +569,9 @@ func MergeQPRs(dst *QPR, qprs []*QPR, limit int, histInterval MID, order DocsOrd
 	}
 
 	ids, repetitionsCount := removeRepetitionsAdvanced(dst.IDs, dst.Histogram, histInterval)
-	metric.RepetitionsDocsTotal.Add(float64(repetitionsCount))
+	if repetitionsCount > 0 {
+		metric.RepetitionsDocsTotal.Add(float64(repetitionsCount))
+	}
 
 	// count only for queries with total
 	if dst.Total > 0 {
diff --git a/tokenizer/keyword_tokenizer.go b/tokenizer/keyword_tokenizer.go
index 1c893bdf..a4a9e391 100644
--- a/tokenizer/keyword_tokenizer.go
+++ b/tokenizer/keyword_tokenizer.go
@@ -23,16 +23,16 @@ func (t *KeywordTokenizer) Tokenize(tokens []MetaToken, name, value []byte, maxT
 		maxTokenSize = t.defaultMaxTokenSize
 	}
 
-	if len(value) > maxTokenSize && !t.partialIndexing {
-		metric.SkippedIndexesKeyword.Inc()
-		metric.SkippedIndexesBytesKeyword.Add(float64(len(value)))
-		return tokens
+	if len(value) > maxTokenSize {
+		if !t.partialIndexing {
+			metric.SkippedIndexesKeyword.Inc()
+			metric.SkippedIndexesBytesKeyword.Add(float64(len(value)))
+			return tokens
+		}
+		metric.SkippedIndexesBytesKeyword.Add(float64(len(value) - maxTokenSize))
+		value = value[:maxTokenSize]
 	}
 
-	maxLength := min(len(value), maxTokenSize)
-	metric.SkippedIndexesBytesKeyword.Add(float64(len(value[maxLength:])))
-	value = value[:maxLength]
-
 	tokens = append(tokens, MetaToken{
 		Key:   name,
 		Value: toLowerIfCaseInsensitive(t.caseSensitive, value),
diff --git a/tokenizer/path_tokenizer.go b/tokenizer/path_tokenizer.go
index 965cb28f..9714dabe 100644
--- a/tokenizer/path_tokenizer.go
+++ b/tokenizer/path_tokenizer.go
@@ -33,16 +33,16 @@ func (t *PathTokenizer) Tokenize(tokens []MetaToken, name, value []byte, maxToke
 		maxTokenSize = t.defaultMaxTokenSize
 	}
 
-	if len(value) > maxTokenSize && !t.partialIndexing {
-		metric.SkippedIndexesPath.Inc()
-		metric.SkippedIndexesBytesPath.Add(float64(len(value)))
-		return tokens
+	if len(value) > maxTokenSize {
+		if !t.partialIndexing {
+			metric.SkippedIndexesPath.Inc()
+			metric.SkippedIndexesBytesPath.Add(float64(len(value)))
+			return tokens
+		}
+		metric.SkippedIndexesBytesPath.Add(float64(len(value) - maxTokenSize))
+		value = value[:maxTokenSize]
 	}
 
-	maxLength := min(len(value), maxTokenSize)
-	metric.SkippedIndexesBytesPath.Add(float64(len(value[maxLength:])))
-	value = value[:maxLength]
-
 	var i int
 
 	if len(value) != 0 && value[0] == t.separator {
diff --git a/tokenizer/text_tokenizer.go b/tokenizer/text_tokenizer.go
index 6a728a91..20682c5b 100644
--- a/tokenizer/text_tokenizer.go
+++ b/tokenizer/text_tokenizer.go
@@ -30,23 +30,22 @@ func (t *TextTokenizer) Tokenize(tokens []MetaToken, name, value []byte, maxFiel
 		maxFieldValueLength = t.defaultMaxFieldValueLength
 	}
 
-	if len(value) > maxFieldValueLength && !t.partialIndexing {
-		metric.SkippedIndexesText.Inc()
-		metric.SkippedIndexesBytesText.Add(float64(len(value)))
-		return tokens
-	}
-
 	if len(value) == 0 {
 		tokens = append(tokens, MetaToken{Key: name, Value: value})
 		return tokens
 	}
 
-	maxLength := min(len(value), maxFieldValueLength)
+	if len(value) > maxFieldValueLength {
+		if !t.partialIndexing {
+			metric.SkippedIndexesText.Inc()
+			metric.SkippedIndexesBytesText.Add(float64(len(value)))
+			return tokens
+		}
+		metric.SkippedIndexesBytesText.Add(float64(len(value) - maxFieldValueLength))
+		value = value[:maxFieldValueLength]
+	}
 
-	metric.SkippedIndexesBytesText.Add(float64(len(value[maxLength:])))
-	value = value[:maxLength]
 	k := 0
-	hasUpper := false
 	asciiOnly := true
 
 	// Loop over the string looking for tokens.
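Note: the hunk in indexer/compress.go works because the docs and metas buffers are disjoint, so the two compressions can run concurrently and be joined with a sync.WaitGroup. Below is a minimal, self-contained sketch of that fan-out pattern, using gzip as a stand-in for storage.CompressDocBlock/storage.CompressWalBlock (whose implementations are not part of this diff); names like gzipBlock and compressDocsAndMetas are illustrative only.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"sync"
)

// gzipBlock is a stand-in for the real block compressors used in the patch.
func gzipBlock(src []byte) []byte {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	_, _ = w.Write(src)
	_ = w.Close()
	return buf.Bytes()
}

// compressDocsAndMetas mirrors the structure of the patched
// CompressDocsAndMetas: two independent outputs, one goroutine each,
// joined by a WaitGroup.
func compressDocsAndMetas(docs, metas []byte) (docsBuf, metasBuf []byte) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // compress docs block
		defer wg.Done()
		docsBuf = gzipBlock(docs)
	}()
	go func() { // compress metas block
		defer wg.Done()
		metasBuf = gzipBlock(metas)
	}()
	// Wait establishes a happens-before edge, so reading both named
	// results after it is data-race free.
	wg.Wait()
	return docsBuf, metasBuf
}

func main() {
	docs := bytes.Repeat([]byte("doc"), 1024)
	metas := bytes.Repeat([]byte("meta"), 1024)
	d, m := compressDocsAndMetas(docs, metas)
	fmt.Printf("docs: %d -> %d bytes, metas: %d -> %d bytes\n", len(docs), len(d), len(metas), len(m))
}

The sketch uses defer wg.Done() so that a panic inside a compressor cannot leave Wait blocked forever; the patch calls wg.Done() directly at the end of each goroutine, which is equivalent on the non-panicking path.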