Skip to content

Commit 834bcdf

Browse files
committed
update triggering cron tab for all entities
1 parent e091547 commit 834bcdf

File tree

14 files changed

+223
-25
lines changed

14 files changed

+223
-25
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ ThisBuild / organization := "app.softnetwork"
3131

3232
name := "scheduler"
3333

34-
ThisBuild / version := "0.1.3"
34+
ThisBuild / version := "0.1.4"
3535

3636
ThisBuild / scalaVersion := "2.12.15"
3737

common/src/main/scala/app/softnetwork/scheduler/model/package.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ import java.time.temporal.ChronoUnit
88
import java.util.Date
99
import com.markatta.akron.CronExpression
1010
import com.typesafe.scalalogging.StrictLogging
11-
import org.softnetwork.akka.model.Schedule
11+
import org.softnetwork.akka.model.{CronTab, Schedule}
1212

1313
import scala.concurrent.duration._
14+
import scala.language.implicitConversions
1415
import scala.util.{Failure, Success, Try}
1516

1617
/** Created by smanciot on 11/05/2021.
@@ -91,4 +92,19 @@ package object model {
9192
}
9293
}
9394
}
95+
96+
implicit def cronTabToSchedule(cronTab: CronTab): Option[Schedule] = {
97+
cronTab.nextTriggered match {
98+
case Some(date) =>
99+
Some(
100+
Schedule.defaultInstance
101+
.withPersistenceId(cronTab.persistenceId)
102+
.withEntityId(cronTab.entityId)
103+
.withKey(cronTab.key)
104+
.withScheduledDate(date)
105+
.withRepeatedly(true)
106+
)
107+
case _ => None
108+
}
109+
}
94110
}

core/src/main/scala/app/softnetwork/scheduler/persistence/query/package.scala

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ package query {
2626

2727
def schedulerDao: SchedulerDao = SchedulerDao
2828

29+
def forTests: Boolean = false
30+
2931
/** Processing event
3032
*
3133
* @param event
@@ -49,7 +51,11 @@ package query {
4951
if (persistenceId.startsWith(schedule.persistenceId)) {
5052
val entityId = persistenceId.split("\\|").last
5153
if (entityId != ALL_KEY) {
52-
schedulerDao.addSchedule(schedule.withEntityId(entityId))
54+
if (forTests) {
55+
schedulerDao.addSchedule(schedule.withEntityId(entityId).withDelay(1))
56+
} else {
57+
schedulerDao.addSchedule(schedule.withEntityId(entityId))
58+
}
5359
} else {
5460
Future.successful(true)
5561
}
@@ -69,18 +75,27 @@ package query {
6975
case evt: CronTabTriggeredEvent =>
7076
import evt._
7177
if (cronTab.entityId == ALL_KEY) {
72-
currentPersistenceIds().runForeach(persistenceId => {
73-
if (persistenceId.startsWith(cronTab.persistenceId)) {
74-
val entityId = persistenceId.split("\\|").last
75-
if (entityId != ALL_KEY) {
76-
schedulerDao.addCronTab(cronTab.withEntityId(entityId))
77-
} else {
78-
Future.successful(true)
79-
}
80-
} else {
81-
Future.successful(true)
82-
}
83-
})
78+
val maybeSchedule: Option[Schedule] = cronTab
79+
maybeSchedule match {
80+
case Some(schedule) =>
81+
currentPersistenceIds().runForeach(persistenceId => {
82+
if (persistenceId.startsWith(cronTab.persistenceId)) {
83+
val entityId = persistenceId.split("\\|").last
84+
if (entityId != ALL_KEY) {
85+
if (forTests) {
86+
schedulerDao.addSchedule(schedule.withEntityId(entityId).withDelay(1))
87+
} else {
88+
schedulerDao.addSchedule(schedule.withEntityId(entityId))
89+
}
90+
} else {
91+
Future.successful(true)
92+
}
93+
} else {
94+
Future.successful(true)
95+
}
96+
})
97+
case _ => Future.successful(Done)
98+
}
8499
} else {
85100
triggerCronTab(cronTab).map {
86101
case true => Done

core/src/main/scala/app/softnetwork/scheduler/persistence/typed/SchedulerBehavior.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ private[scheduler] trait SchedulerBehavior
2323

2424
lazy val schedulerId: String = SchedulerSettings.SchedulerConfig.id.getOrElse(ALL_KEY)
2525

26-
override val emptyState: Option[Scheduler] = None
27-
2826
override val snapshotInterval: Int = 100
2927

3028
private def schedulerDao: SchedulerDao = SchedulerDao
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package app.softnetwork.scheduler.handlers
2+
3+
import app.softnetwork.persistence.message.{Command, CommandResult}
4+
import app.softnetwork.persistence.typed.scaladsl.EntityPattern
5+
import app.softnetwork.scheduler.persistence.typed.SampleTypedKey
6+
7+
trait SampleHandler extends EntityPattern[Command, CommandResult] with SampleTypedKey {}
8+
9+
object SampleHandler extends SampleHandler
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package app.softnetwork.scheduler.message
2+
3+
import app.softnetwork.persistence.message.{Command, CommandResult, Event}
4+
import app.softnetwork.scheduler.model.Sample
5+
6+
object SampleMessages {
7+
8+
trait SampleCommand extends Command
9+
case object AddSample extends SampleCommand
10+
trait SampleResult extends CommandResult
11+
case object SampleAdded extends SampleResult
12+
trait SampleEvent extends Event
13+
case class SampleAddedEvent(sample: Sample) extends SampleEvent
14+
15+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package app.softnetwork.scheduler.model
2+
3+
import app.softnetwork.persistence.model.State
4+
5+
case class Sample(uuid: String) extends State
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package app.softnetwork.scheduler.persistence.query
2+
3+
import org.softnetwork.akka.model.Schedule
4+
5+
case class SampleScheduleTriggered(schedule: Schedule)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package app.softnetwork.scheduler.persistence.query
2+
3+
import akka.actor.typed.eventstream.EventStream.Publish
4+
import app.softnetwork.persistence.message.{Command, CommandResult}
5+
import app.softnetwork.persistence.query.JournalProvider
6+
import app.softnetwork.scheduler.handlers.SampleHandler
7+
import org.softnetwork.akka.model.Schedule
8+
9+
import scala.concurrent.Future
10+
11+
trait SchedulerToSampleProcessorStream
12+
extends Scheduler2EntityProcessorStream[Command, CommandResult]
13+
with SampleHandler {
14+
_: JournalProvider =>
15+
16+
override def forTests: Boolean = true
17+
18+
/** @param schedule
19+
* - the schedule to trigger
20+
* @return
21+
* true if the schedule has been successfully triggered, false otherwise
22+
*/
23+
override protected def triggerSchedule(schedule: Schedule): Future[Boolean] = {
24+
system.eventStream.tell(Publish(SampleScheduleTriggered(schedule)))
25+
Future.successful(true)
26+
}
27+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package app.softnetwork.scheduler.persistence.typed
2+
3+
import akka.actor.typed.ActorRef
4+
import akka.actor.typed.scaladsl.{ActorContext, TimerScheduler}
5+
import akka.persistence.typed.scaladsl.Effect
6+
import app.softnetwork.persistence.message.{Command, CommandResult, Event}
7+
import app.softnetwork.persistence.typed._
8+
import app.softnetwork.scheduler.message.SampleMessages.{AddSample, SampleAdded, SampleAddedEvent}
9+
import app.softnetwork.scheduler.model.Sample
10+
11+
trait SampleBehavior extends EntityBehavior[Command, Sample, Event, CommandResult] {
12+
override def persistenceId: String = "Sample"
13+
14+
override def handleCommand(
15+
entityId: String,
16+
state: Option[Sample],
17+
command: Command,
18+
replyTo: Option[ActorRef[CommandResult]],
19+
timers: TimerScheduler[Command]
20+
)(implicit context: ActorContext[Command]): Effect[Event, Option[Sample]] = {
21+
command match {
22+
case AddSample =>
23+
Effect.persist(SampleAddedEvent(Sample(entityId))).thenRun(_ => SampleAdded ~> replyTo)
24+
case _ => super.handleCommand(entityId, state, command, replyTo, timers)
25+
}
26+
}
27+
28+
override def handleEvent(state: Option[Sample], event: Event)(implicit
29+
context: ActorContext[_]
30+
): Option[Sample] = {
31+
event match {
32+
case evt: SampleAddedEvent =>
33+
Some(evt.sample)
34+
case _ => super.handleEvent(state, event)
35+
}
36+
}
37+
}
38+
39+
object SampleBehavior extends SampleBehavior

0 commit comments

Comments
 (0)