Skip to content

Commit 84ecdd3

Browse files
authored
Merge pull request #2 from SOFTNETWORK-APP/feature/akka-grpc
add support for akka grpc - update schedule flags
2 parents ffeb0c0 + acc9df3 commit 84ecdd3

File tree

14 files changed

+288
-9
lines changed

14 files changed

+288
-9
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ ThisBuild / organization := "app.softnetwork"
3131

3232
name := "scheduler"
3333

34-
ThisBuild / version := "0.1.0"
34+
ThisBuild / version := "0.1.1"
3535

3636
ThisBuild / scalaVersion := "2.12.15"
3737

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
syntax = "proto3";
2+
3+
import "scalapb/scalapb.proto";
4+
import "model/schedule.proto";
5+
6+
package app.softnetwork.schedule.api;
7+
8+
option (scalapb.options) = {
9+
import: "app.softnetwork.protobuf.ScalaPBTypeMappers._"
10+
preserve_unknown_fields: false
11+
};
12+
13+
service SchedulerServiceApi {
14+
rpc AddSchedule (AddScheduleRequest) returns (AddScheduleResponse) {}
15+
rpc RemoveSchedule (RemoveScheduleRequest) returns (RemoveScheduleResponse) {}
16+
}
17+
18+
message AddScheduleRequest{
19+
org.softnetwork.akka.model.Schedule schedule = 1;
20+
}
21+
22+
message AddScheduleResponse{
23+
bool succeeded = 1;
24+
}
25+
26+
message RemoveScheduleRequest{
27+
string persistenceId = 1;
28+
string entityId = 2;
29+
string key = 3;
30+
}
31+
32+
message RemoveScheduleResponse{
33+
bool succeeded = 1;
34+
}
35+
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package app.softnetwork.scheduler.api
2+
3+
import akka.actor.CoordinatedShutdown
4+
import akka.actor.typed.ActorSystem
5+
import akka.grpc.scaladsl.AkkaGrpcClient
6+
7+
import app.softnetwork.persistence.typed._
8+
9+
import scala.concurrent.ExecutionContext
10+
11+
trait GrpcClient {
12+
13+
implicit def system: ActorSystem[_]
14+
implicit lazy val ec: ExecutionContext = system.executionContext
15+
16+
def name: String
17+
18+
def grpcClient: AkkaGrpcClient
19+
}
20+
21+
trait GrpcClientFactory[T <: GrpcClient] {
22+
def name: String
23+
private[this] var client: Option[T] = None
24+
def init(sys: ActorSystem[_]): T
25+
def apply(sys: ActorSystem[_]): T = {
26+
client match {
27+
case Some(value) => value
28+
case _ =>
29+
implicit val classicSystem: _root_.akka.actor.ActorSystem = sys
30+
val shutdown = CoordinatedShutdown(classicSystem)
31+
val cli = init(sys)
32+
client = Some(cli)
33+
shutdown.addTask(
34+
CoordinatedShutdown.PhaseServiceRequestsDone,
35+
s"$name-graceful-terminate"
36+
) { () =>
37+
client = None
38+
cli.grpcClient.close()
39+
}
40+
cli
41+
}
42+
}
43+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package app.softnetwork.scheduler.api
2+
3+
import akka.actor.typed.ActorSystem
4+
import akka.grpc.GrpcClientSettings
5+
import app.softnetwork.schedule.api.{
6+
AddScheduleRequest,
7+
RemoveScheduleRequest,
8+
SchedulerServiceApiClient
9+
}
10+
import org.softnetwork.akka.model.Schedule
11+
12+
import scala.concurrent.Future
13+
14+
trait SchedulerClient extends GrpcClient {
15+
16+
implicit lazy val grpcClient: SchedulerServiceApiClient =
17+
SchedulerServiceApiClient(
18+
GrpcClientSettings.fromConfig(name)
19+
)
20+
21+
def addSchedule(schedule: Schedule): Future[Boolean] = {
22+
grpcClient.addSchedule(AddScheduleRequest(Some(schedule))) map (_.succeeded)
23+
}
24+
25+
def removeSchedule(persistenceId: String, entityId: String, key: String): Future[Boolean] = {
26+
grpcClient.removeSchedule(RemoveScheduleRequest(persistenceId, entityId, key)) map (_.succeeded)
27+
}
28+
29+
}
30+
31+
object SchedulerClient extends GrpcClientFactory[SchedulerClient] {
32+
override val name: String = "SchedulerService"
33+
override def init(sys: ActorSystem[_]): SchedulerClient = {
34+
new SchedulerClient {
35+
override implicit lazy val system: ActorSystem[_] = sys
36+
val name: String = SchedulerClient.name
37+
}
38+
}
39+
}

common/src/main/scala/app/softnetwork/scheduler/model/package.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,18 @@ package object model {
2929

3030
trait ScheduleDecorator {
3131
_: Schedule =>
32-
/* flag to indicate whether we should trigger this schedule
32+
/* flag to indicate whether the scheduled date has been reached or not
33+
*/
34+
val scheduledDateReached: Boolean =
35+
scheduledDate.isDefined && (now().after(getScheduledDate) || now().equals(getScheduledDate))
36+
/* flag to indicate whether we could trigger this schedule
3337
*/
3438
val triggerable: Boolean =
35-
// the schedule has never been triggered and has no scheduled date
36-
(lastTriggered.isEmpty && scheduledDate.isEmpty) ||
39+
// the schedule has never been triggered or can be triggered repeatedly and has no scheduled date
40+
((lastTriggered.isEmpty || repeatedly.getOrElse(false)) && scheduledDate.isEmpty) ||
3741
// the schedule should be triggered at a specified date that has been reached
3842
// and it has not yet been triggered or has been triggered before the specified date
39-
(scheduledDate.isDefined &&
40-
now().after(getScheduledDate) &&
43+
(scheduledDateReached &&
4144
(lastTriggered.isEmpty || getLastTriggered.before(getScheduledDate)))
4245

4346
/* flag to indicate whether we should remove this schedule
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package app.softnetwork.scheduler.api
2+
3+
import akka.actor.typed.ActorSystem
4+
import app.softnetwork.schedule.api.{
5+
AddScheduleRequest,
6+
AddScheduleResponse,
7+
RemoveScheduleRequest,
8+
RemoveScheduleResponse,
9+
SchedulerServiceApi
10+
}
11+
import app.softnetwork.scheduler.handlers.SchedulerHandler
12+
import app.softnetwork.scheduler.message.{
13+
AddSchedule,
14+
RemoveSchedule,
15+
ScheduleAdded,
16+
ScheduleRemoved
17+
}
18+
19+
import scala.concurrent.{ExecutionContextExecutor, Future}
20+
21+
trait SchedulerServer extends SchedulerServiceApi with SchedulerHandler {
22+
23+
implicit def system: ActorSystem[_]
24+
25+
implicit lazy val ec: ExecutionContextExecutor = system.executionContext
26+
27+
override def addSchedule(in: AddScheduleRequest): Future[AddScheduleResponse] = {
28+
in.schedule match {
29+
case Some(schedule) =>
30+
!?(AddSchedule(schedule)) map {
31+
case _: ScheduleAdded => AddScheduleResponse(true)
32+
case _ => AddScheduleResponse()
33+
}
34+
case _ => Future.successful(AddScheduleResponse())
35+
}
36+
}
37+
38+
override def removeSchedule(in: RemoveScheduleRequest): Future[RemoveScheduleResponse] = {
39+
!?(RemoveSchedule(in.persistenceId, in.entityId, in.key)) map {
40+
case _: ScheduleRemoved => RemoveScheduleResponse(true)
41+
case _ => RemoveScheduleResponse()
42+
}
43+
}
44+
}
45+
46+
object SchedulerServer {
47+
def apply(sys: ActorSystem[_]): SchedulerServer = {
48+
new SchedulerServer {
49+
override implicit val system: ActorSystem[_] = sys
50+
}
51+
}
52+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,18 @@
11
package app.softnetwork.scheduler.launch
22

3+
import akka.actor.typed.ActorSystem
4+
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
35
import app.softnetwork.api.server.launch.HealthCheckApplication
46
import app.softnetwork.persistence.query.SchemaProvider
7+
import app.softnetwork.schedule.api.SchedulerServiceApiHandler
8+
9+
import scala.concurrent.Future
510

611
trait SchedulerApplication extends HealthCheckApplication with SchedulerGuardian {
712
_: SchemaProvider =>
13+
14+
override def grpcServices
15+
: ActorSystem[_] => Seq[PartialFunction[HttpRequest, Future[HttpResponse]]] = system =>
16+
Seq(SchedulerServiceApiHandler.partial(schedulerServer(system))(system))
17+
818
}

core/src/main/scala/app/softnetwork/scheduler/launch/SchedulerGuardian.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import akka.actor.typed.ActorSystem
44
import app.softnetwork.persistence.launch.{PersistenceGuardian, PersistentEntity}
55
import app.softnetwork.persistence.launch.PersistenceGuardian._
66
import app.softnetwork.persistence.query.{EventProcessorStream, SchemaProvider}
7+
import app.softnetwork.scheduler.api.SchedulerServer
78
import app.softnetwork.scheduler.handlers.SchedulerDao
89
import app.softnetwork.scheduler.persistence.query.{
910
Entity2SchedulerProcessorStream,
@@ -48,5 +49,9 @@ trait SchedulerGuardian extends PersistenceGuardian with StrictLogging {
4849
}
4950
}
5051

52+
/** initialize scheduler server
53+
*/
54+
def schedulerServer: ActorSystem[_] => SchedulerServer = sys => SchedulerServer(sys)
55+
5156
override def initSystem: ActorSystem[_] => Unit = initSchedulerSystem
5257
}

testkit/build.sbt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ organization := "app.softnetwork.scheduler"
77
name := "scheduler-testkit"
88

99
libraryDependencies ++= Seq(
10-
"app.softnetwork.persistence" %% "persistence-core-testkit" % Versions.genericPersistence,
10+
"app.softnetwork.api" %% "generic-server-api-testkit" % Versions.genericPersistence,
11+
"app.softnetwork.persistence" %% "persistence-core-testkit" % Versions.genericPersistence
1112
)
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package app.softnetwork.scheduler.api
2+
3+
import akka.http.scaladsl.testkit.PersistenceScalatestGrpcTest
4+
import app.softnetwork.persistence.scalatest.InMemoryPersistenceTestKit
5+
import app.softnetwork.scheduler.launch.SchedulerGuardian
6+
import org.scalatest.Suite
7+
8+
trait SchedulerGrpcServer
9+
extends PersistenceScalatestGrpcTest
10+
with SchedulerGrpcServices
11+
with InMemoryPersistenceTestKit { _: Suite with SchedulerGuardian =>
12+
override lazy val additionalConfig: String = grpcConfig
13+
}

0 commit comments

Comments
 (0)