Skip to content

Commit e940a7e

Browse files
update readme and examples
1 parent b99a552 commit e940a7e

File tree

2 files changed

+103
-9
lines changed

2 files changed

+103
-9
lines changed

README.md

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,94 @@
1-
# pipes-and-filters
2-
Scala implementation of Pipeline Pattern
1+
# Pipeline Pattern
2+
In software engineering, a pipeline consists of a chain of processing elements (processes, threads, coroutines, functions, etc.), arranged so that the output of each element is the input of the next; the name is by analogy to a physical pipeline.
3+
4+
![alt text](https://github.com/cosminseceleanu/scala-pipeline/blob/master/pipeline-model.png)
5+
6+
<strong>The filter</strong> transforms or filters the data it receives via the pipes with which it is connected. A filter can have any number of input pipes and any number of output pipes.
7+
8+
<strong>The pipe</strong> is the connector that passes data from one filter to the next. It is a directional stream of data, that is usually implemented by a data buffer to store all data, until the next filter has time to process it.
9+
10+
## Introduction
11+
This project allows you to implement the pipeline pattern while creating reusable pipelines in your Scala applications. You can create pipelines consisting of one or more filters and then process them synchronous or asynchronous. Pipeline processing is initiated by some payload(input) and this payload will be passed and transformed from filter to filter in order to complete the required process.
12+
13+
## Usage
14+
In order to create a pipeline you must specify type of initial payload.
15+
```scala
16+
import com.cosmin.pipeline.Pipeline
17+
18+
val pipeline = Pipeline[Int, Int]()
19+
20+
```
21+
22+
Operations in a pipeline i.e. filters can be classes that extends Filter trait or pure functions that receive a input and returns processed input.
23+
In the below example we add Sqrt filter and an anonymus filter who creates a string message with the sqrt value of the input.
24+
25+
```scala
26+
import com.cosmin.pipeline.{Filter, Pipeline}
27+
28+
class Sqrt extends Filter[Int, Double] {
29+
override def execute: Int => Double = input => Math.sqrt(input)
30+
}
31+
32+
val pipeline = Pipeline[Int, Int]() | new Sqrt | (sqrt => s"Sqrt: $sqrt!")
33+
34+
```
35+
After pipeline filters was added we can process them using execute method of pipeline who take as parameter initial payload and a callback(onComplete) as curried parameter to be called when pipeline was processed. Callback receive as input parameter a Try object with the value of the last filter output in case of success.
36+
37+
```scala
38+
pipeline.execute(4) {
39+
case Success(output) => println(output)// print to console: Sqrt: 2.0!
40+
}
41+
```
42+
43+
### Word Count Example
44+
45+
##### Objective: count appearances of a word in text file
46+
```UNIX
47+
cat "myText.txt" | grep "hello" | wc -l
48+
```
49+
50+
##### Define filters
51+
```scala
52+
import com.cosmin.pipeline.{Filter, Pipeline}
53+
import scala.io.Source
54+
55+
class Cat() extends Filter[String, Seq[String]]{
56+
override def execute: String => Seq[String] = file => Source.fromFile(file).getLines.toSeq
57+
}
58+
59+
object Cat {
60+
def apply(): Cat = new Cat()
61+
}
62+
63+
class Grep(word: String) extends Filter[Seq[String], Seq[String]] {
64+
override def execute: Seq[String] => Seq[String] = lines => lines.filter(_.contains(word))
65+
}
66+
67+
object Grep {
68+
def apply(word: String): Grep = new Grep(word)
69+
}
70+
71+
class Count extends Filter[Seq[String], Int] {
72+
override def execute: Seq[String] => Int = lines => lines.count(_ => true)
73+
}
74+
75+
object Count {
76+
def apply(): Count = new Count()
77+
}
78+
```
79+
80+
##### Building the pipeline
81+
```scala
82+
import com.cosmin.pipeline.{Filter, Pipeline}
83+
val pipeline = Pipeline[String, String]() | Cat() | Grep("hello") | Count()
84+
85+
```
86+
87+
##### Execute the pipeline
88+
```scala
89+
pipeline.execute("myText.txt") {
90+
case Success(output) => println(s"word 'hello' was found on $output lines")
91+
}
92+
```
93+
Code above print to console 'word 'hello' was found on 3 lines' for the file https://github.com/cosminseceleanu/scala-pipeline/blob/master/myText.txt
94+
Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
package com.cosmin.examples
22

3-
import com.cosmin.pipeline.Pipeline
3+
import com.cosmin.pipeline.{Filter, Pipeline}
44

5-
import scala.util.{Random, Success}
5+
import scala.util.Success
66

77
/**
88
* Pipeline who receive a random int --> calculate sqrt --> create a message with sqrt result
99
* Int -> Double -> String
1010
*/
1111
object TypeConversions {
1212
def main(args: Array[String]): Unit = {
13-
val pipeline = Pipeline[Int, Int]() | (nr => Math.sqrt(nr)) | (sqrt => s"Sqrt: $sqrt!")
14-
val random = new Random()
15-
val input = Math.abs(random.nextInt())
13+
val pipeline = Pipeline[Int, Int]() | new Sqrt | (sqrt => s"Sqrt: $sqrt!")
1614

17-
pipeline.execute(input) {
18-
case Success(out) => println(s"input: $input --> $out")
15+
pipeline.execute(4) {
16+
case Success(output) => println(output)
1917
}
2018
}
2119
}
20+
21+
class Sqrt extends Filter[Int, Double] {
22+
override def execute: Int => Double = input => Math.sqrt(input)
23+
}

0 commit comments

Comments
 (0)