Skip to content

Commit c8c9731

Browse files
committed
fix dataset for jdbc state provider - add configuration entries for jdbc and json external providers
1 parent 4f5042f commit c8c9731

File tree

6 files changed

+88
-39
lines changed

6 files changed

+88
-39
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ ThisBuild / organization := "app.softnetwork"
88

99
name := "generic-persistence-api"
1010

11-
ThisBuild / version := "0.7.0"
11+
ThisBuild / version := "0.7.1"
1212

1313
ThisBuild / scalaVersion := "2.12.18"
1414

core/src/main/resources/softnetwork-persistence.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,3 +275,11 @@ clustering {
275275
cluster.name = Softnetwork
276276
cluster.name = ${?CLUSTER_NAME}
277277
}
278+
279+
json-external-processor{
280+
output-folder = "/tmp"
281+
output-folder = ${?JSON_EXTERNAL_PROCESSOR_OUTPUT_FOLDER}
282+
283+
file-extension = "json"
284+
file-extension = ${?JSON_EXTERNAL_PROCESSOR_FILE_EXTENSION}
285+
}

core/src/main/scala/app/softnetwork/persistence/query/JsonProvider.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package app.softnetwork.persistence.query
33
import app.softnetwork.persistence.ManifestWrapper
44
import app.softnetwork.persistence.model.{CamelCaseString, StateWrapper, StateWrappertReader, Timestamped}
55
import app.softnetwork.serialization.{commonFormats, serialization, updateCaseClass}
6+
import com.typesafe.config.{Config, ConfigFactory}
67
import org.json4s.Formats
78

89
import java.nio.charset.StandardCharsets
@@ -25,9 +26,23 @@ trait JsonProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] {
2526

2627
lazy val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern(FORMAT).withZone(zoneId)
2728

28-
def outputFolder: String = "/tmp"
29+
def outputFolder: String = {
30+
val config = ConfigFactory.load()
31+
if (config.hasPath("json-external-processor.output-folder")) {
32+
config.getString("json-external-processor.output-folder")
33+
} else {
34+
System.getProperty("java.io.tmpdir")
35+
}
36+
}
2937

30-
def fileExtension: String = "dat"
38+
def fileExtension: String = {
39+
val config = ConfigFactory.load()
40+
if (config.hasPath("json-external-processor.file-extension")) {
41+
config.getString("json-external-processor.file-extension")
42+
} else {
43+
"json"
44+
}
45+
}
3146

3247
implicit lazy val ct: ClassTag[T] = manifestWrapper.wrapped
3348

@@ -196,6 +211,9 @@ trait JsonProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] {
196211
serialization.write(map)
197212
case _ => document.asJson
198213
}
214+
if (!Files.exists(Paths.get(outputFolder))) {
215+
Files.createDirectories(Paths.get(outputFolder))
216+
}
199217
if (!Files.exists(Paths.get(filePath))) {
200218
Files.createFile(Paths.get(filePath))
201219
}

jdbc/src/main/resources/softnetwork-jdbc-persistence.conf

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,6 @@ jdbc-durable-state-store {
5353
}
5454

5555
jdbc-event-processor-offsets {
56-
# db {
57-
# url = "jdbc:postgresql://"${db.ip}":"${db.port}"/"${db.name}"?reWriteBatchedInserts=true"
58-
# user = ${credentials.db.username}
59-
# password = ${credentials.db.password}
60-
# driver = "org.postgresql.Driver"
61-
# numThreads = ${db.max-connections}
62-
# minConnections = 10
63-
# maxConnections = 50
64-
# idleTimeout = 10000 //10 seconds
65-
# }
6656
schema = ${credentials.db.username}
6757
table = "event_processor_offsets"
6858
}
@@ -103,3 +93,6 @@ credentials{
10393
}
10494
}
10595

96+
jdbc-external-processor {
97+
dataset = ${?JDBC_EXTERNAL_PROCESSOR_DATASET}
98+
}

jdbc/src/main/scala/app/softnetwork/persistence/jdbc/query/JdbcStateProvider.scala

Lines changed: 54 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ import app.softnetwork.persistence.jdbc.db.SlickDatabase
66
import app.softnetwork.persistence.model.{StateWrapper, Timestamped}
77
import app.softnetwork.persistence.query.JsonProvider
88
import app.softnetwork.serialization.{serialization, updateCaseClass}
9-
import com.typesafe.scalalogging.Logger
109
import org.json4s.Formats
11-
import org.slf4j.LoggerFactory
1210
import slick.jdbc.JdbcProfile
1311

1412
import java.time.Instant
@@ -22,14 +20,22 @@ trait JdbcStateProvider[T <: Timestamped]
2220
with Completion {
2321
_: ManifestWrapper[T] with JdbcProfile =>
2422

25-
protected lazy val alogger: Logger =
26-
Logger(LoggerFactory.getLogger(getClass.getName))
27-
28-
val dataset: Option[String] = None
23+
def dataset: Option[String] = {
24+
if (config.hasPath("jdbc-external-processor.dataset")) {
25+
val d = config.getString("jdbc-external-processor.dataset")
26+
if (d.nonEmpty) {
27+
Some(d)
28+
} else {
29+
None
30+
}
31+
} else {
32+
None
33+
}
34+
}
2935

3036
lazy val table: String = getType[T](manifestWrapper.wrapped)
3137

32-
lazy val tableFullName: String = dataset match {
38+
private[this] lazy val tableFullName: String = dataset match {
3339
case Some(s) => s"$s.$table"
3440
case _ => table
3541
}
@@ -198,7 +204,7 @@ trait JdbcStateProvider[T <: Timestamped]
198204
(if (!to_update) {
199205
insert(document.uuid, document.lastUpdated, document.deleted, state)
200206
} else {
201-
alogger.debug(s"Updating document ${document.uuid} with data $state")
207+
log.debug(s"Updating document ${document.uuid} with data $state")
202208
update(document.uuid, document.lastUpdated, document.deleted, state)
203209
}) && writeToFile(
204210
document,
@@ -229,10 +235,10 @@ trait JdbcStateProvider[T <: Timestamped]
229235
(states += (uuid, lastUpdated, deleted, state)).map(_ > 0)
230236
db.run(action) complete () match {
231237
case Success(value) =>
232-
alogger.debug(s"Insert to $tableFullName with $uuid -> $value")
238+
log.debug(s"Insert to $tableFullName with $uuid -> $value")
233239
value
234240
case Failure(f) =>
235-
alogger.error(f.getMessage, f)
241+
log.error(f.getMessage, f)
236242
false
237243
}
238244
}
@@ -265,13 +271,13 @@ trait JdbcStateProvider[T <: Timestamped]
265271
db.run(action) complete () match {
266272
case Success(value) =>
267273
if (deleted) {
268-
alogger.debug(s"Delete from $tableFullName with $uuid -> $value")
274+
log.debug(s"Delete from $tableFullName with $uuid -> $value")
269275
} else {
270-
alogger.debug(s"Update to $tableFullName with $uuid -> $value")
276+
log.debug(s"Update to $tableFullName with $uuid -> $value")
271277
}
272278
value
273279
case Failure(f) =>
274-
alogger.error(f.getMessage, f)
280+
log.error(f.getMessage, f)
275281
false
276282
}
277283
}
@@ -287,10 +293,10 @@ trait JdbcStateProvider[T <: Timestamped]
287293
val action = states.filter(_.uuid === uuid).delete.map(_ > 0)
288294
db.run(action) complete () match {
289295
case Success(value) =>
290-
alogger.debug(s"Delete from $tableFullName with $uuid -> $value")
296+
log.debug(s"Delete from $tableFullName with $uuid -> $value")
291297
value
292298
case Failure(f) =>
293-
alogger.error(f.getMessage, f)
299+
log.error(f.getMessage, f)
294300
false
295301
}
296302
}
@@ -312,18 +318,18 @@ trait JdbcStateProvider[T <: Timestamped]
312318
//logger.info(s"$document")
313319
document match {
314320
case (_, _, deleted, _) if deleted =>
315-
alogger.debug(s"Load $tableFullName with $uuid -> None")
321+
log.debug(s"Load $tableFullName with $uuid -> None")
316322
None
317323
case (_, _, _, state) =>
318-
alogger.debug(s"Load $tableFullName with $uuid -> $value")
319-
Option(readState(state))
324+
log.debug(s"Load $tableFullName with $uuid -> $value")
325+
Option(readState(state)(manifest))
320326
}
321327
case _ =>
322-
alogger.debug(s"Load $tableFullName with $uuid -> None")
328+
log.debug(s"Load $tableFullName with $uuid -> None")
323329
None
324330
}
325331
case Failure(f) =>
326-
alogger.error(f.getMessage, f)
332+
log.error(f.getMessage, f)
327333
None
328334
}
329335
}
@@ -347,15 +353,15 @@ trait JdbcStateProvider[T <: Timestamped]
347353
}
348354
db.run(action) complete () match {
349355
case Success(value) =>
350-
alogger.debug(s"Search $tableFullName with $query -> $value")
356+
log.debug(s"Search $tableFullName with $query -> $value")
351357
value.map(readState)
352358
case Failure(f) =>
353-
alogger.error(f.getMessage, f)
359+
log.error(f.getMessage, f)
354360
Nil
355361
}
356362
}
357363

358-
class States(tag: Tag) extends Table[(String, Instant, Boolean, String)](tag, tableFullName) {
364+
class States(tag: Tag) extends Table[(String, Instant, Boolean, String)](tag, dataset, table) {
359365
def uuid = column[String]("uuid", O.PrimaryKey)
360366
def lastUpdated = column[Instant]("last_updated")
361367
def deleted = column[Boolean]("deleted")
@@ -375,14 +381,36 @@ trait JdbcStateProvider[T <: Timestamped]
375381
}
376382

377383
def initTable(): Unit = {
378-
alogger.info(
384+
initDataset()
385+
log.info(
379386
s"Setting up table $tableFullName ${states.schema.createStatements.mkString(";\n")}"
380387
)
381388
db.run(DBIO.seq(states.schema.createIfNotExists)).complete() match {
382389
case Success(_) =>
383-
alogger.debug(s"Setup table $tableFullName")
390+
log.debug(s"Setup table $tableFullName")
384391
case Failure(f) =>
385-
alogger.error(f.getMessage, f)
392+
log.error(f.getMessage, f)
393+
}
394+
}
395+
396+
def initDataset(): Unit = {
397+
dataset match {
398+
case Some(d) =>
399+
log.info(
400+
s"Setting up dataset $d"
401+
)
402+
val ddl = s"CREATE SCHEMA IF NOT EXISTS $d"
403+
withStatement { stmt =>
404+
try {
405+
stmt.executeUpdate(ddl)
406+
log.debug(s"Setup dataset $d")
407+
} catch {
408+
case t: java.sql.SQLSyntaxErrorException
409+
if t.getMessage contains "ORA-00942" => // suppress known error message in the test
410+
case other: Throwable => log.error(other.getMessage)
411+
}
412+
}
413+
case _ =>
386414
}
387415
}
388416
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
json-external-processor.output-folder = "/tmp/json"
2+
jdbc-external-processor.dataset = "test"

0 commit comments

Comments
 (0)