An introduction to scalaz-stream

Paul Chiusano, March 1, 2014

@pchiusano

github.com/scalaz/scalaz-stream

Let's go

scala> import scalaz.stream.Process
import scalaz.stream.Process

scala> Process(1,2,3)
res0: Process[Nothing,Int] = Process(1, 2, 3)
scala> val P = Process
P: Process.type = Process$@3a62c6b7

scala> val xs = P(1,2,3) ++ P.emit(4) ++ P.emitAll(List(5,6,7))
xs: Process[Nothing,Int] = Process(1, 2, 3, 4, 5, 6, 7)
scala> xs.map(_ + 1).take(4)
res1: Process[Nothing,Int] = Process(2, 3, 4, 5)

scala> xs.zip(xs)
res2: Process[Nothing,(Int, Int)] =
  Process((1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7))

scala> xs.flatMap(i => P(i,i))
res3: Process[Nothing,Int] =
  Process(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7)
scala> xs.<tab>
++                append            asInstanceOf
attempt           buffer            bufferAll
bufferBy          causedBy          chunk
chunkAll          chunkBy           chunkBy2
cleanup           collect           collectFirst
disconnect        drain             drop
dropLast          dropLastIf        dropWhile
evalMap           exists            fallback
fby               filter            find
flatMap           flush             fold
fold1             fold1Map          fold1Monoid
foldMap           foldMonoid        foldSemigroup
forall            gatherMap         handle
hardDisconnect    interleave        intersperse
isHalt            isInstanceOf      kill
killBy            last              map
onComplete        onFailure         once
orElse            partialAttempt    pipe
reduce            reduceMap         reduceMonoid
reduceSemigroup   repartition       repeat
run               runFoldMap        runLast
runLastOr         runLog            runStep
scan              scan1             scan1Map
scan1Monoid       scanMap           scanMonoid
scanSemigroup     sleepUntil        split
splitOn           splitWith         step
stepOr            take              takeWhile
tee               terminated        toString
translate         trim              unemit
until             when              window
zip               zipWith           |>
|||
scala> import scala.concurrent.duration._
import scala.concurrent.duration._

scala> val fizz = P.awakeEvery(3 seconds).map(_ => "fizz")
fizz: Process[Task,String] = Await(<task>) ...
scala> val fizz2 = fizz.take(2)
fizz2: Process[Task,String] = Await(<task>) ...
scala> val t = fizz2.map { d => println(d); d }.runLog
t: Task[IndexedSeq[String]] = Task@773f0cfd
scala> t.run
fizz // after 3 s
fizz // after 6 s
res7: IndexedSeq[String] = Vector(fizz, fizz)

scala> t.run
fizz
fizz
res8: IndexedSeq[String] = Vector(fizz, fizz)
scala> val buzz = P.awakeEvery(5 seconds).map(_ => "buzz")
buzz: Process[Task,String] = Await(<task>) ...

scala> val fizzbuzz = fizz merge buzz
fizzbuzz: Process[Task,String] = Await(<task>) ...

scala> fizzbuzz.take(5).map(println).run.run
fizz // 3 sec
buzz // 5 sec
fizz // 6 sec
fizz // 9 sec
buzz // 10 sec
scala> import scalaz.stream.tee
import scalaz.stream.tee

scala> import scalaz.stream.wye
import scalaz.stream.wye

scala> import scalaz.stream.process1
import scalaz.stream.process1
scala> process1.<tab>
asInstanceOf      awaitOption       buffer
bufferAll         bufferBy          chunk
chunkAll          chunkBy           chunkBy2
collect           collectFirst      drop
dropLast          dropLastIf        dropWhile
exists            feed              feed1
filter            find              fold
fold1             fold1Map          fold1Monoid
foldMap           foldMonoid        foldSemigroup
forall            id                init
intersperse       isInstanceOf      last
lastOr            lift              liftL
liftR             liftY             multiplex
record            reduce            reduceMap
reduceMonoid      reduceSemigroup   repartition
rethrow           scan              scan1
scan1Map          scan1Monoid       scanMap
scanMonoid        scanSemigroup     shiftRight
skip              split             splitOn
splitWith         stripNone         sum
take              takeThrough       takeWhile
terminated        toString          utf8Encode
window
scala> tee.<tab>
asInstanceOf   feed1L         feed1R
feedL          feedR          interleave
isInstanceOf   passL          passR
toString       until          when
zip            zipAll         zipWith
zipWithAll

scala> wye.<tab>
asInstanceOf     attachL          attachR
boundedQueue     drainL           drainR
dynamic          dynamic1         echoLeft
either           feed1            feed1L
feed1R           feedL            feedR
flip             haltL            haltR
interrupt        isInstanceOf     liftL
liftR            merge            timedQueue
toString         unboundedQueue   yip
yipL             yipWith          yipWithL
scala> import scalaz.stream.io
import scalaz.stream.io

scala> val lns = io.linesR("testdata/fahrenheit.txt")
lns: Process[Task,String] = Await(<task>) ...

scala> lns.take(3).runLog.run
res12: IndexedSeq[String] = Vector(
  "// this file contains a list of temperatures in degrees fahrenheit ",
  18.0,
  17.9)
scala> val lns2 = lns.onComplete { P.suspend { println("--CLEANUP--"); P.halt }}
lns2: Process[[x]Task[x],String] = Await(<task>) ...

scala> lns2.take(2).runLog.run
--CLEANUP--
res15: IndexedSeq[String] = Vector(
  "// this file contains a list of temperatures in degrees fahrenheit ",
  18.0)
scala> lns2.take(3).map(_.toDouble).runLog.attemptRun
--CLEANUP--
res16: scalaz.\/[Throwable,IndexedSeq[Double]] =
  -\/(java.lang.NumberFormatException: For input string: "// this file contains a list of temperatures in degrees fahrenheit")

Binding to asynchronous sources

Creating streams asynchronously (tasks)

// A callback-style read: it reports its outcome (an error or the bytes read)
// through a single callback taking `Throwable \/ Array[Byte]`.
def read(callback: (Throwable \/ Array[Byte]) => Unit): Unit = ...

// `read` already has the shape `Task.async` expects (a function that is handed
// a callback to register), so it can be passed in directly.
val t: Task[Array[Byte]] = Task.async { read } // magic!
// Binding `t` twice: `t` is a description of a computation, not one already in
// flight, so each bind performs its own `read` and `bytes` / `moarBytes` come
// from two separate reads.
val t2: Task[Array[Byte]] = for {
  bytes <- t
  moarBytes <- t // `t` not a running computation!
} yield (bytes ++ moarBytes)
// Adapting a two-callback API: `read` reports success and failure through
// separate callbacks, while `Task.async` supplies one callback that takes a
// `Throwable \/ Array[Byte]`.
def read(success: Array[Byte] => Unit, fail: Throwable => Unit): Unit = ...

val t: Task[Array[Byte]] = Task.async { cb =>
  // Route each outcome to the matching side of the disjunction. The failure
  // callback must forward its own argument `err`; `b` is not in scope there.
  read(b => cb(right(b)), err => cb(left(err)))
}
val t2 = t.flatMap(b => Task(b)) // run rest of computation in a fresh logical thread
val t: Task[Array[Byte]] = ...
// Turn the Task `t` into a stream (presumably emitting its single result),
// then repeat that stream. Because `t` is a description rather than a running
// computation, each repetition presumably performs a fresh read.
val read1: Process[Task,Array[Byte]] = P.eval(t)
val readAll: Process[Task,Array[Byte]] = read1.repeat

Creating streams asynchronously (queues)

import scalaz.stream.async

val (q, src) = async.queue[Int]

// Thread 1 (producer): push values into the queue, then close it
// (presumably ending `src` once it has been drained — confirm).
q.enqueue(1)
q.enqueue(2)
...
q.close

// Thread 2 (consumer): `src` is the stream of values pulled from the queue.
src: Process[Task,Int]
// `run` on a Process only builds a Task[Unit] describing the run; the second
// `.run` actually executes it. `snk` is a sink assumed to be defined elsewhere.
src.take(10).to(snk).run.run

Creating streams asynchronously (signals)

import scalaz.stream.async

// A shared, mutable Boolean signal that one thread writes and others observe.
val alive = async.signal[Boolean]

// Two views of the same signal. As the name `now` suggests, `continuous`
// presumably yields the current value whenever it is pulled. As `onChange`
// suggests, `discrete` presumably emits only when the value changes. Confirm
// both against the scalaz-stream version in use.
val now: Process[Task,Boolean] = alive.continuous
val onChange: Process[Task,Boolean] = alive.discrete

// Thread 1: update the signal. `set` appears to return a Task (note the
// trailing `.run`, which executes it).
alive.set(true).run
..
alive.set(false).run

// Thread 2: observe the first 10 elements of the `discrete` view.
alive.discrete.take(10).map ...

Creating streams asynchronously (actors)

import scalaz.concurrent.Actor
// Message carrying the callback through which the actor delivers its result.
// The type parses as `(Throwable \/ MoarBytes) => Unit`.
case class M(cb: Throwable \/ MoarBytes => Unit)

// On each `M`, the actor runs the expensive operation and passes the result
// to the message's callback.
// NOTE(review): if `reallyExpensiveOp` throws instead of returning a left
// `\/`, `cb` is never invoked and the Task below would presumably never
// complete — confirm error handling in the elided code.
val a = Actor.actor[M] { case M(cb) =>
  ...
  val result = reallyExpensiveOp(r)
  cb(result)
}

// Each evaluation of the Task sends the actor a fresh `M` carrying the Task's
// callback; `repeatEval` presumably re-evaluates it to form a stream of results.
val t: Process[Task,MoarBytes] =
  Process.repeatEval(Task.async { cb => a ! M(cb) })

t.filter(_.canHazBytes).map(foo).fold(..)

Questions?

  • Credits: Rúnar Bjarnason, Ed Kmett, Dan Doel, Pavel Chlupacek, ...
  • See also chapters 13 and 15 of Functional Programming in Scala: http://manning.com/bjarnason/
  • https://github.com/scalaz/scalaz-stream
  • Lots of stuff I didn't discuss

Appendix

scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task

scala> import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeoutException

scala> :pa
// Entering paste mode (ctrl-D to finish)

/** Passes through the elements of `p`, failing with a `TimeoutException` when
  * `p` goes quiet relative to `rate`.
  *
  * `rate.either(p)` merges both streams, with `rate` events on the left and
  * `p` elements on the right. The result is then grouped by
  * `process1.window(2)`. A window consisting only of `rate` events triggers
  * the failure. Assuming `window(2)` yields sliding windows of consecutive
  * events (TODO confirm for the scalaz-stream version in use), that means two
  * `rate` ticks with no element of `p` between them.
  *
  * @param rate heartbeat process; only the arrival of its elements matters
  * @param p    the process being monitored
  * @return     the elements of `p`, or a failed process on timeout
  */
def atLeastEvery[A](rate: Process[Task,Any])(p: Process[Task,A]): Process[Task,A] =
  rate.either(p).pipe(process1.window(2)).flatMap { w =>
    // Nothing from `p` in this window: treat it as a timeout.
    if (w.forall(_.isLeft)) P.fail(new TimeoutException)
    // Otherwise emit the window's first event if it came from `p`.
    // `\/#toList` is empty for a left and a singleton for a right, so this is
    // total without needing `getOrElse(???)`.
    else P.emitAll { w.headOption.toList.flatMap(_.toList) }
  }
scala> val heartbeat = P.awakeEvery(1 seconds)
heartbeat: Process[Task,Duration] = Await(<task>) ...

scala> atLeastEvery(P.awakeEvery(15 seconds))(heartbeat)
res11: Process[Task,Duration] = Await(<task>) ...