|
37 | 37 | import org.elasticsearch.TransportVersion; |
38 | 38 | import org.elasticsearch.TransportVersions; |
39 | 39 | import org.elasticsearch.action.get.GetRequest; |
| 40 | +import org.elasticsearch.common.CheckedSupplier; |
40 | 41 | import org.elasticsearch.common.bytes.BytesReference; |
41 | 42 | import org.elasticsearch.common.io.stream.InputStreamStreamInput; |
42 | 43 | import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; |
|
63 | 64 | import org.elasticsearch.index.query.SearchExecutionContext; |
64 | 65 | import org.elasticsearch.indices.breaker.CircuitBreakerService; |
65 | 66 | import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; |
| 67 | +import org.elasticsearch.logging.LogManager; |
| 68 | +import org.elasticsearch.logging.Logger; |
| 69 | +import org.elasticsearch.search.lookup.Source; |
| 70 | +import org.elasticsearch.search.lookup.SourceProvider; |
66 | 71 | import org.elasticsearch.xcontent.ConstructingObjectParser; |
67 | 72 | import org.elasticsearch.xcontent.NamedXContentRegistry; |
68 | 73 | import org.elasticsearch.xcontent.ParseField; |
|
86 | 91 | import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg; |
87 | 92 |
|
88 | 93 | public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBuilder> { |
| 94 | + private static final Logger LOGGER = LogManager.getLogger(PercolateQueryBuilder.class); |
| 95 | + |
89 | 96 | public static final String NAME = "percolate"; |
90 | 97 |
|
91 | 98 | static final ParseField DOCUMENT_FIELD = new ParseField("document"); |
@@ -557,41 +564,81 @@ static PercolateQuery.QueryStore createStore(MappedFieldType queryBuilderFieldTy |
557 | 564 | return docId -> { |
558 | 565 | if (binaryDocValues.advanceExact(docId)) { |
559 | 566 | BytesRef qbSource = binaryDocValues.binaryValue(); |
560 | | - try ( |
561 | | - InputStream in = new ByteArrayInputStream(qbSource.bytes, qbSource.offset, qbSource.length); |
562 | | - StreamInput input = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(in, qbSource.length), registry) |
563 | | - ) { |
564 | | - // Query builder's content is stored via BinaryFieldMapper, which has a custom encoding |
565 | | - // to encode multiple binary values into a single binary doc values field. |
566 | | - // This is the reason we need to first read the number of values and |
567 | | - // then the length of the field value in bytes. |
568 | | - int numValues = input.readVInt(); |
569 | | - assert numValues == 1; |
570 | | - int valueLength = input.readVInt(); |
571 | | - assert valueLength > 0; |
572 | | - |
573 | | - TransportVersion transportVersion; |
574 | | - if (indexVersion.before(IndexVersions.V_8_8_0)) { |
575 | | - transportVersion = TransportVersion.fromId(indexVersion.id()); |
576 | | - } else { |
577 | | - transportVersion = TransportVersion.readVersion(input); |
| 567 | + QueryBuilder queryBuilder = readQueryBuilder(qbSource, registry, indexVersion, () -> { |
| 568 | + // query builder is written in an incompatible format, fall-back to reading it from source |
| 569 | + if (context.isSourceEnabled() == false) { |
| 570 | + throw new ElasticsearchException( |
| 571 | + "Unable to read percolator query. Original transport version is incompatible and source is " |
| 572 | + + "unavailable on index [{}].", |
| 573 | + context.index().getName() |
| 574 | + ); |
578 | 575 | } |
579 | | - // set the transportversion here - only read vints so far, so can change the version freely at this point |
580 | | - input.setTransportVersion(transportVersion); |
581 | | - |
582 | | - QueryBuilder queryBuilder = input.readNamedWriteable(QueryBuilder.class); |
583 | | - assert in.read() == -1; |
584 | | - queryBuilder = Rewriteable.rewrite(queryBuilder, context); |
585 | | - return queryBuilder.toQuery(context); |
586 | | - } |
587 | | - |
| 576 | + LOGGER.warn( |
| 577 | + "Reading percolator query from source. For best performance, reindexing of index [{}] is required.", |
| 578 | + context.index().getName() |
| 579 | + ); |
| 580 | + SourceProvider sourceProvider = context.createSourceProvider(); |
| 581 | + Source source = sourceProvider.getSource(ctx, docId); |
| 582 | + SourceToParse sourceToParse = new SourceToParse( |
| 583 | + String.valueOf(docId), |
| 584 | + source.internalSourceRef(), |
| 585 | + source.sourceContentType() |
| 586 | + ); |
| 587 | + |
| 588 | + return context.parseDocument(sourceToParse).rootDoc().getBinaryValue(queryBuilderFieldType.name()); |
| 589 | + }); |
| 590 | + |
| 591 | + queryBuilder = Rewriteable.rewrite(queryBuilder, context); |
| 592 | + return queryBuilder.toQuery(context); |
588 | 593 | } else { |
589 | 594 | return null; |
590 | 595 | } |
591 | 596 | }; |
592 | 597 | }; |
593 | 598 | } |
594 | 599 |
|
| 600 | + private static QueryBuilder readQueryBuilder( |
| 601 | + BytesRef bytesRef, |
| 602 | + NamedWriteableRegistry registry, |
| 603 | + IndexVersion indexVersion, |
| 604 | + CheckedSupplier<BytesRef, IOException> fallbackSource |
| 605 | + ) throws IOException { |
| 606 | + try ( |
| 607 | + InputStream in = new ByteArrayInputStream(bytesRef.bytes, bytesRef.offset, bytesRef.length); |
| 608 | + StreamInput input = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(in, bytesRef.length), registry) |
| 609 | + ) { |
| 610 | + // Query builder's content is stored via BinaryFieldMapper, which has a custom encoding |
| 611 | + // to encode multiple binary values into a single binary doc values field. |
| 612 | + // This is the reason we need to first read the number of values and |
| 613 | + // then the length of the field value in bytes. |
| 614 | + int numValues = input.readVInt(); |
| 615 | + assert numValues == 1; |
| 616 | + int valueLength = input.readVInt(); |
| 617 | + assert valueLength > 0; |
| 618 | + |
| 619 | + TransportVersion transportVersion; |
| 620 | + if (indexVersion.before(IndexVersions.V_8_8_0)) { |
| 621 | + transportVersion = TransportVersion.fromId(indexVersion.id()); |
| 622 | + } else { |
| 623 | + transportVersion = TransportVersion.readVersion(input); |
| 624 | + } |
| 625 | + |
| 626 | + QueryBuilder queryBuilder; |
| 627 | + |
| 628 | + if (TransportVersion.isCompatible(transportVersion) || fallbackSource == null) { |
| 629 | + // set the transportversion here - only read vints so far, so can change the version freely at this point |
| 630 | + input.setTransportVersion(transportVersion); |
| 631 | + queryBuilder = input.readNamedWriteable(QueryBuilder.class); |
| 632 | + assert in.read() == -1; |
| 633 | + } else { |
| 634 | + // incompatible transport version, try the fallback |
| 635 | + queryBuilder = readQueryBuilder(fallbackSource.get(), registry, indexVersion, null); |
| 636 | + } |
| 637 | + |
| 638 | + return queryBuilder; |
| 639 | + } |
| 640 | + } |
| 641 | + |
595 | 642 | static SearchExecutionContext wrap(SearchExecutionContext delegate) { |
596 | 643 | return new SearchExecutionContext(delegate) { |
597 | 644 |
|
|
0 commit comments