Paul Chiusano, March 1, 2014
@pchiusano
github.com/scalaz/scalaz-stream
scala> import scalaz.stream.Process
import scalaz.stream.Process
scala> Process(1,2,3)
res0: Process[Nothing,Int] = Process(1, 2, 3)
scala> val P = Process
P: Process.type = Process$@3a62c6b7
scala> val xs = P(1,2,3) ++ P.emit(4) ++ P.emitAll(List(5,6,7))
xs: Process[Nothing,Int] = Process(1, 2, 3, 4, 5, 6, 7)
scala> xs.map(_ + 1).take(4)
res1: Process[Nothing,Int] = Process(2, 3, 4, 5)
scala> xs.zip(xs)
res2: Process[Nothing,(Int, Int)] =
Process((1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7))
scala> xs.flatMap(i => P(i,i))
res3: Process[Nothing,Int] =
Process(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7)
scala> xs.<tab>
++ append asInstanceOf
attempt buffer bufferAll
bufferBy causedBy chunk
chunkAll chunkBy chunkBy2
cleanup collect collectFirst
disconnect drain drop
dropLast dropLastIf dropWhile
evalMap exists fallback
fby filter find
flatMap flush fold
fold1 fold1Map fold1Monoid
foldMap foldMonoid foldSemigroup
forall gatherMap handle
hardDisconnect interleave intersperse
isHalt isInstanceOf kill
killBy last map
onComplete onFailure once
orElse partialAttempt pipe
reduce reduceMap reduceMonoid
reduceSemigroup repartition repeat
run runFoldMap runLast
runLastOr runLog runStep
scan scan1 scan1Map
scan1Monoid scanMap scanMonoid
scanSemigroup sleepUntil split
splitOn splitWith step
stepOr take takeWhile
tee terminated toString
translate trim unemit
until when window
zip zipWith |>
|||
scala> import scala.concurrent.duration._
import scala.concurrent.duration._
scala> val fizz = P.awakeEvery(3 seconds).map(_ => "fizz")
fizz: Process[Task,String] = Await(<task>) ...
scala> val fizz2 = fizz.take(2)
fizz2: Process[Task,String] = Await(<task>) ...
scala> val t = fizz2.map { d => println(d); d }.runLog
t: Task[IndexedSeq[String]] = Task@773f0cfd
scala> t.run
fizz // after 3 s
fizz // after 6 s
res7: IndexedSeq[String] = Vector(fizz, fizz)
scala> t.run
fizz
fizz
res8: IndexedSeq[String] = Vector(fizz, fizz)
scala> val buzz = P.awakeEvery(5 seconds).map(_ => "buzz")
buzz: Process[Task,String] = Await(<task>) ...
scala> val fizzbuzz = fizz merge buzz
fizzbuzz: Process[Task,String] = Await(<task>) ...
scala> fizzbuzz.take(5).map(println).run.run
fizz // 3 sec
buzz // 5 sec
fizz // 6 sec
fizz // 9 sec
buzz // 10 sec
scala> import scalaz.stream.tee
import scalaz.stream.tee
scala> import scalaz.stream.wye
import scalaz.stream.wye
scala> import scalaz.stream.process1
import scalaz.stream.process1
scala> process1.<tab>
asInstanceOf awaitOption buffer
bufferAll bufferBy chunk
chunkAll chunkBy chunkBy2
collect collectFirst drop
dropLast dropLastIf dropWhile
exists feed feed1
filter find fold
fold1 fold1Map fold1Monoid
foldMap foldMonoid foldSemigroup
forall id init
intersperse isInstanceOf last
lastOr lift liftL
liftR liftY multiplex
record reduce reduceMap
reduceMonoid reduceSemigroup repartition
rethrow scan scan1
scan1Map scan1Monoid scanMap
scanMonoid scanSemigroup shiftRight
skip split splitOn
splitWith stripNone sum
take takeThrough takeWhile
terminated toString utf8Encode
window
scala> tee.<tab>
asInstanceOf feed1L feed1R
feedL feedR interleave
isInstanceOf passL passR
toString until when
zip zipAll zipWith
zipWithAll
scala> wye.<tab>
asInstanceOf attachL attachR
boundedQueue drainL drainR
dynamic dynamic1 echoLeft
either feed1 feed1L
feed1R feedL feedR
flip haltL haltR
interrupt isInstanceOf liftL
liftR merge timedQueue
toString unboundedQueue yip
yipL yipWith yipWithL
scala> import scalaz.stream.io
import scalaz.stream.io
scala> val lns = io.linesR("testdata/fahrenheit.txt")
lns: Process[Task,String] = Await(<task>) ...
scala> lns.take(3).runLog.run
res12: IndexedSeq[String] = Vector(
"// this file contains a list of temperatures in degrees fahrenheit ",
18.0,
17.9)
scala> val lns2 = lns.onComplete { P.suspend { println("--CLEANUP--"); P.halt }}
lns2: Process[[x]Task[x],String] = Await(<task>) ...
scala> lns2.take(2).runLog.run
--CLEANUP--
res15: IndexedSeq[String] = Vector(
"// this file contains a list of temperatures in degrees fahrenheit ",
18.0)
scala> lns2.take(3).map(_.toDouble).runLog.attemptRun
--CLEANUP--
res16: scalaz.\/[Throwable,IndexedSeq[Double]] =
-\/(java.lang.NumberFormatException: For input string: "// this file contains a list of temperatures in degrees fahrenheit")
def read(callback: (Throwable \/ Array[Byte]) => Unit): Unit = ...
val t: Task[Array[Byte]] = Task.async { read } // magic!
val t2: Task[Array[Byte]] = for {
bytes <- t
moarBytes <- t // `t` not a running computation!
} yield (bytes ++ moarBytes)
def read(success: Array[Byte] => Unit, fail: Throwable => Unit): Unit = ...
val t: Task[Array[Byte]] = Task.async { cb =>
read(b => cb(right(b)), err => cb(left(b)))
}
val t2 = t.flatMap(b => Task(b)) // run rest of computation in a fresh logical thread
val t: Task[Array[Byte]] = ...
val read1: Process[Task,Array[Byte]] = P.eval(f)
val readAll: Process[Task,Array[Byte]] = read1.repeat
import scalaz.stream.async
val (q, src) = async.queue[Int]
// Thread 1
q.enqueue(1)
q.enqueue(2)
...
q.close
// Thread 2
src: Process[Task,Int]
src.take(10).to(snk).run
import scalaz.stream.async
val alive = async.signal[Boolean]
val now: Process[Task,Boolean] = alive.continuous
val onChange: Process[Task,Boolean] = alive.discrete
// Thread 1
alive.set(true).run
..
alive.set(false).run
// Thread 2
alive.discrete.take(10).map ...
import scalaz.concurrent.Actor
case class M(cb: Throwable \/ MoarBytes => Unit)
val a = Actor.actor[M] { case M(cb) =>
...
val result = reallyExpensiveOp(r)
cb(result)
}
val t: Process[Task,MoarBytes] =
Process.repeatEval(Task.async { cb => a ! M(cb) })
t.filter(_.canHazBytes).map(foo).fold(..)
scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task
scala> import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeoutException
scala> :pa
// Entering paste mode (ctrl-D to finish)
def atLeastEvery[A](rate: Process[Task,Any])(p: Process[Task,A]): Process[Task,A] =
rate.either(p).pipe(process1.window(2)).flatMap { w =>
if (w.forall(_.isLeft)) P.fail(new TimeoutException)
else P.emitAll { w.headOption.toList.filter(_.isRight).map(_.getOrElse(???)) }
}
scala> val heartbeat = P.awakeEvery(1 seconds)
heartbeat: Process[Task,Duration] = Await(<task>) ...
scala> atLeastEvery(P.awakeEvery(15 seconds))(heartbeat)
res11: Process[Task,Duration] = Await(<task>) ...