Skip to content

Commit 48238f6

Browse files
committed
Update the scroll API: add an explicit UsePIT scroll strategy (selected according to the Elasticsearch version) and update the ElasticResponse aggregations map
1 parent 9ce1980 commit 48238f6

File tree

48 files changed

+14351
-331
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+14351
-331
lines changed

core/src/main/scala/app/softnetwork/elastic/client/AggregateApi.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package app.softnetwork.elastic.client
1818

1919
import app.softnetwork.elastic.client.result.{ElasticFailure, ElasticResult, ElasticSuccess}
20-
import app.softnetwork.elastic.sql.query.{SQLAggregation, SQLQuery}
20+
import app.softnetwork.elastic.sql.query.SQLQuery
2121

2222
import java.time.temporal.Temporal
2323
import scala.annotation.tailrec
@@ -65,14 +65,12 @@ trait SingleValueAggregateApi
6565
@tailrec
6666
def findAggregation(
6767
name: String,
68-
aggregation: SQLAggregation,
6968
results: Map[String, Any]
7069
): Option[Any] = {
7170
name.split("\\.") match {
7271
case Array(_, tail @ _*) if tail.nonEmpty =>
7372
findAggregation(
7473
tail.mkString("."),
75-
aggregation,
7674
results
7775
)
7876
case _ => results.get(name)
@@ -142,7 +140,7 @@ trait SingleValueAggregateApi
142140
response.aggregations.map { case (name, aggregation) =>
143141
// Attempt to process each aggregation
144142
val aggregationResult = ElasticResult.attempt {
145-
val value = findAggregation(name, aggregation, result).orNull match {
143+
val value = findAggregation(name, result).orNull match {
146144
case b: Boolean => BooleanValue(b)
147145
case n: Number => NumericValue(n)
148146
case s: String => StringValue(s)

core/src/main/scala/app/softnetwork/elastic/client/AggregateResult.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package app.softnetwork.elastic.client
1818

19-
import app.softnetwork.elastic.sql.function.aggregate.AggregateFunction
20-
2119
import java.time.temporal.Temporal
2220
import scala.util.{Failure, Success, Try}
2321

@@ -27,7 +25,7 @@ sealed trait AggregateResult {
2725
}
2826

2927
sealed trait MetricAgregateResult extends AggregateResult {
30-
def function: AggregateFunction
28+
def aggType: AggregationType.AggregationType
3129
}
3230

3331
sealed trait AggregateValue
@@ -51,7 +49,7 @@ case object EmptyValue extends AggregateValue
5149

5250
case class SingleValueAggregateResult(
5351
field: String,
54-
function: AggregateFunction,
52+
aggType: AggregationType.AggregationType,
5553
value: AggregateValue,
5654
error: Option[String] = None
5755
) extends MetricAgregateResult {
@@ -128,7 +126,7 @@ case class SingleValueAggregateResult(
128126
// Pretty print for debugging
129127
def prettyPrint: String = {
130128
val errorMsg = error.map(e => s" [ERROR: $e]").getOrElse("")
131-
s"$function($field) = ${formatValue(value)}$errorMsg"
129+
s"$aggType($field) = ${formatValue(value)}$errorMsg"
132130
}
133131

134132
private def formatValue(v: AggregateValue): String = v match {

core/src/main/scala/app/softnetwork/elastic/client/BulkApi.scala

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
5454
* : documents to index
5555
* @param toDocument
5656
* : JSON transformation function
57+
* @param indexKey
58+
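* : key of the document field whose value is used as the target index name (falls back to the default index of the bulk options when absent)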
5759
* @param idKey
5860
* : key for the id field
5961
* @param suffixDateKey
@@ -76,6 +78,7 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
7678
def bulkWithResult[D](
7779
items: Iterator[D],
7880
toDocument: D => String,
81+
indexKey: Option[String] = None,
7982
idKey: Option[String] = None,
8083
suffixDateKey: Option[String] = None,
8184
suffixDatePattern: Option[String] = None,
@@ -94,6 +97,7 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
9497
bulkSource(
9598
items,
9699
toDocument,
100+
indexKey,
97101
idKey,
98102
suffixDateKey,
99103
suffixDatePattern,
@@ -156,6 +160,8 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
156160
* the documents to index
157161
* @param toDocument
158162
* JSON transformation function
163+
* @param indexKey
164+
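* key of the document field whose value is used as the target index name (falls back to the default index of the bulk options when absent)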
159165
* @param idKey
160166
* key for the id field
161167
* @param suffixDateKey
@@ -177,6 +183,7 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
177183
def bulkSource[D](
178184
items: Iterator[D],
179185
toDocument: D => String,
186+
indexKey: Option[String] = None,
180187
idKey: Option[String] = None,
181188
suffixDateKey: Option[String] = None,
182189
suffixDatePattern: Option[String] = None,
@@ -199,6 +206,7 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
199206
.map { item =>
200207
toBulkItem(
201208
toDocument,
209+
indexKey,
202210
idKey,
203211
suffixDateKey,
204212
suffixDatePattern,
@@ -260,6 +268,7 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
260268
def bulk[D](
261269
items: Iterator[D],
262270
toDocument: D => String,
271+
indexKey: Option[String] = None,
263272
idKey: Option[String] = None,
264273
suffixDateKey: Option[String] = None,
265274
suffixDatePattern: Option[String] = None,
@@ -274,6 +283,7 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
274283
bulkWithResult(
275284
items,
276285
toDocument,
286+
indexKey,
277287
idKey,
278288
suffixDateKey,
279289
suffixDatePattern,
@@ -453,6 +463,7 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
453463

454464
private def toBulkItem[D](
455465
toDocument: D => String,
466+
indexKey: Option[String],
456467
idKey: Option[String],
457468
suffixDateKey: Option[String],
458469
suffixDatePattern: Option[String],
@@ -471,6 +482,11 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
471482
}
472483

473484
// extract final index name
485+
val indexFromKeyOrOptions = indexKey
486+
.flatMap { i =>
487+
jsonMap.get(i).map(_.toString)
488+
}
489+
.getOrElse(bulkOptions.defaultIndex)
474490
val index = suffixDateKey
475491
.flatMap { s =>
476492
jsonMap.get(s).map { d =>
@@ -483,8 +499,8 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
483499
)
484500
}
485501
}
486-
.map(s => s"${bulkOptions.index}-$s")
487-
.getOrElse(bulkOptions.index)
502+
.map(s => s"$indexFromKeyOrOptions-$s")
503+
.getOrElse(indexFromKeyOrOptions)
488504

489505
// extract parent key
490506
val parent = parentIdKey.flatMap { i =>

core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1125,6 +1125,15 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
11251125
delegate.searchAfter(elasticQuery, fieldAliases, config, hasSorts)
11261126
}
11271127

1128+
override private[client] def pitSearchAfter(
1129+
elasticQuery: ElasticQuery,
1130+
fieldAliases: Map[JSONResults, JSONResults],
1131+
config: ScrollConfig,
1132+
hasSorts: Boolean
1133+
)(implicit system: ActorSystem) = {
1134+
delegate.pitSearchAfter(elasticQuery, fieldAliases, config, hasSorts)
1135+
}
1136+
11281137
// ==================== BulkApi ====================
11291138

11301139
/** Bulk with detailed results (successes + failures).
@@ -1140,6 +1149,8 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
11401149
* : documents to index
11411150
* @param toDocument
11421151
* : JSON transformation function
1152+
* @param indexKey
1153+
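* : key of the document field whose value is used as the target index name (falls back to the default index of the bulk options when absent)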
11431154
* @param idKey
11441155
* : key for the id field
11451156
* @param suffixDateKey
@@ -1162,6 +1173,7 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
11621173
override def bulkWithResult[D](
11631174
items: Iterator[D],
11641175
toDocument: D => String,
1176+
indexKey: Option[String],
11651177
idKey: Option[String],
11661178
suffixDateKey: Option[String],
11671179
suffixDatePattern: Option[String],
@@ -1173,6 +1185,7 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
11731185
delegate.bulkWithResult(
11741186
items,
11751187
toDocument,
1188+
indexKey,
11761189
idKey,
11771190
suffixDateKey,
11781191
suffixDatePattern,
@@ -1200,6 +1213,8 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
12001213
* the documents to index
12011214
* @param toDocument
12021215
* JSON transformation function
1216+
* @param indexKey
1217+
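* key of the document field whose value is used as the target index name (falls back to the default index of the bulk options when absent)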
12031218
* @param idKey
12041219
* key for the id field
12051220
* @param suffixDateKey
@@ -1220,6 +1235,7 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
12201235
override def bulkSource[D](
12211236
items: Iterator[D],
12221237
toDocument: D => String,
1238+
indexKey: Option[String],
12231239
idKey: Option[String],
12241240
suffixDateKey: Option[String],
12251241
suffixDatePattern: Option[String],
@@ -1232,6 +1248,7 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
12321248
): Source[Either[FailedDocument, SuccessfulDocument], NotUsed] = delegate.bulkSource(
12331249
items,
12341250
toDocument,
1251+
indexKey,
12351252
idKey,
12361253
suffixDateKey,
12371254
suffixDatePattern,
@@ -1248,14 +1265,25 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
12481265
override def bulk[D](
12491266
items: Iterator[D],
12501267
toDocument: D => String,
1268+
indexKey: Option[String],
12511269
idKey: Option[String],
12521270
suffixDateKey: Option[String],
12531271
suffixDatePattern: Option[String],
12541272
update: Option[Boolean],
12551273
delete: Option[Boolean],
12561274
parentIdKey: Option[String]
12571275
)(implicit bulkOptions: BulkOptions, system: ActorSystem): ElasticResult[BulkResult] = delegate
1258-
.bulk(items, toDocument, idKey, suffixDateKey, suffixDatePattern, update, delete, parentIdKey)
1276+
.bulk(
1277+
items,
1278+
toDocument,
1279+
indexKey,
1280+
idKey,
1281+
suffixDateKey,
1282+
suffixDatePattern,
1283+
update,
1284+
delete,
1285+
parentIdKey
1286+
)
12591287

12601288
override private[client] def toBulkAction(bulkItem: BulkItem): BulkActionType =
12611289
delegate.toBulkAction(bulkItem).asInstanceOf[BulkActionType]

core/src/main/scala/app/softnetwork/elastic/client/ElasticConversion.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package app.softnetwork.elastic.client
1818

19-
import app.softnetwork.elastic.sql.query.SQLAggregation
2019
import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
2120
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
2221
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
@@ -78,7 +77,7 @@ trait ElasticConversion {
7877
def parseMultiSearchResponse(
7978
jsonArray: JsonNode,
8079
fieldAliases: Map[String, String],
81-
aggregations: Map[String, SQLAggregation]
80+
aggregations: Map[String, ClientAggregation]
8281
): Try[Seq[Map[String, Any]]] =
8382
Try {
8483
val responses = jsonArray.elements().asScala.toList
@@ -112,7 +111,7 @@ trait ElasticConversion {
112111
def parseSingleSearchResponse(
113112
json: JsonNode,
114113
fieldAliases: Map[String, String],
115-
aggregations: Map[String, SQLAggregation]
114+
aggregations: Map[String, ClientAggregation]
116115
): Try[Seq[Map[String, Any]]] =
117116
Try {
118117
// check if it is an error response
@@ -131,7 +130,7 @@ trait ElasticConversion {
131130
def jsonToRows(
132131
json: JsonNode,
133132
fieldAliases: Map[String, String],
134-
aggregations: Map[String, SQLAggregation]
133+
aggregations: Map[String, ClientAggregation]
135134
): Seq[Map[String, Any]] = {
136135
val hitsNode = Option(json.path("hits").path("hits"))
137136
.filter(_.isArray)
@@ -255,7 +254,7 @@ trait ElasticConversion {
255254
aggsNode: JsonNode,
256255
parentContext: Map[String, Any],
257256
fieldAliases: Map[String, String],
258-
aggregations: Map[String, SQLAggregation]
257+
aggregations: Map[String, ClientAggregation]
259258
): Seq[Map[String, Any]] = {
260259

261260
if (aggsNode.isMissingNode || !aggsNode.isObject) {

core/src/main/scala/app/softnetwork/elastic/client/GetApi.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,21 @@ trait GetApi extends ElasticClientHelpers { _: SerializationApi =>
3232
// PUBLIC METHODS
3333
// ========================================================================
3434

35+
/** Check if a document exists by its id in the given index.
36+
* @param id
37+
* - the id of the document to check
38+
* @param index
39+
* - the name of the index to check the document in
40+
* @return
41+
* true if the document exists, false otherwise
42+
*/
43+
def exists(id: String, index: String): ElasticResult[Boolean] = {
44+
get(id, index).map {
45+
case Some(_) => true
46+
case None => false
47+
}
48+
}
49+
3550
/** Get a document by its id from the given index.
3651
* @param id
3752
* - the id of the document to get

core/src/main/scala/app/softnetwork/elastic/client/ScrollApi.scala

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import app.softnetwork.elastic.client.scroll.{
2323
ScrollConfig,
2424
ScrollMetrics,
2525
ScrollStrategy,
26+
UsePIT,
2627
UseScroll,
2728
UseSearchAfter
2829
}
@@ -104,7 +105,7 @@ import scala.util.{Failure, Success}
104105
* [[https://www.elastic.co/guide/en/elasticsearch/reference/7.10/point-in-time-api.html PIT API Documentation]]
105106
*/
106107
trait ScrollApi extends ElasticClientHelpers {
107-
_: SearchApi =>
108+
_: VersionApi with SearchApi =>
108109

109110
// ========================================================================
110111
// MAIN SCROLL METHODS
@@ -159,6 +160,13 @@ trait ScrollApi extends ElasticClientHelpers {
159160
hasSorts: Boolean = false
160161
)(implicit system: ActorSystem): Source[Map[String, Any], NotUsed]
161162

163+
private[client] def pitSearchAfter(
164+
elasticQuery: ElasticQuery,
165+
fieldAliases: Map[String, String],
166+
config: ScrollConfig,
167+
hasSorts: Boolean = false
168+
)(implicit system: ActorSystem): Source[Map[String, Any], NotUsed]
169+
162170
/** Typed scroll source
163171
*/
164172
def scrollAs[T](
@@ -192,7 +200,19 @@ trait ScrollApi extends ElasticClientHelpers {
192200
if (hasAggregations(elasticQuery.query)) {
193201
UseScroll
194202
} else {
195-
UseSearchAfter
203+
// Detect version and choose implementation
204+
version match {
205+
case result.ElasticSuccess(v) =>
206+
if (ElasticsearchVersion.supportsPit(v)) {
207+
logger.info(s"ES version $v supports PIT, using pitSearchAfterSource")
208+
UsePIT
209+
} else {
210+
logger.info(s"ES version $v does not support PIT, using classic search_after")
211+
UseSearchAfter
212+
}
213+
case result.ElasticFailure(err) =>
214+
throw new RuntimeException(s"Failed to get ES version: $err")
215+
}
196216
}
197217
}
198218
}
@@ -285,6 +305,10 @@ trait ScrollApi extends ElasticClientHelpers {
285305
logger.info("Using search_after (optimized for hits only)")
286306
searchAfter(elasticQuery, fieldAliases, config, hasSorts)
287307

308+
case UsePIT =>
309+
logger.info("Using PIT + search_after (optimized for hits only)")
310+
pitSearchAfter(elasticQuery, fieldAliases, config, hasSorts)
311+
288312
case _ =>
289313
logger.info("Falling back to classic scroll")
290314
scrollClassic(elasticQuery, fieldAliases, aggregations, config)

0 commit comments

Comments
 (0)