Skip to content

Commit 66808e3

Browse files
authored
Merge pull request #20 from SOFTNETWORK-APP/fix/nestedAndAggs
fix nested and aggregations
2 parents 900773d + 899a080 commit 66808e3

File tree

39 files changed

+3845
-1981
lines changed

39 files changed

+3845
-1981
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ ThisBuild / organization := "app.softnetwork"
1919

2020
name := "softclient4es"
2121

22-
ThisBuild / version := "0.9.1"
22+
ThisBuild / version := "0.9.2"
2323

2424
ThisBuild / scalaVersion := scala213
2525

core/testkit/src/main/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -888,8 +888,9 @@ trait ElasticClientSpec extends AnyFlatSpecLike with ElasticDockerTestKit with M
888888
| p.birthDate,
889889
| p.children,
890890
| inner_children.name,
891-
| inner_children.birthDate
892-
|FROM
891+
| inner_children.birthDate,
892+
| inner_children.parentId
893+
| FROM
893894
| parent as p
894895
| JOIN UNNEST(p.children) as inner_children
895896
|WHERE

documentation/functions_aggregate.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ FROM emp;
149149
Collect values into an array for each partition. Implemented using `OVER` and pushed to ES as `top_hits`. Post-processing converts hits to an array of scalars.
150150

151151
**Inputs:**
152-
- `expr` with optional `OVER (PARTITION BY ... ORDER BY ... LIMIT n)`
152+
- `expr` with optional `OVER (PARTITION BY ... ORDER BY ... )`
153153
If `OVER` is not provided, only the expr column name is used for the sorting.
154154

155155
**Output:**
@@ -161,9 +161,9 @@ SELECT department,
161161
ARRAY_AGG(name) OVER (
162162
PARTITION BY department
163163
ORDER BY hire_date ASC
164-
LIMIT 100
165164
) AS employees
166-
FROM emp;
165+
FROM emp
166+
LIMIT 100;
167167
-- Result: employees as an array of name values
168168
-- per department (sorted and limited)
169169
```

es6/sql-bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 80 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ package app.softnetwork.elastic.sql.bridge
33
import app.softnetwork.elastic.sql.query.{
44
Asc,
55
Bucket,
6-
BucketSelectorScript,
6+
BucketIncludesExcludes,
77
Criteria,
88
Desc,
9-
ElasticBoolQuery,
109
Field,
10+
MetricSelectorScript,
11+
NestedElement,
12+
NestedElements,
1113
SortOrder
1214
}
1315
import app.softnetwork.elastic.sql.function._
@@ -16,7 +18,6 @@ import com.sksamuel.elastic4s.ElasticApi.{
1618
avgAgg,
1719
bucketSelectorAggregation,
1820
cardinalityAgg,
19-
filterAgg,
2021
maxAgg,
2122
minAgg,
2223
nestedAggregation,
@@ -48,9 +49,10 @@ case class ElasticAggregation(
4849
filteredAgg: Option[FilterAggregation] = None,
4950
aggType: AggregateFunction,
5051
agg: Aggregation,
51-
direction: Option[SortOrder] = None
52+
direction: Option[SortOrder] = None,
53+
nestedElement: Option[NestedElement] = None
5254
) {
53-
val nested: Boolean = nestedAgg.nonEmpty
55+
val nested: Boolean = nestedElement.nonEmpty
5456
val filtered: Boolean = filteredAgg.nonEmpty
5557
}
5658

@@ -61,7 +63,7 @@ object ElasticAggregation {
6163
bucketsDirection: Map[String, SortOrder]
6264
): ElasticAggregation = {
6365
import sqlAgg._
64-
val sourceField = identifier.name
66+
val sourceField = identifier.path
6567

6668
val direction = bucketsDirection.get(identifier.identifierName)
6769

@@ -103,7 +105,7 @@ object ElasticAggregation {
103105
buildScript: (String, Script) => Aggregation
104106
): Aggregation = {
105107
if (transformFuncs.nonEmpty) {
106-
val scriptSrc = identifier.painless
108+
val scriptSrc = identifier.painless()
107109
val script = Script(scriptSrc).lang("painless")
108110
buildScript(aggName, script)
109111
} else {
@@ -143,7 +145,7 @@ object ElasticAggregation {
143145
.copy(
144146
scripts = th.fields
145147
.filter(_.isScriptField)
146-
.map(f => f.sourceField -> Script(f.painless).lang("painless"))
148+
.map(f => f.sourceField -> Script(f.painless()).lang("painless"))
147149
.toMap
148150
)
149151
.size(limit) sortBy th.orderBy.sorts.map(sort =>
@@ -160,60 +162,69 @@ object ElasticAggregation {
160162
}
161163
}
162164
)
163-
/*th.fields.filter(_.isScriptField).foldLeft(topHits) { (agg, f) =>
164-
agg.script(f.sourceField, Script(f.painless, lang = Some("painless")))
165-
}*/
166165
topHits
167166
}
168167

169168
val filteredAggName = "filtered_agg"
170169

171-
val filteredAgg: Option[FilterAggregation] =
172-
having match {
173-
case Some(f) =>
174-
val boolQuery = Option(ElasticBoolQuery(group = true))
175-
Some(
176-
filterAgg(
177-
filteredAggName,
178-
f.asFilter(boolQuery)
179-
.query(Set(identifier.innerHitsName).flatten, boolQuery)
180-
)
181-
)
182-
case _ =>
183-
None
184-
}
185-
186170
def filtered(): Unit =
187-
filteredAgg match {
171+
having match {
188172
case Some(_) =>
189173
aggPath ++= Seq(filteredAggName)
190174
aggPath ++= Seq(aggName)
191175
case _ =>
192176
aggPath ++= Seq(aggName)
193177
}
194178

179+
val nestedElement = identifier.nestedElement
180+
181+
val nestedElements: Seq[NestedElement] =
182+
nestedElement.map(n => NestedElements.buildNestedTrees(Seq(n))).getOrElse(Nil)
183+
195184
val nestedAgg =
196-
if (identifier.nested) {
197-
val path = sourceField.split("\\.").head
198-
val nestedAgg = s"nested_${identifier.nestedType.getOrElse(aggName)}"
199-
aggPath ++= Seq(nestedAgg)
200-
filtered()
201-
Some(nestedAggregation(nestedAgg, path))
202-
} else {
203-
filtered()
204-
None
185+
nestedElements match {
186+
case Nil =>
187+
None
188+
case nestedElements =>
189+
def buildNested(n: NestedElement): NestedAggregation = {
190+
aggPath ++= Seq(n.innerHitsName)
191+
val children = n.children
192+
if (children.nonEmpty) {
193+
val innerAggs = children.map(buildNested)
194+
val combinedAgg = if (innerAggs.size == 1) {
195+
innerAggs.head
196+
} else {
197+
innerAggs.reduceLeft { (agg1, agg2) =>
198+
agg1.copy(subaggs = agg1.subaggs ++ Seq(agg2))
199+
}
200+
}
201+
nestedAggregation(
202+
n.innerHitsName,
203+
n.path
204+
) subaggs Seq(combinedAgg)
205+
} else {
206+
nestedAggregation(
207+
n.innerHitsName,
208+
n.path
209+
)
210+
}
211+
}
212+
213+
Some(buildNested(nestedElements.head))
205214
}
206215

216+
filtered()
217+
207218
ElasticAggregation(
208219
aggPath.mkString("."),
209220
field,
210221
sourceField,
211222
distinct = distinct,
212223
nestedAgg = nestedAgg,
213-
filteredAgg = filteredAgg,
214224
aggType = aggType,
215225
agg = _agg,
216-
direction = direction
226+
direction = direction,
227+
nestedElement = nestedElement
217228
)
218229
}
219230

@@ -224,20 +235,38 @@ object ElasticAggregation {
224235
aggregationsDirection: Map[String, SortOrder],
225236
having: Option[Criteria]
226237
): Option[TermsAggregation] = {
227-
Console.println(bucketsDirection)
228238
buckets.reverse.foldLeft(Option.empty[TermsAggregation]) { (current, bucket) =>
229-
val agg = {
239+
var agg = {
230240
bucketsDirection.get(bucket.identifier.identifierName) match {
231241
case Some(direction) =>
232-
termsAgg(bucket.name, s"${bucket.identifier.name}.keyword")
242+
termsAgg(bucket.name, s"${bucket.identifier.path}.keyword")
233243
.order(Seq(direction match {
234-
case Asc => TermsOrder(bucket.name, asc = true)
235-
case _ => TermsOrder(bucket.name, asc = false)
244+
case Asc => TermsOrder("_key", asc = true)
245+
case _ => TermsOrder("_key", asc = false)
236246
}))
237247
case None =>
238-
termsAgg(bucket.name, s"${bucket.identifier.name}.keyword")
248+
termsAgg(bucket.name, s"${bucket.identifier.path}.keyword")
239249
}
240250
}
251+
bucket.size.foreach(s => agg = agg.size(s))
252+
having match {
253+
case Some(criteria) =>
254+
criteria.includes(bucket, not = false, BucketIncludesExcludes()) match {
255+
case BucketIncludesExcludes(_, Some(regex)) if regex.nonEmpty =>
256+
agg = agg.include(regex)
257+
case BucketIncludesExcludes(values, _) if values.nonEmpty =>
258+
agg = agg.include(values.toArray)
259+
case _ =>
260+
}
261+
criteria.excludes(bucket, not = false, BucketIncludesExcludes()) match {
262+
case BucketIncludesExcludes(_, Some(regex)) if regex.nonEmpty =>
263+
agg = agg.exclude(regex)
264+
case BucketIncludesExcludes(values, _) if values.nonEmpty =>
265+
agg = agg.exclude(values.toArray)
266+
case _ =>
267+
}
268+
case _ =>
269+
}
241270
current match {
242271
case Some(subAgg) => Some(agg.copy(subaggs = Seq(subAgg)))
243272
case None =>
@@ -254,12 +283,15 @@ object ElasticAggregation {
254283
agg
255284
val withHaving = having match {
256285
case Some(criteria) =>
257-
import BucketSelectorScript._
258-
val script = toPainless(criteria)
259-
val bucketsPath = extractBucketsPath(criteria)
286+
val script = MetricSelectorScript.metricSelector(criteria)
287+
val bucketsPath = criteria.extractMetricsPath
260288

261289
val bucketSelector =
262-
bucketSelectorAggregation("having_filter", Script(script), bucketsPath)
290+
bucketSelectorAggregation(
291+
"having_filter",
292+
Script(script.replaceAll("1 == 1 &&", "").replaceAll("&& 1 == 1", "").trim),
293+
bucketsPath
294+
)
263295

264296
withAggregationOrders.copy(subaggs = aggregations :+ bucketSelector)
265297

0 commit comments

Comments
 (0)