Advanced stream processing

Parallelism, nondeterminism, and distributed programming

Paul Chiusano, @pchiusano,

Disclaimer / shameless plug

... for a kinder, gentler introduction see chapter 15 of FP in Scala:

Book half off today, discount code: scalawkd5

Process1: a 1-input stream processor

/** A stream processor with input type `I` and output type `O`.
  * Sealed: `Halt`, `Emit` and `Await` are the only states a driver has to handle. */
sealed trait Process1[I,O]

/** Terminal state: carries no output and no continuation. */
final case class Halt[I,O]() extends Process1[I,O]

/** Output state: `head` holds the values to emit, `tail` is the state to
  * continue with (defaults to `Halt`). */
final case class Emit[I,O](
    head: Seq[O],
    tail: Process1[I,O] = Halt[I,O]())
  extends Process1[I,O]

/** Input state: `recv` maps the next input to the following state;
  * `fallback` (defaults to `Halt`) is the state used instead of `recv`
  * when no input is available. */
final case class Await[I,O](
    recv: I => Process1[I,O],
    fallback: Process1[I,O] = Halt[I,O]())
  extends Process1[I,O]

Process1 definitions

/** Emits the running total of its inputs: each input `d` produces `d + acc`,
  * and that value becomes the accumulator for the next step. */
def sum: Process1[Double,Double] = {
  def go(acc: Double): Process1[Double,Double] =
    Await((d: Double) => Emit(Seq(d+acc), go(d+acc)))
  go(0.0) // start from zero so the first output equals the first input
}
scala> sum(Stream(1.0, 2.0, 3.0, 4.0)).toList
res0: List[Double] = List(1.0, 3.0, 6.0, 10.0)

Driving a Process1[I,O]

/** Runs this process over the input stream `s`, producing the output stream.
  *  - `Halt` yields the empty stream.
  *  - `Await` passes the head of `s` to `recv` and continues on the tail;
  *    when `s` is empty it continues with `fallback` instead.
  *  - `Emit` outputs `h` and then continues with `t` on the same input. */
def apply(s: Stream[I]): Stream[O] = this match {
  case Halt() => Stream()
  case Await(recv, fallback) => s match {
    case h #:: t => recv(h)(t)
    case _ => fallback(s) // Stream is empty
  } // close the inner match so `Emit` below is a case of the outer one
  case Emit(h,t) => h.toStream append t(s)
}

Process1[I,O]: the List[O] view

def ++(p: => Process1[I,O]): Process1[I,O]
def flatMap[O2](f: O => Process1[I,O2]): Process1[I,O2]
def map[O2](f: O => O2): Process1[I,O2]
def repeat: Process1[I,O]
def filter[I](f: I => Boolean): Process1[I,I]
def take[I](n: Int): Process1[I,I]

Process1[I,O]: the I => O view

/** Lifts a plain function into a process: awaits one input, emits `f`
  * applied to it, and repeats. */
def lift[I,O](f: I => O): Process1[I,O] =
  Await((i: I) => Emit(Seq(f(i)))).repeat // dot call instead of a postfix operator
def |>[O2](p2: Process1[O,O2]): Process1[I,O2]

Process1 example

/** Running count of the input lines that neither start with `//` nor are
  * blank after trimming: every surviving line is mapped to `1.0`, summed
  * with `sum`, and the running total is converted to `Int`. */
val lineCounter: Process1[String,Int] =
  filter(!_.startsWith("//")) |>
  filter(!_.trim.isEmpty) |>
  lift(_ => 1.0) |>
  sum |>
  lift(_.toInt)
val prog: ??? = lines("Main.scala") |> lineCounter

Generalizing Process1


case class Await[I,O](
    recv: I => Process1[I,O], 
    finalizer: Process1[I,O] = Halt[I,O]()) 
  extends Process1[I,O]

Abstracting over the context

case class Await[F[_],A,O](
  req: F[A], recv: A => Process[F,O],
  fallback: Process[F,O],
  onError: Process[F,O]) extends Process[F,O]

The full Process type

trait Process[F[_],O]

/** Request state of `Process[F,O]`: `req` is a request in the context `F`
  * whose result of type `A` is handed to `recv`; `fallback` and `onError`
  * are the alternative continuations. */
case class Await[F[_],A,O](
  req: F[A], recv: A => Process[F,O],
  fallback: Process[F,O],
  onError: Process[F,O]) extends Process[F,O]

case class Emit[F[_],O](
  head: Seq[O], 
  tail: Process[F,O]) extends Process[F,O]

case class Halt[F[_],O]() extends Process[F,O]

The List-like operations work the same!

def ++(p: => Process[F,O]): Process[F,O]
def flatMap[O2](f: O => Process[F,O2]): Process[F,O2]
def map[O2](f: O => O2): Process[F,O2]
// `repeat` declares no O2 type parameter, so its output type stays O.
def repeat: Process[F,O]

Process1 is a Process

/** Encodes "request one input of type `I`" as a type constructor:
  * `Get` is the single case of the sealed `f[X]`, and it fixes `X` to `I`. */
case class Is[I]() {
  sealed trait f[X]
  case object Get extends f[I]
}
/** Shorthand for the `Get` request belonging to `Is[I]`. */
def Get[I] = Is[I]().Get

type Process1[I,O] = Process[Is[I]#f, O]
      (i: Int) => Emit(Seq(i+1))).repeat

Process1 is a Process

trait Process[F[_],O] {
  /** Composes this process with `p2`, a `Process1` from `O` to `O2`;
    * the result stays in the same context `F`. */
  def |>[O2](p2: Process1[O,O2]): Process[F,O2]
}

Tee: a two-input Process

/** Request type for a two-input process: `L` requests a value of type `I`,
  * `R` requests a value of type `I2`. */
case class T[I,I2]() {
  sealed trait f[X]
  case object L extends f[I]
  case object R extends f[I2]
}
/** A process whose requests are drawn from `T[I,I2]#f`, emitting `O`. */
type Tee[I,I2,O] = Process[T[I,I2]#f, O]

Tee: a two-input Process

/** Builds a `Tee` that awaits a left value, then a right value, emits `f`
  * applied to the pair, and repeats. */
def zipWith[O,O2,O3](f: (O,O2) => O3): Tee[O,O2,O3] = {
  val pairOnce =
    awaitL[O,O2,O3](left =>
      awaitR(right => emitT(f(left, right))))
  pairOnce.repeat
}
trait Process[F[_],O] {
  /** Combines this process with a second process `p2` (same context `F`,
    * output `O2`) using the `Tee` `t`, which maps `O` and `O2` to `O3`. */
  def tee[O2,O3](p2: Process[F,O2])(
                 t: Tee[O,O2,O3]): Process[F,O3]
}

Effectful sources

type Source[O] = Process[IO,O]
case class Await[A,O](
  req: IO[A],       // or Future[A] 
  recv: A => Process[O],

Effectful sources

/** Capability for contexts `F` that can represent failure with a `Throwable`. */
trait Partial[F[_]] {
  /** Exposes the outcome of `a` as an `Either[Throwable,A]` inside `F`. */
  def attempt[A](a: F[A]): F[Either[Throwable,A]]
  /** Produces an `F[A]` from the throwable `t`. */
  def fail[A](t: Throwable): F[A]
}

// indicates normal termination
case object End extends Exception

/** Extension methods for a `Process[F,O]` whose context `F` has both a
  * `Monad` and a `Partial` instance. */
implicit class Source[F[_]:Monad:Partial, O](
    src: Process[F,O]) {
  /** Gathers the outputs of `src` into an `IndexedSeq` inside `F`
    * (implementation elided on the slide). */
  def collect: F[IndexedSeq[O]] = ...
}

Examples of effectful sources

def lines(filename: String): Process[IO, String]

def rows(query: PreparedStatement): Process[IO, Row]

def resource[R,O](
  acquire: IO[R])(
  release: R => IO[Unit],
  step: R => IO[Either[Throwable,O]]): Process[IO,O]

Effectful sinks and channels

type Sink[F[_],O] = Process[F,O => F[Unit]]
val fileIn: Process[IO,String] = ... 
val fileOut: Process[IO, String => IO[Unit]] = ... // Sink[IO,String]
val pipe: Process[IO,IO[Unit]] = 
  (fileIn zipWith fileOut)((line,write) => write(line))
/** Turns a process that emits effects `F[O]` into a process that emits their
  * results: the emitted effect is requested via `Await`, and its result is
  * emitted before continuing with `eval` on the tail.
  * NOTE(review): simplified sketch; only the `Emit` case is shown. */
def eval[F[_],O](p: Process[F, F[O]]): Process[F,O] =
  p match { case Emit(h,t) =>
    Await(h, hEval => Emit(hEval, eval(t))) }

Effectful sinks and channels

val convert: Process[IO,Unit] = 
  lines("fahrenheit.txt") |> 
  filter(!_.startsWith("#")) |> 
  lift(fahrenheitToCelsius) to fileW("celsius.txt")

type Channel[F[_],I,O] = Process[F, I => F[O]]

trait Process[F[_],O] { ...
  /** Sends this process's outputs through the channel `p2`: each output `o`
    * is zipped with a function `f` from `p2`, and the effect `f(o)` is run
    * by `eval`. */
  def to[O2](p2: Channel[F,O,O2]): Process[F,O2] =
    eval { (this zipWith p2)((o,f) => f(o)) }
}


The usefulness of nondeterminism

/** Pairs inputs in a fixed order: first awaits a value from the left, then
  * one from the right, emits `f` of the two, and repeats. */
def zipWith[O,O2,O3](f: (O,O2) => O3): Tee[O,O2,O3] = {
   val leftThenRight =
     awaitL[O,O2,O3](a =>
       awaitR(b => emitT(f(a, b))))
   leftThenRight.repeat
}

Representing nondeterminism explicitly

/** Request type with three kinds of request: `A` for a value of type `I`,
  * `B` for a value of type `I2`, and `AB` for a value from either side,
  * delivered as `Either[I,I2]`. */
case class Y[I,I2]() {
  sealed trait f[X]
  case object A extends f[I]
  case object B extends f[I2]
  case object AB extends f[Either[I,I2]]
}
/** A process whose requests are drawn from `Y[I,I2]#f`, emitting `O`. */
type Wye[I,I2,O] = Process[Y[I,I2]#f, O]

zipWith, revisited

/** Like `zipWith`, but the first value may come from either side: after an
  * `AB` request answers with one side's value, the other side is awaited,
  * `f` of the pair is emitted, and the whole step repeats. */
def zipWithY[O,O2,O3](f: (O,O2) => O3): Wye[O,O2,O3] =
  awaitAB[O,O2,O3] {
    case Right(o2) => awaitA(o => emitY(f(o,o2)))
    case Left(o) => awaitB(o2 => emitY(f(o,o2)))
  }.repeat // dot call instead of a postfix operator

Interpreting nondeterminism

/** Capability for contexts `F` that can choose between two computations. */
trait Nondeterminism[F[_]] {
  /** The result is, inside `F`, either a `Left` pairing an `A` with an `F[B]`
    * or a `Right` pairing an `F[A]` with a `B`. */
  def choose[A,B](a: F[A], b: F[B]):
    F[(A, F[B]) Either (F[A], B) ]
}
/** The same `choose` signature written out with `Future` in place of `F`. */
trait Nondeterminism {
  /** The result is a `Future` of either a `Left` pairing an `A` with a
    * `Future[B]` or a `Right` pairing a `Future[A]` with a `B`. */
  def choose[A,B](a: Future[A], b: Future[B]):
    Future[(A, Future[B]) Either (Future[A], B) ]
}

Interpreting nondeterminism

trait Process[F[_],O] {
  /** Combines this process with `p2` using the `Wye` `y`, which maps `O` and
    * `O2` to `O3`; requires a `Nondeterminism` instance for `F`. */
  def wye[O2,O3](
      p2: Process[F,O2])(
      y: Wye[O,O2,O3])(
      implicit F: Nondeterminism[F]): Process[F,O3]
}

Distributed execution


Credits: Rúnar Bjarnason, Dan Doel, Ed Kmett

Machines library, by Ed Kmett:

Scala port, by Rúnar and Dan:, discount code: scalawkd5

Coming soon to scalaz