Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,13 @@ public static DoubleStream randomDoubles(long streamSize) {
return random().doubles(streamSize);
}

/**
 * Returns a pseudo-random {@code double} sampled from the standard normal
 * (Gaussian) distribution, i.e. mean 0.0 and standard deviation 1.0.
 *
 * @return a normally distributed pseudo-random value
 */
public static double randomGaussianDouble() {
    final double gaussianSample = random().nextGaussian();
    return gaussianSample;
}

/**
* Returns a double value in the interval [start, end) if lowerInclusive is
* set to true, (start, end) otherwise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@

import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SortField;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersions;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.Explicit;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -67,6 +68,10 @@ public class TDigestFieldMapper extends FieldMapper {

public static final String CENTROIDS_NAME = "centroids";
public static final String COUNTS_NAME = "counts";
public static final String SUM_FIELD_NAME = "sum";
public static final String TOTAL_COUNT_FIELD_NAME = "count";
public static final String MIN_FIELD_NAME = "min";
public static final String MAX_FIELD_NAME = "max";
public static final String CONTENT_TYPE = "tdigest";

private static TDigestFieldMapper toType(FieldMapper in) {
Expand Down Expand Up @@ -202,7 +207,7 @@ public LeafHistogramFieldData load(LeafReaderContext context) {
public HistogramValues getHistogramValues() throws IOException {
try {
final BinaryDocValues values = DocValues.getBinary(context.reader(), fieldName);
final InternalHistogramValue value = new InternalHistogramValue();
final InternalTDigestValue value = new InternalTDigestValue();
return new HistogramValues() {

@Override
Expand Down Expand Up @@ -234,7 +239,7 @@ public DocValuesScriptFieldFactory getScriptFieldFactory(String name) {
public FormattedDocValues getFormattedValues(DocValueFormat format) {
try {
final BinaryDocValues values = DocValues.getBinary(context.reader(), fieldName);
final InternalHistogramValue value = new InternalHistogramValue();
final InternalTDigestValue value = new InternalTDigestValue();
return new FormattedDocValues() {
@Override
public boolean advanceExact(int docId) throws IOException {
Expand Down Expand Up @@ -337,20 +342,44 @@ public void parse(DocumentParserContext context) throws IOException {
}
subParser.nextToken();
// TODO: Here we should build a t-digest out of the input, based on the settings on the field
TDigestParser.ParsedHistogram parsedHistogram = TDigestParser.parse(fullPath(), subParser);
TDigestParser.ParsedTDigest parsedTDigest = TDigestParser.parse(fullPath(), subParser);

BytesStreamOutput streamOutput = new BytesStreamOutput();
for (int i = 0; i < parsedHistogram.centroids().size(); i++) {
long count = parsedHistogram.counts().get(i);

for (int i = 0; i < parsedTDigest.centroids().size(); i++) {
long count = parsedTDigest.counts().get(i);
assert count >= 0;
// we do not add elements with count == 0
if (count > 0) {
streamOutput.writeVLong(count);
streamOutput.writeLong(Double.doubleToRawLongBits(parsedHistogram.centroids().get(i)));
streamOutput.writeDouble(parsedTDigest.centroids().get(i));
}
}

BytesRef docValue = streamOutput.bytes().toBytesRef();
Field field = new BinaryDocValuesField(fullPath(), docValue);
Field digestField = new BinaryDocValuesField(fullPath(), docValue);

// Add numeric doc values fields for the summary data
NumericDocValuesField maxField = null;
if (Double.isNaN(parsedTDigest.max()) == false) {
maxField = new NumericDocValuesField(
valuesMaxSubFieldName(fullPath()),
NumericUtils.doubleToSortableLong(parsedTDigest.max())
);
}

NumericDocValuesField minField = null;
if (Double.isNaN(parsedTDigest.min()) == false) {
minField = new NumericDocValuesField(
valuesMinSubFieldName(fullPath()),
NumericUtils.doubleToSortableLong(parsedTDigest.min())
);
}
NumericDocValuesField countField = new NumericDocValuesField(valuesCountSubFieldName(fullPath()), parsedTDigest.count());
NumericDocValuesField sumField = new NumericDocValuesField(
valuesSumSubFieldName(fullPath()),
NumericUtils.doubleToSortableLong(parsedTDigest.sum())
);
if (context.doc().getByKey(fieldType().name()) != null) {
throw new IllegalArgumentException(
"Field ["
Expand All @@ -360,7 +389,15 @@ public void parse(DocumentParserContext context) throws IOException {
+ "] doesn't support indexing multiple values for the same field in the same document"
);
}
context.doc().addWithKey(fieldType().name(), field);
context.doc().addWithKey(fieldType().name(), digestField);
context.doc().add(countField);
context.doc().add(sumField);
if (maxField != null) {
context.doc().add(maxField);
}
if (minField != null) {
context.doc().add(minField);
}

} catch (Exception ex) {
if (ignoreMalformed.value() == false) {
Expand Down Expand Up @@ -390,19 +427,36 @@ public void parse(DocumentParserContext context) throws IOException {
context.path().remove();
}

/** Name of the hidden numeric sub-field holding the digest's total value count. */
private static String valuesCountSubFieldName(String fullPath) {
    return fullPath.concat("._values_count");
}

/** Name of the hidden numeric sub-field holding the digest's sum of values. */
private static String valuesSumSubFieldName(String fullPath) {
    return fullPath.concat("._values_sum");
}

/** Name of the hidden numeric sub-field holding the digest's minimum value. */
private static String valuesMinSubFieldName(String fullPath) {
    return fullPath.concat("._values_min");
}

/** Name of the hidden numeric sub-field holding the digest's maximum value. */
private static String valuesMaxSubFieldName(String fullPath) {
    return fullPath.concat("._values_max");
}

/** re-usable {@link HistogramValue} implementation */
private static class InternalHistogramValue extends HistogramValue {
private static class InternalTDigestValue extends HistogramValue {
double value;
long count;
boolean isExhausted;

final ByteArrayStreamInput streamInput;

InternalHistogramValue() {
InternalTDigestValue() {
streamInput = new ByteArrayStreamInput();
}

/** reset the value for the histogram */
void reset(BytesRef bytesRef) {
void reset(BytesRef bytesRef) throws IOException {
streamInput.reset(bytesRef.bytes, bytesRef.offset, bytesRef.length);
isExhausted = false;
value = 0;
Expand All @@ -412,12 +466,8 @@ void reset(BytesRef bytesRef) {
@Override
public boolean next() throws IOException {
if (streamInput.available() > 0) {
if (streamInput.getTransportVersion().onOrAfter(TransportVersions.V_8_11_X)) {
count = streamInput.readVLong();
} else {
count = streamInput.readVInt();
}
value = Double.longBitsToDouble(streamInput.readLong());
count = streamInput.readVLong();
value = streamInput.readDouble();
return true;
}
isExhausted = true;
Expand Down Expand Up @@ -447,14 +497,14 @@ protected SyntheticSourceSupport syntheticSourceSupport() {
() -> new CompositeSyntheticFieldLoader(
leafName(),
fullPath(),
new HistogramSyntheticFieldLoader(),
new TDigestSyntheticFieldLoader(),
new CompositeSyntheticFieldLoader.MalformedValuesLayer(fullPath())
)
);
}

private class HistogramSyntheticFieldLoader implements CompositeSyntheticFieldLoader.DocValuesLayer {
private final InternalHistogramValue value = new InternalHistogramValue();
private class TDigestSyntheticFieldLoader implements CompositeSyntheticFieldLoader.DocValuesLayer {
private final InternalTDigestValue value = new InternalTDigestValue();
private BytesRef binaryValue;

@Override
Expand Down Expand Up @@ -485,9 +535,10 @@ public void write(XContentBuilder b) throws IOException {
if (binaryValue == null) {
return;
}
b.startObject();

value.reset(binaryValue);

b.startObject();
// TODO: Load the summary values out of the sub-fields, if they exist
b.startArray(CENTROIDS_NAME);
while (value.next()) {
b.value(value.value());
Expand Down
Loading