Skip to content

Commit e3a07cd

Browse files
authored
Merge pull request #5 from SOFTNETWORK-APP/feature/model
update scheduler behavior and scheduler to entity processor stream
2 parents ee68fe5 + 77806ab commit e3a07cd

File tree

9 files changed

+88
-80
lines changed

9 files changed

+88
-80
lines changed

api/build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ organization := "app.softnetwork.scheduler"
2828
name := "scheduler-api"
2929

3030
libraryDependencies ++= Seq(
31-
"app.softnetwork.persistence" %% "persistence-jdbc" % Versions.genericPersistence
31+
"app.softnetwork.persistence" %% "akka-persistence-jdbc" % Versions.genericPersistence
3232
)

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ ThisBuild / organization := "app.softnetwork"
3131

3232
name := "scheduler"
3333

34-
ThisBuild / version := "0.2.0"
34+
ThisBuild / version := "0.2.1"
3535

3636
ThisBuild / scalaVersion := "2.12.15"
3737

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import akka.persistence.typed.PersistenceId
55
import app.softnetwork.persistence.message.{Command, CommandResult}
66
import app.softnetwork.persistence.query.{EventProcessorStream, JournalProvider}
77
import app.softnetwork.persistence.typed.scaladsl.EntityPattern
8-
import app.softnetwork.scheduler.handlers.SchedulerDao
98
import app.softnetwork.scheduler.message.SchedulerEvents.{
109
CronTabTriggeredEvent,
1110
CronTabsResetedEvent,
@@ -22,8 +21,6 @@ trait Scheduler2EntityProcessorStream[C <: Command, R <: CommandResult]
2221
extends EventProcessorStream[SchedulerEvent] {
2322
_: JournalProvider with EntityPattern[C, R] =>
2423

25-
def schedulerDao: SchedulerDao = SchedulerDao
26-
2724
def forTests: Boolean = false
2825

2926
/** Processing event
@@ -49,11 +46,7 @@ trait Scheduler2EntityProcessorStream[C <: Command, R <: CommandResult]
4946
if (persistenceId.startsWith(schedule.persistenceId)) {
5047
val entityId = persistenceId.split("\\|").last
5148
if (entityId != ALL_KEY) {
52-
if (forTests) {
53-
schedulerDao.addSchedule(schedule.withEntityId(entityId).withDelay(1))
54-
} else {
55-
schedulerDao.addSchedule(schedule.withEntityId(entityId))
56-
}
49+
triggerSchedule(schedule.withEntityId(entityId))
5750
} else {
5851
Future.successful(true)
5952
}
@@ -80,11 +73,7 @@ trait Scheduler2EntityProcessorStream[C <: Command, R <: CommandResult]
8073
if (persistenceId.startsWith(cronTab.persistenceId)) {
8174
val entityId = persistenceId.split("\\|").last
8275
if (entityId != ALL_KEY) {
83-
if (forTests) {
84-
schedulerDao.addSchedule(schedule.withEntityId(entityId).withDelay(1))
85-
} else {
86-
schedulerDao.addSchedule(schedule.withEntityId(entityId))
87-
}
76+
triggerSchedule(schedule.withEntityId(entityId))
8877
} else {
8978
Future.successful(true)
9079
}

core/src/main/scala/app/softnetwork/scheduler/handlers/SchedulerHandler.scala

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import app.softnetwork.persistence.typed.scaladsl.EntityPattern
77
import app.softnetwork.persistence.typed.CommandTypeKey
88
import app.softnetwork.scheduler.message._
99
import app.softnetwork.scheduler.config.{SchedulerConfig, SchedulerSettings}
10-
import app.softnetwork.scheduler.model.{CronTab, Schedule, Scheduler}
10+
import app.softnetwork.scheduler.model.Scheduler
1111
import app.softnetwork.scheduler.persistence.typed.SchedulerBehavior
1212

1313
import scala.concurrent.duration.DurationInt
@@ -60,26 +60,6 @@ trait SchedulerDao extends Completion {
6060
}
6161
}
6262

63-
private[scheduler] def addSchedule(
64-
schedule: Schedule
65-
)(implicit system: ActorSystem[_]): Future[Boolean] = {
66-
implicit val ec: ExecutionContextExecutor = system.executionContext
67-
!?(AddSchedule(schedule)).map {
68-
case _: ScheduleAdded => true
69-
case _ => false
70-
}
71-
}
72-
73-
private[scheduler] def addCronTab(
74-
cronTab: CronTab
75-
)(implicit system: ActorSystem[_]): Future[Boolean] = {
76-
implicit val ec: ExecutionContextExecutor = system.executionContext
77-
!?(AddCronTab(cronTab)).map {
78-
case _: CronTabAdded => true
79-
case _ => false
80-
}
81-
}
82-
8363
def loadScheduler(
8464
schedulerId: Option[String] = None
8565
)(implicit system: ActorSystem[_]): Future[Option[Scheduler]] = {

core/src/main/scala/app/softnetwork/scheduler/persistence/typed/SchedulerBehavior.scala

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,7 @@ private[scheduler] trait SchedulerBehavior
7171
)(implicit context: ActorContext[SchedulerCommand]): Effect[SchedulerEvent, Option[Scheduler]] =
7272
command match {
7373
case cmd: ResetCronTabsAndSchedules =>
74-
def trigerResetCronTabsAndSchedules(
75-
scheduler: Scheduler,
74+
def triggerResetCronTabsAndSchedules(
7675
switch: Boolean
7776
): EffectBuilder[SchedulerEvent, Option[Scheduler]] = {
7877
implicit val system: ActorSystem[_] = context.system
@@ -96,18 +95,27 @@ private[scheduler] trait SchedulerBehavior
9695
((now().getTime - scheduler.getLastCronTabsAndSchedulesReseted.getTime) >
9796
SchedulerSettings.SchedulerConfig.resetScheduler.delay * 1000)
9897
) {
98+
// add cron tabs
9999
scheduler.cronTabs.foreach { cronTab =>
100100
context.self ! AddCronTab(cronTab)
101101
}
102-
scheduler.schedules.filter(_.scheduledDate.isDefined).foreach { schedule =>
103-
context.self ! AddSchedule(schedule)
102+
// remove schedules
103+
scheduler.schedules.filter(_.removable).foreach { schedule =>
104+
context.self ! RemoveSchedule(
105+
schedule.persistenceId,
106+
schedule.entityId,
107+
schedule.key
108+
)
109+
}
110+
// trigger schedules
111+
scheduler.schedules.filter(_.triggerable).foreach { schedule =>
112+
triggerSchedule(timers, context, schedule)
104113
}
105114
if (context.log.isInfoEnabled)
106115
context.log.info(
107116
s"${scheduler.cronTabs.size} cron tabs and ${scheduler.schedules.size} schedules reseted"
108117
)
109-
trigerResetCronTabsAndSchedules(
110-
scheduler,
118+
triggerResetCronTabsAndSchedules(
111119
switch = !scheduler.getTriggerResetCronTabsAndSchedules
112120
)
113121
} else {
@@ -117,7 +125,7 @@ private[scheduler] trait SchedulerBehavior
117125
if scheduler.lastCronTabsAndSchedulesReseted.isEmpty ||
118126
((now().getTime - scheduler.getLastCronTabsAndSchedulesReseted.getTime) >
119127
SchedulerSettings.SchedulerConfig.resetScheduler.delay * 1000) =>
120-
trigerResetCronTabsAndSchedules(scheduler, switch = false)
128+
triggerResetCronTabsAndSchedules(switch = false)
121129
case _ => Effect.none.thenRun(_ => CronTabsAndSchedulesNotReseted ~> replyTo)
122130
}
123131
case ResetScheduler => // add all schedules
@@ -135,9 +143,9 @@ private[scheduler] trait SchedulerBehavior
135143
Effect
136144
.persist(events)
137145
.thenRun(_ => {
138-
scheduler.schedules.foreach { schedule =>
139-
context.self ! AddSchedule(schedule)
140-
}
146+
// scheduler.schedules/*.filter(_.scheduledDate.isEmpty)*/.foreach { schedule =>
147+
// context.self ! AddSchedule(schedule)
148+
// }
141149
if (context.log.isInfoEnabled)
142150
context.log.info("Scheduler reseted")
143151
SchedulerReseted ~> replyTo
@@ -186,23 +194,13 @@ private[scheduler] trait SchedulerBehavior
186194
ScheduleNotAdded
187195
}
188196
} else {
197+
if (context.log.isInfoEnabled)
198+
context.log.info(s"$schedule added")
189199
if (updatedSchedule.triggerable) {
190-
if (context.log.isInfoEnabled)
191-
context.log.info(s"Triggering schedule $updatedSchedule")
192-
timers.startSingleTimer(
193-
updatedSchedule.uuid,
194-
TriggerSchedule(
195-
updatedSchedule.persistenceId,
196-
updatedSchedule.entityId,
197-
updatedSchedule.key
198-
),
199-
updatedSchedule.delay.seconds
200-
)
200+
triggerSchedule(timers, context, updatedSchedule)
201201
} else if (context.log.isDebugEnabled) {
202202
context.log.debug(s"Schedule $updatedSchedule has not been triggered")
203203
}
204-
if (context.log.isInfoEnabled)
205-
context.log.info(s"$schedule added")
206204
ScheduleAdded(updatedSchedule)
207205
}) ~> replyTo
208206
})
@@ -419,6 +417,26 @@ private[scheduler] trait SchedulerBehavior
419417
case _ => super.handleCommand(entityId, state, command, replyTo, timers)
420418
}
421419

420+
private def triggerSchedule(
421+
timers: TimerScheduler[SchedulerCommand],
422+
context: ActorContext[SchedulerCommand],
423+
schedule: Schedule
424+
): Unit = {
425+
if (!timers.isTimerActive(schedule.uuid)) {
426+
if (context.log.isInfoEnabled)
427+
context.log.info(s"Triggering schedule $schedule")
428+
timers.startSingleTimer(
429+
schedule.uuid,
430+
TriggerSchedule(
431+
schedule.persistenceId,
432+
schedule.entityId,
433+
schedule.key
434+
),
435+
schedule.delay.seconds
436+
)
437+
}
438+
}
439+
422440
/** @param state
423441
* - current state
424442
* @param event

testkit/src/main/scala/app/softnetwork/scheduler/persistence/typed/SampleBehavior.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import akka.persistence.typed.scaladsl.Effect
66
import app.softnetwork.persistence.message.{Command, CommandResult, Event}
77
import app.softnetwork.persistence.typed._
88
import app.softnetwork.scheduler.config.SchedulerSettings
9-
import app.softnetwork.scheduler.message.RemoveSchedule
9+
import app.softnetwork.scheduler.message.AddSchedule
1010
import app.softnetwork.scheduler.message.SampleMessages.{
1111
AddSample,
1212
LoadSample,
@@ -19,7 +19,7 @@ import app.softnetwork.scheduler.message.SampleMessages.{
1919
TriggerSample
2020
}
2121
import app.softnetwork.scheduler.message.SchedulerEvents.ExternalEntityToSchedulerEvent
22-
import app.softnetwork.scheduler.model.Sample
22+
import app.softnetwork.scheduler.model.{Sample, Schedule}
2323

2424
trait SampleBehavior extends EntityBehavior[Command, Sample, Event, CommandResult] {
2525
override def persistenceId: String = "Sample"
@@ -48,11 +48,14 @@ trait SampleBehavior extends EntityBehavior[Command, Sample, Event, CommandResul
4848
List(
4949
SampleTriggeredEvent(state.map(_.triggered + 1).getOrElse(1)),
5050
ExternalEntityToSchedulerEvent(
51-
ExternalEntityToSchedulerEvent.Wrapped.RemoveSchedule(
52-
RemoveSchedule(
53-
persistenceId,
54-
entityId,
55-
cmd.key
51+
ExternalEntityToSchedulerEvent.Wrapped.AddSchedule(
52+
AddSchedule(
53+
Schedule(
54+
persistenceId,
55+
entityId,
56+
cmd.key,
57+
1
58+
)
5659
)
5760
)
5861
)

testkit/src/main/scala/app/softnetwork/scheduler/scalatest/SchedulerTestKit.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
package app.softnetwork.scheduler.scalatest
22

3+
import akka.actor.testkit.typed.scaladsl.TestProbe
34
import akka.actor.typed.ActorSystem
45
import app.softnetwork.persistence.query.InMemoryJournalProvider
56
import app.softnetwork.persistence.scalatest.InMemoryPersistenceTestKit
67
import app.softnetwork.scheduler.config.SchedulerSettings
78
import app.softnetwork.scheduler.handlers.SchedulerHandler
89
import app.softnetwork.scheduler.launch.SchedulerGuardian
10+
import app.softnetwork.scheduler.message.{
11+
CronTabAdded,
12+
CronTabRemoved,
13+
ScheduleAdded,
14+
ScheduleRemoved
15+
}
916
import app.softnetwork.scheduler.persistence.query.Entity2SchedulerProcessorStream
1017
import org.scalatest.Suite
1118

@@ -27,4 +34,16 @@ trait SchedulerTestKit extends SchedulerGuardian with InMemoryPersistenceTestKit
2734
logger.info(tag)
2835
}
2936

37+
val probeScheduleAdded: TestProbe[ScheduleAdded] = createTestProbe[ScheduleAdded]()
38+
subscribeProbe(probeScheduleAdded)
39+
40+
val probeScheduleRemoved: TestProbe[ScheduleRemoved] = createTestProbe[ScheduleRemoved]()
41+
subscribeProbe(probeScheduleRemoved)
42+
43+
val probeCronTabAdded: TestProbe[CronTabAdded] = createTestProbe[CronTabAdded]()
44+
subscribeProbe(probeCronTabAdded)
45+
46+
val probeCronTabRemoved: TestProbe[CronTabRemoved] = createTestProbe[CronTabRemoved]()
47+
subscribeProbe(probeCronTabRemoved)
48+
3049
}

testkit/src/main/scala/app/softnetwork/scheduler/scalatest/SchedulerWithSampleTestKit.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import app.softnetwork.persistence.launch
66
import app.softnetwork.persistence.launch.PersistenceGuardian._
77
import app.softnetwork.persistence.query.InMemoryJournalProvider
88
import app.softnetwork.scheduler.config.SchedulerSettings
9-
import app.softnetwork.scheduler.message.ScheduleRemoved
109
import app.softnetwork.scheduler.persistence.query.{
1110
SampleScheduleTriggered,
1211
Scheduler2EntityProcessorStream,
@@ -31,7 +30,4 @@ trait SchedulerWithSampleTestKit extends SchedulerTestKit { _: Suite =>
3130
createTestProbe[SampleScheduleTriggered]()
3231
subscribeProbe(probeSampleSchedule)
3332

34-
val probeScheduleRemoved: TestProbe[ScheduleRemoved] = createTestProbe[ScheduleRemoved]()
35-
subscribeProbe(probeScheduleRemoved)
36-
3733
}

testkit/src/test/scala/app/softnetwork/scheduler/handlers/SchedulerHandlerSpec.scala

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class SchedulerHandlerSpec
4545
case _: CronTabTriggered => succeed
4646
case other => fail(other.getClass)
4747
}
48-
// a schedule for the Sample[sample] entity has been triggered at the next cron job date
48+
// a schedule for the Sample[sample] entity has been triggered
4949
probeSampleSchedule.receiveMessage()
5050
SampleHandler ? ("sample", LoadSample) assert {
5151
case r: SampleLoaded => assert(r.sample.triggered == 1)
@@ -59,24 +59,27 @@ class SchedulerHandlerSpec
5959
scheduler.schedules.find(s =>
6060
s.persistenceId == SampleBehavior.persistenceId && s.entityId == "sample" && s.key == cronTab.key
6161
) match {
62-
case Some(schedule) =>
63-
assert(!schedule.repeatedly.getOrElse(false))
64-
assert(schedule.getScheduledDate.equals(schedule.getLastTriggered))
65-
assert(schedule.getCronTab == cronTab.uuid)
66-
assert(!schedule.triggerable)
67-
assert(schedule.removable)
68-
case _ => fail("schedule not found")
62+
case None => succeed
63+
case _ => fail("schedule found")
6964
}
7065
case _ => fail()
7166
}
72-
// the schedule for the Sample[sample] entity has been removed
73-
probeScheduleRemoved.receiveMessage()
67+
// a schedule for the Sample[sample] entity has been added
68+
probeScheduleAdded.receiveMessage()
69+
// the schedule for the Sample[sample] entity has been triggered
70+
probeSampleSchedule.receiveMessage()
71+
SampleHandler ? ("sample", LoadSample) assert {
72+
case r: SampleLoaded => assert(r.sample.triggered == 2)
73+
case other => fail(other.getClass)
74+
}
7475
}
7576
"remove Cron Tab" in {
7677
this !? RemoveCronTab(cronTab.persistenceId, cronTab.entityId, cronTab.key) assert {
7778
case _: CronTabRemoved => succeed
7879
case other => fail(other.getClass)
7980
}
81+
// the schedule for the Sample[sample] entity has been removed
82+
probeScheduleRemoved.receiveMessage()
8083
this !? LoadScheduler assert {
8184
case r: SchedulerLoaded =>
8285
val scheduler = r.scheduler

0 commit comments

Comments
 (0)