Skip to content

Commit d7f736e

Browse files
authored
Merge pull request #13 from SOFTNETWORK-APP/feature/groupByWithHaving
add SQL support for group by and having
2 parents 82fe1c0 + 2e2e449 commit d7f736e

File tree

23 files changed

+1047
-741
lines changed

23 files changed

+1047
-741
lines changed

README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# ![SoftClient4ES Logo](https://raw.githubusercontent.com/SOFTNETWORK-APP/SoftClient4ES/main/logo.png)
2+
3+
4+
![Build Status](https://github.com/SOFTNETWORK-APP/SoftClient4ES/workflows/Build/badge.svg)
5+
[![codecov](https://codecov.io/gh/SOFTNETWORK-APP/SoftClient4ES/graph/badge.svg?token=XYCWBGVHAC)](https://codecov.io/gh/SOFTNETWORK-APP/SoftClient4ES)
6+
[![Codacy Badge](https://app.codacy.com/project/badge/Grade/1c13d6eb7d6c4a1495cd47e457c132dc)](https://app.codacy.com/gh/SOFTNETWORK-APP/elastic/dashboard?utm_source=gh&utm_medium=referral&utm_content=&utm_campaign=Badge_grade)
7+
[![License](https://img.shields.io/github/license/SOFTNETWORK-APP/elastic)](https://github.com/SOFTNETWORK-APP/elastic/blob/main/LICENSE)
8+
9+
**SoftClient4ES** is a modular and version-resilient interface built on top of Elasticsearch clients, providing a unified and stable API that simplifies migration across Elasticsearch versions, accelerates development, and offers advanced features for search, indexing, and data manipulation.
10+
11+
## Key Features
12+
13+
**Unified Elasticsearch API**
14+
This project provides a trait-based interface (`ElasticClientApi`) that aggregates the core functionalities of Elasticsearch: indexing, searching, updating, deleting, mapping, aliases, refreshing, and more. This design abstracts the underlying client implementation and ensures compatibility across different Elasticsearch versions.
15+
16+
- `JestClientApi`: For Elasticsearch 6 using the open-source [Jest client](https://github.com/searchbox-io/Jest).
17+
- `RestHighLevelClientApi`: For Elasticsearch 6 and 7 using the official high-level REST client.
18+
- `ElasticsearchClientApi`: For Elasticsearch 8 and 9 using the official Java client.
19+
20+
By relying on these concrete implementations, developers can switch between versions with minimal changes to their business logic.
21+
22+
**SQL to Elasticsearch Query Translation**
23+
Elastic Client includes a parser capable of translating SQL `SELECT` queries into Elasticsearch queries. The parser produces an intermediate representation, which is then converted into [Elastic4s](https://github.com/sksamuel/elastic4s) DSL queries and ultimately into native Elasticsearch queries. This allows data engineers and analysts to express queries in familiar SQL syntax.
24+
25+
**Dynamic Mapping Migration**
26+
Elastic Client provides tools to analyze and compare existing mappings with new ones. If differences are detected, it can automatically perform safe migrations. This includes creating temporary indices, reindexing, and renaming — all while preserving data integrity. This eliminates the need for manual mapping migrations and reduces downtime.
27+
28+
**High-Performance Bulk API with Akka Streams**
29+
Bulk operations leverage the power of Akka Streams to efficiently process and index large volumes of data. This stream-based approach improves performance, resilience, and backpressure handling, especially for real-time or high-throughput indexing scenarios.
30+
31+
**Akka Persistence Integration**
32+
The project offers seamless integration with Akka Persistence. This enables Elasticsearch indices to be updated reactively based on persistent events, offering a robust pattern for event-sourced systems.
33+
34+
## Roadmap
35+
36+
Future enhancements include expanding the SQL parser to support additional operations such as `INSERT`, `UPDATE`, and `DELETE`. The long-term vision is to deliver a fully functional, open-source **JDBC connector for Elasticsearch**, empowering users to interact with their data using standard SQL tooling.
37+
38+
## License
39+
40+
This project is open source and licensed under the Apache License 2.0.

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ ThisBuild / organization := "app.softnetwork"
1919

2020
name := "softclient4es"
2121

22-
ThisBuild / version := "0.2.1"
22+
ThisBuild / version := "0.3.0"
2323

2424
ThisBuild / scalaVersion := scala213
2525

core/testkit/src/main/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,14 +191,14 @@ trait ElasticClientSpec extends AnyFlatSpecLike with ElasticDockerTestKit with M
191191
case other => fail(other.toString)
192192
}
193193

194-
pClient.search[Person]("select * from person_mapping where match(name, 'gum')") match {
194+
pClient.search[Person]("select * from person_mapping where match (name) against ('gum')") match {
195195
case r if r.size == 1 =>
196196
r.map(_.uuid) should contain only "A16"
197197
case other => fail(other.toString)
198198
}
199199

200200
pClient.search[Person](
201-
"select * from person_mapping where uuid <> 'A16' and match(name, 'gum')"
201+
"select * from person_mapping where uuid <> 'A16' and match (name) against ('gum')"
202202
) match {
203203
case r if r.isEmpty =>
204204
case other => fail(other.toString)
@@ -239,7 +239,7 @@ trait ElasticClientSpec extends AnyFlatSpecLike with ElasticDockerTestKit with M
239239

240240
"person_migration" should haveCount(3)
241241

242-
pClient.search[Person]("select * from person_migration where match(name, 'gum')") match {
242+
pClient.search[Person]("select * from person_migration where match (name) against ('gum')") match {
243243
case r if r.isEmpty =>
244244
case other => fail(other.toString)
245245
}
@@ -288,7 +288,7 @@ trait ElasticClientSpec extends AnyFlatSpecLike with ElasticDockerTestKit with M
288288
pClient.shouldUpdateMapping("person_migration", newMapping) shouldBe true
289289
pClient.updateMapping("person_migration", newMapping) shouldBe true
290290

291-
pClient.search[Person]("select * from person_migration where match(name, 'gum')") match {
291+
pClient.search[Person]("select * from person_migration where match (name) against ('gum')") match {
292292
case r if r.size == 1 =>
293293
r.map(_.uuid) should contain only "A16"
294294
case other => fail(other.toString)

es6/sql-bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 109 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,28 @@ import app.softnetwork.elastic.sql.{
77
ElasticBoolQuery,
88
Max,
99
Min,
10-
SQLAggregate,
10+
SQLBucket,
11+
SQLCriteria,
12+
SQLField,
1113
Sum
1214
}
1315
import com.sksamuel.elastic4s.ElasticApi.{
1416
avgAgg,
1517
cardinalityAgg,
1618
filterAgg,
17-
matchAllQuery,
1819
maxAgg,
1920
minAgg,
2021
nestedAggregation,
2122
sumAgg,
23+
termsAgg,
2224
valueCountAgg
2325
}
24-
import com.sksamuel.elastic4s.searches.aggs.Aggregation
26+
import com.sksamuel.elastic4s.searches.aggs.{
27+
Aggregation,
28+
FilterAggregation,
29+
NestedAggregation,
30+
TermsAggregation
31+
}
2532

2633
import scala.language.implicitConversions
2734

@@ -32,89 +39,141 @@ case class ElasticAggregation(
3239
sources: Seq[String] = Seq.empty,
3340
query: Option[String] = None,
3441
distinct: Boolean = false,
35-
nested: Boolean = false,
36-
filtered: Boolean = false,
42+
nestedAgg: Option[NestedAggregation] = None,
43+
filteredAgg: Option[FilterAggregation] = None,
3744
aggType: AggregateFunction,
3845
agg: Aggregation
39-
)
46+
) {
47+
val nested: Boolean = nestedAgg.nonEmpty
48+
val filtered: Boolean = filteredAgg.nonEmpty
49+
}
4050

4151
object ElasticAggregation {
42-
def apply(sqlAgg: SQLAggregate): ElasticAggregation = {
52+
def apply(sqlAgg: SQLField, filter: Option[SQLCriteria]): ElasticAggregation = {
4353
import sqlAgg._
44-
val sourceField = identifier.columnName
54+
val sourceField = identifier.name
4555

46-
val field = alias match {
56+
val field = fieldAlias match {
4757
case Some(alias) => alias.alias
4858
case _ => sourceField
4959
}
5060

51-
val distinct = identifier.distinct.isDefined
61+
val distinct = identifier.distinct
5262

53-
val agg =
54-
if (distinct)
55-
s"${function}_distinct_${sourceField.replace(".", "_")}"
63+
val aggType = aggregateFunction.getOrElse(
64+
throw new IllegalArgumentException("Aggregation function is required")
65+
)
66+
67+
val aggName = {
68+
if (fieldAlias.isDefined)
69+
field
70+
else if (distinct)
71+
s"${aggType}_distinct_${sourceField.replace(".", "_")}"
5672
else
57-
s"${function}_${sourceField.replace(".", "_")}"
73+
s"${aggType}_${sourceField.replace(".", "_")}"
74+
}
5875

5976
var aggPath = Seq[String]()
6077

6178
val _agg =
62-
function match {
79+
aggType match {
6380
case Count =>
6481
if (distinct)
65-
cardinalityAgg(agg, sourceField)
82+
cardinalityAgg(aggName, sourceField)
6683
else {
67-
valueCountAgg(agg, sourceField)
84+
valueCountAgg(aggName, sourceField)
6885
}
69-
case Min => minAgg(agg, sourceField)
70-
case Max => maxAgg(agg, sourceField)
71-
case Avg => avgAgg(agg, sourceField)
72-
case Sum => sumAgg(agg, sourceField)
86+
case Min => minAgg(aggName, sourceField)
87+
case Max => maxAgg(aggName, sourceField)
88+
case Avg => avgAgg(aggName, sourceField)
89+
case Sum => sumAgg(aggName, sourceField)
7390
}
7491

75-
def _filtered: Aggregation = filter match {
76-
case Some(f) =>
77-
val boolQuery = Option(ElasticBoolQuery(group = true))
78-
val filteredAgg = s"filtered_agg"
79-
aggPath ++= Seq(filteredAgg)
80-
filterAgg(
81-
filteredAgg,
82-
f.criteria
83-
.map(
84-
_.asFilter(boolQuery)
92+
val filteredAggName = "filtered_agg"
93+
94+
val filteredAgg: Option[FilterAggregation] =
95+
filter match {
96+
case Some(f) =>
97+
val boolQuery = Option(ElasticBoolQuery(group = true))
98+
Some(
99+
filterAgg(
100+
filteredAggName,
101+
f.asFilter(boolQuery)
85102
.query(Set(identifier.innerHitsName).flatten, boolQuery)
86103
)
87-
.getOrElse(matchAllQuery())
88-
) subaggs {
89-
aggPath ++= Seq(agg)
90-
_agg
91-
}
92-
case _ =>
93-
aggPath ++= Seq(agg)
94-
_agg
95-
}
104+
)
105+
case _ =>
106+
None
107+
}
108+
109+
def filtered(): Unit =
110+
filteredAgg match {
111+
case Some(_) =>
112+
aggPath ++= Seq(filteredAggName)
113+
aggPath ++= Seq(aggName)
114+
case _ =>
115+
aggPath ++= Seq(aggName)
116+
}
96117

97-
val aggregation =
118+
val nestedAgg =
98119
if (identifier.nested) {
99120
val path = sourceField.split("\\.").head
100-
val nestedAgg = s"nested_$agg"
121+
val nestedAgg = s"nested_${identifier.nestedType.getOrElse(aggName)}"
101122
aggPath ++= Seq(nestedAgg)
102-
nestedAggregation(nestedAgg, path) subaggs {
103-
_filtered
104-
}
123+
filtered()
124+
Some(nestedAggregation(nestedAgg, path))
105125
} else {
106-
_filtered
126+
filtered()
127+
None
107128
}
108129

109130
ElasticAggregation(
110131
aggPath.mkString("."),
111132
field,
112133
sourceField,
113134
distinct = distinct,
114-
nested = identifier.nested,
115-
filtered = filter.nonEmpty,
116-
aggType = function,
117-
agg = aggregation
135+
nestedAgg = nestedAgg,
136+
filteredAgg = filteredAgg,
137+
aggType = aggType,
138+
agg = _agg
118139
)
119140
}
141+
142+
/*
143+
def apply(
144+
buckets: Seq[SQLBucket],
145+
aggregations: Seq[Aggregation],
146+
current: Option[TermsAggregation]
147+
): Option[TermsAggregation] = {
148+
buckets match {
149+
case Nil =>
150+
current.map(_.copy(subaggs = aggregations))
151+
case bucket +: tail =>
152+
val agg = termsAgg(bucket.name, s"${bucket.identifier.name}.keyword")
153+
current match {
154+
case Some(a) =>
155+
apply(tail, aggregations, Some(agg)) match {
156+
case Some(subAgg) =>
157+
Some(a.copy(subaggs = a.subaggs :+ subAgg))
158+
case _ => Some(a)
159+
}
160+
case None =>
161+
apply(tail, aggregations, Some(agg))
162+
}
163+
}
164+
}
165+
*/
166+
167+
def buildBuckets(
168+
buckets: Seq[SQLBucket],
169+
aggregations: Seq[Aggregation]
170+
): Option[TermsAggregation] = {
171+
buckets.reverse.foldLeft(Option.empty[TermsAggregation]) { (current, bucket) =>
172+
val agg = termsAgg(bucket.name, s"${bucket.identifier.name}.keyword")
173+
current match {
174+
case Some(subAgg) => Some(agg.copy(subaggs = Seq(subAgg)))
175+
case None => Some(agg.copy(subaggs = aggregations))
176+
}
177+
}
178+
}
120179
}

es6/sql-bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticSearchRequest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package app.softnetwork.elastic.sql.bridge
22

3-
import app.softnetwork.elastic.sql.{SQLCriteria, SQLExcept, SQLField}
3+
import app.softnetwork.elastic.sql.{SQLBucket, SQLCriteria, SQLExcept, SQLField}
44
import com.sksamuel.elastic4s.searches.SearchRequest
55
import com.sksamuel.elastic4s.http.search.SearchBodyBuilderFn
66

@@ -11,6 +11,7 @@ case class ElasticSearchRequest(
1111
criteria: Option[SQLCriteria],
1212
limit: Option[Int],
1313
search: SearchRequest,
14+
buckets: Seq[SQLBucket] = Seq.empty,
1415
aggregations: Seq[ElasticAggregation] = Seq.empty
1516
) {
1617
def minScore(score: Option[Double]): ElasticSearchRequest = {

0 commit comments

Comments
 (0)