Paul Chiusano, March 1, 2014
@pchiusano
github.com/scalaz/scalaz-stream
scala> import scalaz.stream.Process
import scalaz.stream.Process
scala> Process(1,2,3)
res0: Process[Nothing,Int] = Process(1, 2, 3)scala> val P = Process
P: Process.type = Process$@3a62c6b7
scala> val xs = P(1,2,3) ++ P.emit(4) ++ P.emitAll(List(5,6,7))
xs: Process[Nothing,Int] = Process(1, 2, 3, 4, 5, 6, 7)scala> xs.map(_ + 1).take(4)
res1: Process[Nothing,Int] = Process(2, 3, 4, 5)
scala> xs.zip(xs)
res2: Process[Nothing,(Int, Int)] =
  Process((1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7))
scala> xs.flatMap(i => P(i,i))
res3: Process[Nothing,Int] =
  Process(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7)scala> xs.<tab>
++                append            asInstanceOf
attempt           buffer            bufferAll
bufferBy          causedBy          chunk
chunkAll          chunkBy           chunkBy2
cleanup           collect           collectFirst
disconnect        drain             drop
dropLast          dropLastIf        dropWhile
evalMap           exists            fallback
fby               filter            find
flatMap           flush             fold
fold1             fold1Map          fold1Monoid
foldMap           foldMonoid        foldSemigroup
forall            gatherMap         handle
hardDisconnect    interleave        intersperse
isHalt            isInstanceOf      kill
killBy            last              map
onComplete        onFailure         once
orElse            partialAttempt    pipe
reduce            reduceMap         reduceMonoid
reduceSemigroup   repartition       repeat
run               runFoldMap        runLast
runLastOr         runLog            runStep
scan              scan1             scan1Map
scan1Monoid       scanMap           scanMonoid
scanSemigroup     sleepUntil        split
splitOn           splitWith         step
stepOr            take              takeWhile
tee               terminated        toString
translate         trim              unemit
until             when              window
zip               zipWith           |>
|||scala> import scala.concurrent.duration._
import scala.concurrent.duration._
scala> val fizz = P.awakeEvery(3 seconds).map(_ => "fizz")
fizz: Process[Task,String] = Await(<task>) ...scala> val fizz2 = fizz.take(2)
fizz2: Process[Task,String] = Await(<task>) ...scala> val t = fizz2.map { d => println(d); d }.runLog
t: Task[IndexedSeq[String]] = Task@773f0cfdscala> t.run
fizz // after 3 s
fizz // after 6 s
res7: IndexedSeq[String] = Vector(fizz, fizz)
scala> t.run
fizz
fizz
res8: IndexedSeq[String] = Vector(fizz, fizz)scala> val buzz = P.awakeEvery(5 seconds).map(_ => "buzz")
buzz: Process[Task,String] = Await(<task>) ...
scala> val fizzbuzz = fizz merge buzz
fizzbuzz: Process[Task,String] = Await(<task>) ...
scala> fizzbuzz.take(5).map(println).run.run
fizz // 3 sec
buzz // 5 sec
fizz // 6 sec
fizz // 9 sec
buzz // 10 secscala> import scalaz.stream.tee
import scalaz.stream.tee
scala> import scalaz.stream.wye
import scalaz.stream.wye
scala> import scalaz.stream.process1
import scalaz.stream.process1scala> process1.<tab>
asInstanceOf      awaitOption       buffer
bufferAll         bufferBy          chunk
chunkAll          chunkBy           chunkBy2
collect           collectFirst      drop
dropLast          dropLastIf        dropWhile
exists            feed              feed1
filter            find              fold
fold1             fold1Map          fold1Monoid
foldMap           foldMonoid        foldSemigroup
forall            id                init
intersperse       isInstanceOf      last
lastOr            lift              liftL
liftR             liftY             multiplex
record            reduce            reduceMap
reduceMonoid      reduceSemigroup   repartition
rethrow           scan              scan1
scan1Map          scan1Monoid       scanMap
scanMonoid        scanSemigroup     shiftRight
skip              split             splitOn
splitWith         stripNone         sum
take              takeThrough       takeWhile
terminated        toString          utf8Encode
windowscala> tee.<tab>
asInstanceOf   feed1L         feed1R
feedL          feedR          interleave
isInstanceOf   passL          passR
toString       until          when
zip            zipAll         zipWith
zipWithAll
scala> wye.<tab>
asInstanceOf     attachL          attachR
boundedQueue     drainL           drainR
dynamic          dynamic1         echoLeft
either           feed1            feed1L
feed1R           feedL            feedR
flip             haltL            haltR
interrupt        isInstanceOf     liftL
liftR            merge            timedQueue
toString         unboundedQueue   yip
yipL             yipWith          yipWithLscala> import scalaz.stream.io
import scalaz.stream.io
scala> val lns = io.linesR("testdata/fahrenheit.txt")
lns: Process[Task,String] = Await(<task>) ...
scala> lns.take(3).runLog.run
res12: IndexedSeq[String] = Vector(
  "// this file contains a list of temperatures in degrees fahrenheit ",
  18.0,
  17.9)scala> val lns2 = lns.onComplete { P.suspend { println("--CLEANUP--"); P.halt }}
lns2: Process[[x]Task[x],String] = Await(<task>) ...
scala> lns2.take(2).runLog.run
--CLEANUP--
res15: IndexedSeq[String] = Vector(
  "// this file contains a list of temperatures in degrees fahrenheit ",
  18.0)scala> lns2.take(3).map(_.toDouble).runLog.attemptRun
--CLEANUP--
res16: scalaz.\/[Throwable,IndexedSeq[Double]] =
  -\/(java.lang.NumberFormatException: For input string: "// this file contains a list of temperatures in degrees fahrenheit")def read(callback: (Throwable \/ Array[Byte]) => Unit): Unit = ...
val t: Task[Array[Byte]] = Task.async { read } // magic!
val t2: Task[Array[Byte]] = for {
  bytes <- t
  moarBytes <- t // `t` not a running computation!
} yield (bytes ++ moarBytes)def read(success: Array[Byte] => Unit, fail: Throwable => Unit): Unit = ...
val t: Task[Array[Byte]] = Task.async { cb =>
  read(b => cb(right(b)), err => cb(left(b)))
}
val t2 = t.flatMap(b => Task(b)) // run rest of computation in a fresh logical threadval t: Task[Array[Byte]] = ...
val read1: Process[Task,Array[Byte]] = P.eval(f)
val readAll: Process[Task,Array[Byte]] = read1.repeatimport scalaz.stream.async
val (q, src) = async.queue[Int]
// Thread 1
q.enqueue(1)
q.enqueue(2)
...
q.close
// Thread 2
src: Process[Task,Int]
src.take(10).to(snk).runimport scalaz.stream.async
val alive = async.signal[Boolean]
val now: Process[Task,Boolean] = alive.continuous
val onChange: Process[Task,Boolean] = alive.discrete
// Thread 1
alive.set(true).run
..
alive.set(false).run
// Thread 2
alive.discrete.take(10).map ...import scalaz.concurrent.Actor
case class M(cb: Throwable \/ MoarBytes => Unit)
val a = Actor.actor[M] { case M(cb) =>
  ...
  val result = reallyExpensiveOp(r)
  cb(result)
}
val t: Process[Task,MoarBytes] =
  Process.repeatEval(Task.async { cb => a ! M(cb) })
t.filter(_.canHazBytes).map(foo).fold(..)scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task
scala> import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeoutException
scala> :pa
// Entering paste mode (ctrl-D to finish)
def atLeastEvery[A](rate: Process[Task,Any])(p: Process[Task,A]): Process[Task,A] =
  rate.either(p).pipe(process1.window(2)).flatMap { w =>
    if (w.forall(_.isLeft)) P.fail(new TimeoutException)
    else P.emitAll { w.headOption.toList.filter(_.isRight).map(_.getOrElse(???)) }
  }scala> val heartbeat = P.awakeEvery(1 seconds)
heartbeat: Process[Task,Duration] = Await(<task>) ...
scala> atLeastEvery(P.awakeEvery(15 seconds))(heartbeat)
res11: Process[Task,Duration] = Await(<task>) ...