I/O as a type

... and how to do asynchronous I/O without callbacks

Paul Chiusano, @pchiusano

An ordinary imperative program...

// Pure conversion from degrees Fahrenheit to degrees Celsius:
// performs no I/O, so the same input always gives the same output.
def fahrenheitToCelsius(f: Double): Double = {
  val aboveFreezing = f - 32 // offset from 32, the Fahrenheit freezing point
  aboveFreezing * 5.0 / 9.0
}

// Ordinary code with side effects: prints a prompt, blocks reading one line
// from standard input, then prints the converted temperature.
// `toDouble` throws NumberFormatException if the line is not a valid number.
def converter: Unit = {
  println("Enter a temperature in degrees fahrenheit: ")
  // `Predef.readLine` is deprecated since Scala 2.11 and removed in 2.13;
  // `scala.io.StdIn.readLine()` is the supported equivalent.
  val d = scala.io.StdIn.readLine().toDouble
  println(fahrenheitToCelsius(d))
}
> Enter a temperature in degrees fahrenheit: 80
26.6667

Types aren't very descriptive...

Modeling external effects

// Interface of a value of type `IO[A]`: only the two combinators are given
// here; nothing in this trait says how such a value is executed.
trait IO[+A] {
  // From this `IO[A]` and a function `A => B`, obtain an `IO[B]`.
  def map[B](f: A => B): IO[B]
  // From this `IO[A]` and a function producing the next `IO[B]`, obtain an `IO[B]`.
  def flatMap[B](f: A => IO[B]): IO[B]
}

// Console primitives as `IO` values; implementations are deliberately left
// unspecified (`???`) at this point.
val ReadLine: IO[String] = ???
def PrintLine(line: String): IO[Unit] = ???

// The temperature converter expressed as a single `IO[Unit]` value assembled
// from `PrintLine` and `ReadLine` with `flatMap`/`map` (via the for-comprehension).
def converter: IO[Unit] = {
  val prompt = "Enter a temperature in degrees fahrenheit: "
  for {
    _          <- PrintLine(prompt)
    fahrenheit <- ReadLine.map(_.toDouble)
    _          <- PrintLine(fahrenheitToCelsius(fahrenheit).toString)
  } yield ()
}

Modeling external effects

// An `IO[A]` wraps a computation that is performed each time `run` is called.
trait IO[+A] { outer => // alias for the enclosing instance, used by the anonymous subclasses below
  // Performs the computation and returns its result.
  def run: A

  // Builds a new action; nothing is executed until the result's `run` is called.
  // Running it runs this action, feeds the result to `f`, then runs what `f` returned.
  def flatMap[B](f: A => IO[B]): IO[B] = new IO[B] {
    def run: B = {
      val next = f(outer.run)
      next.run
    }
  }

  // Builds a new action whose `run` applies `f` to this action's result.
  def map[B](f: A => B): IO[B] = new IO[B] {
    def run: B = f(outer.run)
  }
}
object IO {
  // Suspends `a`: the by-name argument is evaluated on each call to `run`,
  // not when the `IO` is constructed.
  def apply[A](a: => A): IO[A] = new IO[A] { def run = a }
  // Reads one line from standard input when run.
  // `Predef.readLine` is deprecated since Scala 2.11 and removed in 2.13;
  // `scala.io.StdIn.readLine()` is the supported equivalent.
  val ReadLine: IO[String] = IO { scala.io.StdIn.readLine() }
  // Prints `line` followed by a newline when run.
  def PrintLine(line: String): IO[Unit] = IO { println(line) }
}

Benefit 1

Benefit 2

Benefit 2 (cont)

// `IO` object declared as a `Monad[IO]`; its body is elided here.
object IO extends Monad[IO] { ... }

// Derived actions built only from existing combinators.
// Reads a line and prints it back. The printing action is `PrintLine`;
// no `PrintLn` is defined.
val echo: IO[Unit] = ReadLine.flatMap(PrintLine)
// Reads a line and parses it as an Int (`toInt` throws on non-numeric input).
val readInt: IO[Int] = ReadLine.map(_.toInt)
// `replicateM_` / `replicateM` come from `Monad`, which is not shown here;
// presumably they repeat the action the given number of times, discarding
// or collecting the results respectively.
val prompts: IO[Unit] = converter.replicateM_(5)
val lines: IO[List[String]] = ReadLine.replicateM(10)

Benefit 2 (cont)

// The imperative converter wrapped as one `IO` value: the whole block is
// passed to `IO { ... }` rather than being composed from smaller actions.
def converter: IO[Unit] = IO {
  println("Enter a temperature in degrees fahrenheit: ")
  // `Predef.readLine` is deprecated since Scala 2.11 and removed in 2.13;
  // `scala.io.StdIn.readLine()` is the supported equivalent.
  val d = scala.io.StdIn.readLine().toDouble
  println(fahrenheitToCelsius(d))
}

Benefit 3

IO, revisited

// Recap of the `run`-based encoding: an `IO[A]` exposes `run`, which yields
// the `A`. The remaining members are elided.
trait IO[+A] { 
  def run: A 
  ... 
}

// The temperature converter as an `IO[Unit]` value: prompt, read and parse a
// line, print the converted result.
def converter: IO[Unit] = {
  val prompt = "Enter a temperature in degrees fahrenheit: "
  for {
    _          <- PrintLine(prompt)
    fahrenheit <- ReadLine.map(_.toDouble)
    _          <- PrintLine(fahrenheitToCelsius(fahrenheit).toString)
  } yield ()
}

IO, revisited (cont)

// An `IO[F, A]` is a data structure: either a finished result or a pending
// request expressed in the request language `F`.
trait IO[F[_], +A]

// A finished program holding its result `get`.
case class Pure[F[_], +A](get: A) extends IO[F, A]

// A request `expr` whose response has type `I`, paired with the continuation
// `receive` that maps the response to the rest of the program.
case class Request[F[_], I, +A](expr: F[I], receive: I => IO[F, A])
  extends IO[F, A]

An example F: console access

// Request language for console access; the type parameter is the type of
// the response to the request.
trait Console[A]
// Request to print `s`; its response is `Unit`.
case class PrintLine(s: String) extends Console[Unit]
// Request to read a line; its response is an `Option[String]`.
case object ReadLine extends Console[Option[String]]

// The converter against the `Console` request language. `ReadLine` responds
// with an `Option`, so the result is printed only when a line was received.
// NOTE(review): `when` and the lifting of `Console` requests into `IO` are
// defined elsewhere and not visible here.
def converter: IO[Console,Unit] = {
  val prompt = "Enter a temperature in degrees fahrenheit: "
  for {
    _     <- PrintLine(prompt)
    input <- ReadLine.map(line => line.map(_.toDouble))
    _     <- when(input.isDefined) { PrintLine(fahrenheitToCelsius(input.get).toString) }
  } yield ()
}

Running IO actions

// Interpreter for requests in `F`: given a request, returns its response
// together with the `Run[F]` to use for the following request.
trait Run[F[_]] { 
  def apply[A](expr: F[A]): (A, Run[F]) 
}

object IO {
  // Interprets `io` with `R` until a `Pure` result is reached.
  // Each `Request` is handed to the current interpreter; its response feeds
  // the continuation, and the interpreter it returns handles the next step.
  @annotation.tailrec
  def run[F[_],A](R: Run[F])(io: IO[F,A]): A = io match {
    case Request(request, continue) =>
      R(request) match {
        case (response, next) => run(next)(continue(response))
      }
    case Pure(result) => result
  }
}

Running console actions

// Mock interpreter for `Console`: each `ReadLine` is answered with the next
// element of `lines`, and with `None` once they are exhausted.
// The result type is explicit because the method calls itself.
def RunConsoleMock(lines: List[String]): Run[Console] = new Run[Console] {
  def apply[A](c: Console[A]) = c match {
    case ReadLine => lines match {
      case Nil          => (None, this)
      case next :: rest => (Some(next), RunConsoleMock(rest))
    }
    // Output is ignored. Continue with this same interpreter: a bare
    // `RunConsoleMock` names the method, not a `Run[Console]` value.
    case PrintLine(_) => ((), this)
  }
}

Can we run asynchronously?

// An `IO[F, A]` is a data structure: either a finished result or a pending
// request expressed in the request language `F`.
trait IO[F[_], +A]

// A finished program holding its result `get`.
case class Pure[F[_], +A](get: A) extends IO[F, A]

// A request `expr` whose response has type `I`, paired with the continuation
// `receive` that maps the response to the rest of the program.
case class Request[F[_], I, +A](expr: F[I], receive: I => IO[F, A])
  extends IO[F, A]

object IO {
  // Synchronous interpreter: steps through `io` with `R` until a `Pure`
  // result is reached, using the interpreter returned by each step for the next.
  @annotation.tailrec
  def run[F[_],A](R: Run[F])(io: IO[F,A]): A = io match {
    case Request(request, continue) =>
      R(request) match {
        case (response, next) => run(next)(continue(response))
      }
    case Pure(result) => result
  }
}

Can we run asynchronously? Yes.

// Asynchronous interpreter for requests in `F`: the response and the
// interpreter for the next request are delivered inside a `Future`.
trait RunAsync[F[_]] {
  def apply[A](a: F[A]): Future[(A, RunAsync[F])]
}
object IO {
  // Asynchronous interpreter: same traversal as the synchronous `run`, but
  // each step is chained with `Future.flatMap` instead of a direct call.
  // The recursive call sits inside the `flatMap` callback rather than in tail
  // position, hence no `@tailrec`. `flatMap` needs an implicit
  // `ExecutionContext` in scope.
  def run[F[_],A](R: RunAsync[F])(io: IO[F,A]): Future[A] = io match {
    // The value is already computed, so wrap it directly instead of
    // scheduling a task the way `Future(a)` would.
    case Pure(a) => Future.successful(a)
    case Request(expr,recv) =>
      R(expr) flatMap { case (e,r2) => run(r2)(recv(e)) }
  }
}

Can we run asynchronously? Yes. (cont)

// Simplified sketch of a callback-based file channel; other members are elided.
class AsynchronousFileChannel {
  ... 
  // Starts a read into `buf` and returns immediately; `callback` receives a
  // value of type `A` and an `Int`.
  // NOTE(review): the `Int` is presumably the number of bytes read — the
  // caller below names it `bytesRead`.
  def read[A](
    buf: ByteBuffer, 
    callback: (A, Int) => Unit): Unit // real API much more horrible
}

// Adapts the callback-based `AsynchronousFileChannel.read` to a `Future`:
// the callback completes a `Promise`, whose future is returned immediately.
def read(f: AsynchronousFileChannel, nBytes: Int): Future[Array[Byte]] = {
  val promise = concurrent.Promise[Array[Byte]]()
  val buf = ByteBuffer.allocate(nBytes)
  // The first callback argument is unused, so it is typed as `Any`
  // (no type `A` is in scope in this method).
  f.read[Any](buf, (_: Any, bytesRead: Int) => {
    // Complete the promise with only the bytes actually read; `take` yields
    // an empty array for a non-positive count.
    promise.success(buf.array().take(bytesRead))
    ()
  })
  promise.future
}

Another view on runAsync

IO[Console,A] => IO[Future,A]
// A translation from `F` to `G` that works for every result type `A`.
trait Trans[F[_], G[_]] {
  def apply[A](f: F[A]): G[A]
}

// Generic interpreter: translates every request of `io` from `F` into `G`
// with `T` and sequences the steps with `G`'s `flatMap`.
// (The original snippet ended with an extra, unmatched closing brace.)
def run[F[_],G[_],A](T: Trans[F,G])(G: Monad[G])(io: IO[F,A]): G[A] =
  io match {
    case Pure(a) => G.unit(a)
    case Request(expr,k) =>
      // Translate the request, then interpret the continuation's result in `G`.
      G.flatMap(T(expr))(e => run(T)(G)(k(e)))
  }

Ideas for I/O managers

General idea: retain enough metadata to do optimizations

Buffering + merging I/O operations

Reordering operations that commute to improve locality

Domain specific optimizations for particular IO[F,A]

And yet...

// The temperature converter as an `IO[Unit]` value: prompt, read and parse a
// line, print the converted result.
def converter: IO[Unit] = {
  val prompt = "Enter a temperature in degrees fahrenheit: "
  for {
    _          <- PrintLine(prompt)
    fahrenheit <- ReadLine.map(_.toDouble)
    _          <- PrintLine(fahrenheitToCelsius(fahrenheit).toString)
  } yield ()
}

More resources

Chapter 13 of FP in Scala discusses this content in more detail: manning.com/bjarnason

Upcoming talk at NE Scala Symposium discussing more composable APIs for I/O

Questions?

Appendix: stack usage

'Real' version requires trampolining

Appendix: other models of I/O

// Alternative encoding: an action is a function that takes a `RealWorld`
// value and returns its result paired with a `RealWorld` value.
type IO[A] = RealWorld => (A, RealWorld)

/

#