Paul Chiusano, @pchiusano, manning.com/bjarnason
Process1
: a 1-input stream processor
sealed trait Process1[I,O]
case class Halt[I,O]() extends Process1[I,O]
case class Emit[I,O](
head: Seq[O],
tail: Process1[I,O] = Halt[I,O]())
extends Process1[I,O]
case class Await[I,O](
recv: I => Process1[I,O],
fallback: Process1[I,O] = Halt[I,O]())
extends Process1[I,O]
Process1
definitions
def sum: Process1[Double,Double] = {
def go(acc: Double): Process1[Double,Double] =
Await((d: Double) => Emit(Seq(d+acc), go(d+acc)))
go(0.0)
}
scala> sum(Stream(1.0, 2.0, 3.0, 4.0)).toList
val res0: List[Double] = List(1.0, 3.0, 6.0, 10.0)
Process1[I,O]
// Runs this processor over the input stream `s`, producing the stream of outputs.
def apply(s: Stream[I]): Stream[O] = this match {
// Halt produces no output, whatever input remains.
case Halt() => Stream()
// Await: feed the next input element to `recv` and continue on the rest.
case Await(recv, fallback) => s match {
case h #:: t => recv(h)(t)
case _ => fallback(s) // Stream is empty
}
// Emit: output `h` first, then run the tail processor on the same input.
case Emit(h,t) => h.toStream append t(s)
}
Process1[I,O]
: the List[O]
view
def ++(p: => Process1[I,O]): Process1[I,O]
def flatMap[O2](f: O => Process1[I,O2]): Process1[I,O2]
def map[O2](f: O => O2): Process1[I,O2]
def repeat: Process1[I,O]
def filter[I](f: I => Boolean): Process1[I,I]
def take[I](n: Int): Process1[I,I]
...
Process1[I,O]
: the I => O
view
def lift[I,O](f: I => O): Process1[I,O] =
Await((i: I) => Emit(Seq(f(i)))) repeat
def |>[O2](p2: Process1[O,O2]): Process1[I,O2]
Process1
example
val lineCounter: Process1[String,Int] =
filter(!_.startsWith("//")) |>
filter(!_.trim.isEmpty) |>
lift(_ => 1.0) |>
sum |>
lift(_ toInt)
val prog: ??? = lines("Main.scala") |> lineCounter
Process1
case class Await[I,O](
recv: I => Process1[I,O],
finalizer: Process1[I,O] = Halt[I,O]())
extends Process1[I,O]
case class Await[F[_],A,O](
req: F[A], recv: A => Process[F,O],
fallback: Process[F,O],
onError: Process[F,O]) extends Process[F,O]
Process
type
trait Process[F[_],O]
// One step of a Process: a request `req` in `F`, the continuation `recv`
// applied to its result, and two alternative processes, `fallback` and `onError`.
case class Await[F[_],A,O](
req: F[A], recv: A => Process[F,O],
fallback: Process[F,O],
onError: Process[F,O]) extends Process[F,O]
case class Emit[F[_],O](
head: Seq[O],
tail: Process[F,O]) extends Process[F,O]
case class Halt[F[_],O]() extends Process[F,O]
def ++(p: => Process[F,O]): Process[F,O]
def flatMap[O2](f: O => Process[F,O2]): Process[F,O2]
def map[O2](f: O => O2): Process[F,O2]
// Result type uses the trait's own `O`; `repeat` declares no `O2`.
def repeat: Process[F,O]
Process1
is a Process
// Wraps the request type `f` for a process with a single input of type `I`.
case class Is[I]() {
// `f[X]` is a request whose answer has type `X`.
sealed trait f[X]
// The only request: `Get` is an `f[I]`.
case object Get extends f[I]
}
def Get[I] = Is[I]().Get
type Process1[I,O] = Process[Is[I]#f, O]
Await(Get[Int],
(i: Int) => Emit(Seq(i+1))).repeat
Process1
is a Process
trait Process[F[_],O] {
...
def |>[O2](p2: Process1[O,O2]): Process[F,O2]
}
Tee
: a two-input Process
// Request type for a process with two inputs, of types `I` and `I2`.
case class T[I,I2]() {
sealed trait f[X]
// `L` is an `f[I]`; `R` is an `f[I2]`.
case object L extends f[I]
case object R extends f[I2]
}
type Tee[I,I2,O] = Process[T[I,I2]#f, O]
Tee
: a two-input Process
def zipWith[O,O2,O3](f: (O,O2) => O3): Tee[O,O2,O3] =
awaitL[O,O2,O3](o =>
awaitR (o2 => emitT(f(o,o2)))).repeat
trait Process[F[_],O] {
...
def tee[O2,O3](p2: Process[F,O2])(
t: Tee[O,O2,O3]): Process[F,O3]
}
type Source[O] = Process[IO,O]
case class Await[A,O](
req: IO[A], // or Future[A]
recv: A => Process[O],
..
trait Partial[F[_]] {
def attempt[A](a: F[A]): F[Either[Throwable,A]]
def fail[A](t: Throwable): F[A]
}
// indicates normal termination
case object End extends Exception
implicit class Source[F[_]:Monad:Partial, O](
src: Process[F,O]) {
def collect: F[IndexedSeq[O]] = ...
}
def lines(filename: String): Process[IO, String]
def rows(query: PreparedStatement): Process[IO, Row]
def resource[R,O](
acquire: IO[R])(
release: R => IO[Unit],
step: R => IO[Either[Throwable,O]]): Process[IO,O]
type Sink[F[_],O] = Process[F,O => F[Unit]]
val fileIn: Process[IO,String] = ...
val fileOut: Process[IO, String => IO[Unit]] = ... // Sink[IO,String]
val pipe: Process[IO,IO[Unit]] =
(fileIn zipWith fileOut)((line,write) => write(line))
// Sketch: turns a process whose outputs are effects `F[O]` into a process of
// their results `O`. Only the `Emit` case is shown here.
// NOTE(review): `Emit.head` is declared as a `Seq` elsewhere in these slides,
// while this sketch passes `h` to `Await` as a single request — confirm intent.
def eval[F[_],O](p: Process[F, F[O]]): Process[F,O] =
p match { case Emit(h,t) =>
Await(h, hEval => Emit(hEval, eval(t))) }
val convert: Process[IO,Unit] =
lines("fahrenheit.txt") |>
filter(!_.startsWith("#")) |>
lift(fahrenheitToCelsius) to fileW("celsius.txt")
type Channel[F[_],I,O] = Process[F, I => F[O]]
trait Process[F[_],O] { ...
def to[O2](p2: Channel[F,O,O2]): Process[F,O2] =
eval { (this zipWith p2)((o,f) => f(o)) }
def zipWith[O,O2,O3](f: (O,O2) => O3): Tee[O,O2,O3] =
awaitL[O,O2,O3](o =>
awaitR (o2 => emitT(f(o,o2)))).repeat
// Request type with three requests over inputs `I` and `I2`.
case class Y[I,I2]() {
sealed trait f[X]
// `A` is an `f[I]`; `B` is an `f[I2]`.
case object A extends f[I]
case object B extends f[I2]
// `AB` is an `f[Either[I,I2]]`.
case object AB extends f[Either[I,I2]]
}
type Wye[I,I2,O] = Process[Y[I,I2]#f, O]
zipWith
, revisited
def zipWithY[O,O2,O3](f: (O,O2) => O3): Wye[O,O2,O3] =
awaitAB[O,O2,O3] {
case Right(o2) => awaitA(o => emitY(f(o,o2)))
case Left(o) => awaitB(o2 => emitY(f(o,o2)))
} repeat
trait Nondeterminism[F[_]] {
def choose[A,B](a: F[A], b: F[B]):
F[(A, F[B]) Either (F[A], B) ]
}
trait Nondeterminism {
def choose[A,B](a: Future[A], b: Future[B]):
Future[(A, Future[B]) Either (Future[A], B) ]
}
trait Process[F[_],O] {
def wye[O2,O3](
p2: Process[F,O2])(
y: Wye[O,O2,O3])(
implicit F: Nondeterminism[F]): Process[F,O3]
...
}
scalaz
/
#