Skip to content

Commit f4488ca

Browse files
authored
Merge pull request #3 from SOFTNETWORK-APP/feature/add-cron-for-all-entities
update triggering cron tab for all entities
2 parents e091547 + 934006d commit f4488ca

File tree

15 files changed

+246
-26
lines changed

15 files changed

+246
-26
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ ThisBuild / organization := "app.softnetwork"
3131

3232
name := "scheduler"
3333

34-
ThisBuild / version := "0.1.3"
34+
ThisBuild / version := "0.1.4"
3535

3636
ThisBuild / scalaVersion := "2.12.15"
3737

common/src/main/protobuf/model/schedule.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ message Schedule {
2929
optional bool repeatedly = 5;
3030
optional google.protobuf.Timestamp scheduledDate = 6 [(scalapb.field).type = "java.util.Date"];
3131
optional google.protobuf.Timestamp lastTriggered = 7 [(scalapb.field).type = "java.util.Date"];
32+
optional string cronTab = 8;
3233
}
3334

3435
// CronTab

common/src/main/scala/app/softnetwork/scheduler/model/package.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ import java.time.temporal.ChronoUnit
88
import java.util.Date
99
import com.markatta.akron.CronExpression
1010
import com.typesafe.scalalogging.StrictLogging
11-
import org.softnetwork.akka.model.Schedule
11+
import org.softnetwork.akka.model.{CronTab, Schedule}
1212

1313
import scala.concurrent.duration._
14+
import scala.language.implicitConversions
1415
import scala.util.{Failure, Success, Try}
1516

1617
/** Created by smanciot on 11/05/2021.
@@ -91,4 +92,20 @@ package object model {
9192
}
9293
}
9394
}
95+
96+
implicit def cronTabToSchedule(cronTab: CronTab): Option[Schedule] = {
97+
cronTab.nextTriggered match {
98+
case Some(date) =>
99+
Some(
100+
Schedule.defaultInstance
101+
.withPersistenceId(cronTab.persistenceId)
102+
.withEntityId(cronTab.entityId)
103+
.withKey(cronTab.key)
104+
.withScheduledDate(date)
105+
.withRepeatedly(true)
106+
.withCronTab(cronTab.uuid)
107+
)
108+
case _ => None
109+
}
110+
}
94111
}

core/src/main/scala/app/softnetwork/scheduler/persistence/query/package.scala

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ package query {
2626

2727
def schedulerDao: SchedulerDao = SchedulerDao
2828

29+
def forTests: Boolean = false
30+
2931
/** Processing event
3032
*
3133
* @param event
@@ -49,7 +51,11 @@ package query {
4951
if (persistenceId.startsWith(schedule.persistenceId)) {
5052
val entityId = persistenceId.split("\\|").last
5153
if (entityId != ALL_KEY) {
52-
schedulerDao.addSchedule(schedule.withEntityId(entityId))
54+
if (forTests) {
55+
schedulerDao.addSchedule(schedule.withEntityId(entityId).withDelay(1))
56+
} else {
57+
schedulerDao.addSchedule(schedule.withEntityId(entityId))
58+
}
5359
} else {
5460
Future.successful(true)
5561
}
@@ -69,18 +75,27 @@ package query {
6975
case evt: CronTabTriggeredEvent =>
7076
import evt._
7177
if (cronTab.entityId == ALL_KEY) {
72-
currentPersistenceIds().runForeach(persistenceId => {
73-
if (persistenceId.startsWith(cronTab.persistenceId)) {
74-
val entityId = persistenceId.split("\\|").last
75-
if (entityId != ALL_KEY) {
76-
schedulerDao.addCronTab(cronTab.withEntityId(entityId))
77-
} else {
78-
Future.successful(true)
79-
}
80-
} else {
81-
Future.successful(true)
82-
}
83-
})
78+
val maybeSchedule: Option[Schedule] = cronTab
79+
maybeSchedule match {
80+
case Some(schedule) =>
81+
currentPersistenceIds().runForeach(persistenceId => {
82+
if (persistenceId.startsWith(cronTab.persistenceId)) {
83+
val entityId = persistenceId.split("\\|").last
84+
if (entityId != ALL_KEY) {
85+
if (forTests) {
86+
schedulerDao.addSchedule(schedule.withEntityId(entityId).withDelay(1))
87+
} else {
88+
schedulerDao.addSchedule(schedule.withEntityId(entityId))
89+
}
90+
} else {
91+
Future.successful(true)
92+
}
93+
} else {
94+
Future.successful(true)
95+
}
96+
})
97+
case _ => Future.successful(Done)
98+
}
8499
} else {
85100
triggerCronTab(cronTab).map {
86101
case true => Done

core/src/main/scala/app/softnetwork/scheduler/persistence/typed/SchedulerBehavior.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ private[scheduler] trait SchedulerBehavior
2323

2424
lazy val schedulerId: String = SchedulerSettings.SchedulerConfig.id.getOrElse(ALL_KEY)
2525

26-
override val emptyState: Option[Scheduler] = None
27-
2826
override val snapshotInterval: Int = 100
2927

3028
private def schedulerDao: SchedulerDao = SchedulerDao
@@ -379,7 +377,9 @@ private[scheduler] trait SchedulerBehavior
379377
ct.entityId != ALL_KEY && ct.persistenceId == cronTab.persistenceId && ct.key == cronTab.key
380378
)
381379
.map(ct => CronTabRemovedEvent(ct.persistenceId, ct.entityId, ct.key))
382-
.toList
380+
.toList ++ scheduler.schedules
381+
.filter(_.getCronTab == cmd.uuid)
382+
.map(s => ScheduleRemovedEvent(s.persistenceId, s.entityId, s.key))
383383
} else {
384384
List.empty
385385
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package app.softnetwork.scheduler.handlers
2+
3+
import app.softnetwork.persistence.message.{Command, CommandResult}
4+
import app.softnetwork.persistence.typed.scaladsl.EntityPattern
5+
import app.softnetwork.scheduler.persistence.typed.SampleTypedKey
6+
7+
trait SampleHandler extends EntityPattern[Command, CommandResult] with SampleTypedKey {}
8+
9+
object SampleHandler extends SampleHandler
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package app.softnetwork.scheduler.message
2+
3+
import app.softnetwork.persistence.message.{Command, CommandResult, Event}
4+
import app.softnetwork.scheduler.model.Sample
5+
6+
object SampleMessages {
7+
8+
trait SampleCommand extends Command
9+
case object AddSample extends SampleCommand
10+
trait SampleResult extends CommandResult
11+
case object SampleAdded extends SampleResult
12+
trait SampleEvent extends Event
13+
case class SampleAddedEvent(sample: Sample) extends SampleEvent
14+
15+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package app.softnetwork.scheduler.model
2+
3+
import app.softnetwork.persistence.model.State
4+
5+
case class Sample(uuid: String) extends State
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package app.softnetwork.scheduler.persistence.query
2+
3+
import org.softnetwork.akka.model.Schedule
4+
5+
case class SampleScheduleTriggered(schedule: Schedule)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package app.softnetwork.scheduler.persistence.query
2+
3+
import akka.actor.typed.eventstream.EventStream.Publish
4+
import app.softnetwork.persistence.message.{Command, CommandResult}
5+
import app.softnetwork.persistence.query.JournalProvider
6+
import app.softnetwork.scheduler.handlers.SampleHandler
7+
import org.softnetwork.akka.model.Schedule
8+
9+
import scala.concurrent.Future
10+
11+
trait SchedulerToSampleProcessorStream
12+
extends Scheduler2EntityProcessorStream[Command, CommandResult]
13+
with SampleHandler {
14+
_: JournalProvider =>
15+
16+
override def forTests: Boolean = true
17+
18+
/** @param schedule
19+
* - the schedule to trigger
20+
* @return
21+
* true if the schedule has been successfully triggered, false otherwise
22+
*/
23+
override protected def triggerSchedule(schedule: Schedule): Future[Boolean] = {
24+
system.eventStream.tell(Publish(SampleScheduleTriggered(schedule)))
25+
Future.successful(true)
26+
}
27+
}

0 commit comments

Comments
 (0)