Skip to content

Commit 5d86db1

Browse files
committed
add scheduler service
1 parent fc5ed93 commit 5d86db1

File tree

13 files changed

+282
-19
lines changed

13 files changed

+282
-19
lines changed
Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,29 @@
11
package app.softnetwork.scheduler.api
22

33
import akka.actor.typed.ActorSystem
4-
import app.softnetwork.persistence.jdbc.query.JdbcSchema.SchemaType
4+
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
55
import app.softnetwork.persistence.jdbc.query.{JdbcJournalProvider, JdbcSchema, JdbcSchemaProvider}
66
import app.softnetwork.scheduler.handlers.SchedulerHandler
77
import app.softnetwork.scheduler.launch.SchedulerApplication
88
import app.softnetwork.scheduler.persistence.query.Entity2SchedulerProcessorStream
99

10-
trait SchedulerApi extends SchedulerApplication with JdbcSchemaProvider {
10+
import scala.concurrent.Future
1111

12-
def jdbcSchemaType: SchemaType = this.schemaType
12+
trait SchedulerApi extends SchedulerApplication with JdbcSchemaProvider {
1313

1414
override def entity2SchedulerProcessorStream: ActorSystem[_] => Entity2SchedulerProcessorStream =
1515
sys =>
1616
new Entity2SchedulerProcessorStream()
1717
with SchedulerHandler
1818
with JdbcJournalProvider
1919
with JdbcSchemaProvider {
20-
override lazy val schemaType: JdbcSchema.SchemaType = jdbcSchemaType
20+
override lazy val schemaType: JdbcSchema.SchemaType = SchedulerApi.this.schemaType
2121

2222
override implicit def system: ActorSystem[_] = sys
2323
}
2424

25+
override def grpcServices
26+
: ActorSystem[_] => Seq[PartialFunction[HttpRequest, Future[HttpResponse]]] = system =>
27+
Seq(SchedulerServiceApiHandler.partial(schedulerServer(system))(system))
28+
2529
}

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ ThisBuild / organization := "app.softnetwork"
3131

3232
name := "scheduler"
3333

34-
ThisBuild / version := "0.2.4"
34+
ThisBuild / version := "0.2.5"
3535

3636
ThisBuild / scalaVersion := "2.12.15"
3737

common/build.sbt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ name := "scheduler-common"
88

99
libraryDependencies ++= Seq(
1010
"com.markatta" %% "akron" % "1.2" excludeAll(ExclusionRule(organization = "com.typesafe.akka"), ExclusionRule(organization = "org.scala-lang.modules")),
11+
// session
12+
"app.softnetwork.session" %% "session-core" % Versions.session,
1113
"app.softnetwork.api" %% "generic-server-api" % Versions.server,
1214
"app.softnetwork.protobuf" %% "scalapb-extensions" % "0.1.5"
1315
)

common/src/main/resources/softnetwork-scheduler.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
softnetwork{
22
scheduler {
3+
path = "scheduler"
34
# id = "*"
45
akka-node-role = scheduler
56
reset-scheduler {

common/src/main/scala/app/softnetwork/scheduler/config/SchedulerSettings.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ object SchedulerSettings extends StrictLogging {
1919
case Right(schedulerConfig) => schedulerConfig
2020
}
2121

22+
val SchedulerPath: String = config.getString("softnetwork.scheduler.path")
23+
2224
def tag(persistenceId: String): String = s"scheduler-to-$persistenceId"
2325
}
2426

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,8 @@
11
package app.softnetwork.scheduler.launch
22

3-
import akka.actor.typed.ActorSystem
4-
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
5-
import app.softnetwork.api.server.launch.HealthCheckApplication
3+
import app.softnetwork.api.server.launch.Application
64
import app.softnetwork.persistence.query.SchemaProvider
7-
import app.softnetwork.scheduler.api.SchedulerServiceApiHandler
85

9-
import scala.concurrent.Future
10-
11-
trait SchedulerApplication extends HealthCheckApplication with SchedulerGuardian {
6+
trait SchedulerApplication extends Application with SchedulerRoutes {
127
_: SchemaProvider =>
13-
14-
override def grpcServices
15-
: ActorSystem[_] => Seq[PartialFunction[HttpRequest, Future[HttpResponse]]] = system =>
16-
Seq(SchedulerServiceApiHandler.partial(schedulerServer(system))(system))
17-
188
}

core/src/main/scala/app/softnetwork/scheduler/launch/SchedulerGuardian.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ import app.softnetwork.scheduler.persistence.query.{
1111
Scheduler2EntityProcessorStream
1212
}
1313
import app.softnetwork.scheduler.persistence.typed.SchedulerBehavior
14+
import app.softnetwork.session.launch.SessionGuardian
1415
import com.typesafe.scalalogging.StrictLogging
1516

1617
import scala.util.{Failure, Success, Try}
1718

18-
trait SchedulerGuardian extends PersistenceGuardian with StrictLogging {
19+
trait SchedulerGuardian extends SessionGuardian with StrictLogging {
1920
_: SchemaProvider =>
2021

2122
def schedulerEntities: ActorSystem[_] => Seq[PersistentEntity[_, _, _, _]] = _ =>
@@ -25,7 +26,8 @@ trait SchedulerGuardian extends PersistenceGuardian with StrictLogging {
2526

2627
/** initialize all entities
2728
*/
28-
override def entities: ActorSystem[_] => Seq[PersistentEntity[_, _, _, _]] = schedulerEntities
29+
override def entities: ActorSystem[_] => Seq[PersistentEntity[_, _, _, _]] = system =>
30+
sessionEntities(system) ++ schedulerEntities(system)
2931

3032
def entity2SchedulerProcessorStream: ActorSystem[_] => Entity2SchedulerProcessorStream
3133

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package app.softnetwork.scheduler.launch
2+
3+
import akka.actor.typed.ActorSystem
4+
import akka.http.scaladsl.server.Route
5+
import app.softnetwork.api.server.ApiRoutes
6+
import app.softnetwork.persistence.query.SchemaProvider
7+
import app.softnetwork.scheduler.service.SchedulerService
8+
9+
trait SchedulerRoutes extends ApiRoutes with SchedulerGuardian { _: SchemaProvider =>
10+
11+
def schedulerService: ActorSystem[_] => SchedulerService = system => SchedulerService(system)
12+
13+
override def apiRoutes(system: ActorSystem[_]): Route = schedulerService(system).route
14+
15+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package app.softnetwork.scheduler.service
2+
3+
import akka.actor.typed.ActorSystem
4+
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
5+
import akka.http.scaladsl.server.{Directives, Route}
6+
import app.softnetwork.api.server._
7+
import app.softnetwork.persistence.service.Service
8+
import app.softnetwork.scheduler.config.SchedulerSettings
9+
import app.softnetwork.scheduler.handlers.{SchedulerDao, SchedulerHandler}
10+
import app.softnetwork.scheduler.message.{
11+
AddCronTab,
12+
AddSchedule,
13+
CronTabAdded,
14+
CronTabRemoved,
15+
RemoveCronTab,
16+
RemoveSchedule,
17+
ScheduleAdded,
18+
ScheduleRemoved,
19+
SchedulerCommand,
20+
SchedulerCommandResult,
21+
SchedulerErrorMessage
22+
}
23+
import app.softnetwork.scheduler.model._
24+
import app.softnetwork.serialization._
25+
import app.softnetwork.session.service.SessionService
26+
import com.softwaremill.session.CsrfDirectives.randomTokenCsrfProtection
27+
import com.softwaremill.session.CsrfOptions.checkHeader
28+
import com.typesafe.scalalogging.StrictLogging
29+
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
30+
import org.json4s.jackson.Serialization
31+
import org.json4s.{jackson, Formats}
32+
import org.softnetwork.session.model.Session
33+
34+
trait SchedulerService
35+
extends SessionService
36+
with Directives
37+
with DefaultComplete
38+
with Json4sSupport
39+
with StrictLogging
40+
with Service[SchedulerCommand, SchedulerCommandResult]
41+
with SchedulerDao
42+
with SchedulerHandler {
43+
44+
implicit def serialization: Serialization.type = jackson.Serialization
45+
46+
override implicit def formats: Formats = commonFormats
47+
48+
import Session._
49+
50+
val route: Route = {
51+
pathPrefix(SchedulerSettings.SchedulerPath) {
52+
// check anti CSRF token
53+
randomTokenCsrfProtection(checkHeader) {
54+
// check if a session exists
55+
_requiredSession(ec) { session =>
56+
// only administrators should be allowed to access this resource
57+
if (session.admin) {
58+
schedules ~ crons ~ scheduler
59+
} else {
60+
complete(HttpResponse(StatusCodes.Unauthorized))
61+
}
62+
}
63+
}
64+
}
65+
}
66+
67+
lazy val schedules: Route = pathPrefix("schedules") {
68+
post {
69+
entity(as[Schedule]) { schedule =>
70+
run(
71+
SchedulerSettings.SchedulerConfig.id.getOrElse("*"),
72+
AddSchedule(schedule)
73+
) completeWith {
74+
case r: ScheduleAdded => complete(HttpResponse(StatusCodes.OK, entity = r))
75+
case r: SchedulerErrorMessage =>
76+
complete(HttpResponse(StatusCodes.BadRequest, entity = r))
77+
case _ => complete(HttpResponse(StatusCodes.BadRequest))
78+
}
79+
}
80+
} ~ delete {
81+
entity(as[RemoveSchedule]) { cmd =>
82+
run(SchedulerSettings.SchedulerConfig.id.getOrElse("*"), cmd) completeWith {
83+
case r: ScheduleRemoved => complete(HttpResponse(StatusCodes.OK, entity = r))
84+
case r: SchedulerErrorMessage =>
85+
complete(HttpResponse(StatusCodes.BadRequest, entity = r))
86+
case _ => complete(HttpResponse(StatusCodes.BadRequest))
87+
}
88+
}
89+
}
90+
}
91+
92+
lazy val crons: Route = pathPrefix("crons") {
93+
post {
94+
entity(as[CronTab]) { cronTab =>
95+
run(SchedulerSettings.SchedulerConfig.id.getOrElse("*"), AddCronTab(cronTab)) completeWith {
96+
case r: CronTabAdded => complete(HttpResponse(StatusCodes.OK, entity = r))
97+
case r: SchedulerErrorMessage =>
98+
complete(HttpResponse(StatusCodes.BadRequest, entity = r))
99+
case _ => complete(HttpResponse(StatusCodes.BadRequest))
100+
}
101+
}
102+
} ~ delete {
103+
entity(as[RemoveCronTab]) { cmd =>
104+
run(SchedulerSettings.SchedulerConfig.id.getOrElse("*"), cmd) completeWith {
105+
case r: CronTabRemoved => complete(HttpResponse(StatusCodes.OK, entity = r))
106+
case r: SchedulerErrorMessage =>
107+
complete(HttpResponse(StatusCodes.BadRequest, entity = r))
108+
case _ => complete(HttpResponse(StatusCodes.BadRequest))
109+
}
110+
}
111+
}
112+
}
113+
114+
lazy val scheduler: Route =
115+
get {
116+
pathSuffix(Segment) { value =>
117+
getScheduler(Some(value))
118+
} ~ pathEnd {
119+
getScheduler(None)
120+
}
121+
}
122+
123+
private[this] def getScheduler(scheduler: Option[String]): Route = {
124+
loadScheduler(scheduler) completeWith {
125+
case Some(s) =>
126+
complete(
127+
HttpResponse(
128+
StatusCodes.OK,
129+
entity = s
130+
)
131+
)
132+
case _ => complete(HttpResponse(StatusCodes.NotFound))
133+
}
134+
}
135+
}
136+
137+
object SchedulerService {
138+
def apply(sys: ActorSystem[_]): SchedulerService = {
139+
new SchedulerService {
140+
override implicit def system: ActorSystem[_] = sys
141+
}
142+
}
143+
}

project/src/main/scala/app/softnetwork/sbt/build/Versions.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,7 @@ object Versions {
88

99
val server = "0.2.8"
1010

11+
val session = "0.1.3"
12+
1113
val scalatest = "3.1.1"
1214
}

0 commit comments

Comments
 (0)