Skip to content

Commit d846ed8

Browse files
add akka executor
1 parent 1595c75 commit d846ed8

File tree

6 files changed

+111
-2
lines changed

6 files changed

+111
-2
lines changed

build.sbt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,8 @@ name := "PipesAndFilters"
33
version := "0.1"
44

55
scalaVersion := "2.12.4"
6-
6+
7+
libraryDependencies ++= Seq(
8+
"com.typesafe.akka" %% "akka-actor" % "2.5.11",
9+
"com.typesafe.akka" %% "akka-testkit" % "2.5.11" % Test
10+
)
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.cosmin.examples
2+
3+
import com.cosmin.pipeline.executor.AkkaExecutor
4+
5+
import scala.util.Success
6+
7+
object AkkaPipeline {
8+
def main(args: Array[String]): Unit = {
9+
val exec = AkkaExecutor[Int, Int]
10+
// exec.execute(1, List()) {
11+
// case Success(nr) => println("aaaa")
12+
// }
13+
14+
val l = List(1, 2, 3)
15+
println(l.head)
16+
println(l.tail)
17+
}
18+
}

src/main/scala/com/cosmin/examples/wordcount/WordCount.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.cosmin.examples.wordcount
22

33
import com.cosmin.pipeline.Pipeline
4-
import com.cosmin.pipeline.executor.AsyncExecutor
4+
import com.cosmin.pipeline.executor.{AkkaExecutor, AsyncExecutor}
55

66
import scala.util.Success
77

@@ -18,6 +18,7 @@ object WordCount {
1818
}
1919

2020
executeAsync(pipeline, wordToFind)
21+
executeAsyncUsingAkka(pipeline, wordToFind)
2122
}
2223

2324
private def executeAsync(pipeline: Pipeline[String, Int], wordToFind: String): Unit = {
@@ -27,4 +28,11 @@ object WordCount {
2728
}
2829
Thread.sleep(2000)
2930
}
31+
32+
private def executeAsyncUsingAkka(pipeline: Pipeline[String, Int], wordToFind: String): Unit = {
33+
implicit val akkaExecutor: AkkaExecutor[String, Int] = AkkaExecutor[String, Int]
34+
pipeline.execute("myText.txt") {
35+
case Success(output) => println(s"Akka Async ---> word '$wordToFind' was found on $output lines")
36+
}
37+
}
3038
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.cosmin.pipeline.executor
2+
import akka.actor.ActorSystem
3+
import com.cosmin.pipeline.Stage
4+
import com.cosmin.pipeline.executor.actor.Supervisor
5+
import com.cosmin.pipeline.executor.actor.Supervisor.Start
6+
7+
import scala.util.Try
8+
9+
class AkkaExecutor[In, Out] extends PipelineExecutor[In, Out] {
10+
override def execute(in: In, stages: List[Stage])(onComplete: Try[Out] => Unit): Unit = {
11+
val system = ActorSystem("pipeline")
12+
val supervisor = system.actorOf(Supervisor.props[Out](stages, onComplete), "pipeline-supervisor")
13+
14+
supervisor ! Start[In](in)
15+
}
16+
}
17+
18+
object AkkaExecutor {
19+
def apply[In, Out]: AkkaExecutor[In, Out] = new AkkaExecutor[In, Out]
20+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.cosmin.pipeline.executor.actor
2+
3+
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
4+
import com.cosmin.pipeline.Stage
5+
import com.cosmin.pipeline.executor.actor.Supervisor.{StageCompleted, Start}
6+
import com.cosmin.pipeline.executor.actor.Worker.Execute
7+
8+
import scala.util.Try
9+
10+
object Supervisor {
11+
def props[Out](stages: List[Stage], onComplete: Try[Out] => Unit): Props = Props(new Supervisor(stages, onComplete))
12+
13+
final case class Start[In](in: In)
14+
final case class StageCompleted[Out](stage: Stage, result: Out)
15+
}
16+
17+
class Supervisor[Out](stages: List[Stage], onComplete: Try[Out] => Unit) extends Actor with ActorLogging {
18+
private var remainingStages = stages
19+
private var stageToActor: Map[Stage, ActorRef] = Map.empty
20+
21+
override def preStart(): Unit = stageToActor = stages.map(stage => (stage, context.actorOf(Worker.props(stage)))).toMap
22+
23+
override def receive: Receive = {
24+
case Start(in) => executeStage(in)
25+
case StageCompleted(stage, result) =>
26+
if (remainingStages.isEmpty) {
27+
onComplete(Try(result.asInstanceOf[Out]))
28+
context.stop(self)
29+
} else {
30+
executeStage(result)
31+
}
32+
}
33+
34+
private def executeStage(in: Any): Unit = {
35+
val stage = remainingStages.head
36+
stageToActor(stage) ! Execute[stage.In](in.asInstanceOf[stage.In])
37+
remainingStages = remainingStages.tail
38+
}
39+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.cosmin.pipeline.executor.actor
2+
3+
import akka.actor.{Actor, Props}
4+
import com.cosmin.pipeline.Stage
5+
import com.cosmin.pipeline.executor.actor.Supervisor.StageCompleted
6+
import com.cosmin.pipeline.executor.actor.Worker.Execute
7+
8+
object Worker {
9+
def props(stage: Stage): Props = Props(new Worker(stage))
10+
11+
final case class Execute[In](in: In)
12+
}
13+
14+
class Worker(stage: Stage) extends Actor {
15+
override def receive: Receive = {
16+
case Execute(input) =>
17+
val result = stage.execute(input.asInstanceOf[stage.In])
18+
sender() ! StageCompleted[stage.Out](stage, result.asInstanceOf[stage.Out])
19+
}
20+
}

0 commit comments

Comments
 (0)