Magic of integrations

ecosystem better than any framework

Wojtek Pituła @ Sony Electronics // @Krever01
w.pitula.me/presentations

We all love frameworks

Don't we?

frameworks are easy

To use.

frameworks are hard

To understand and compose.

Can we do better?

Libraries maybe?

Composing libraries into a stack is hard

Most of the time.

But does it have to be?

Let's create an APP

For taking notes.

We'll start with a note


        case class Note(title: String, text: String)
        

Note can be saved


        def save(note: Note): IO[Unit]
        

Notes can be retrieved


        def getAll(): IO[List[Note]]
        

How to expose it?

REST, obviously.

Like this


                    createNote
                    
~>

                    POST /notes
                    

                        getAllNotes
                    
~>

                        GET /notes
                    

                        Note
                    
~>

                        { "title": "my title", "text": "my text"}
                    

First hard choice

HTTP server.

Http Server

Options

  • play
  • finch
  • http4s
  • akka-http
  • cask
  • spray
Possible stacks: 6

Meet Http4s

Http is functional


        HttpRequest => HttpResponse
        

But it's also effectful


                    HttpRequest[F] => F[Option[HttpResponse[F]]]
                    

And so it is in http4s


                    type HttpRoutes[F[_]] = HttpRequest[F] => F[Option[HttpResponse[F]]]
                    
Or to be exact

                        type HttpRoutes[F[_]] = Kleisli[OptionT[F, ?], Request[F], Response[F]]
                        

Back to our API


        trait NotesController[F[_]] {
          def saveNote: HttpRoutes[F]
          def getNotes: HttpRoutes[F]
        }
        

Back to our API

A basic implementation

Effect-polymorphic via cats.effect.Sync!

                    class DummyNotesController[F[_]: Sync] extends NotesController[F] {
                      val dsl = Http4sDsl[F]
                      import dsl._
                      override def saveNote = HttpRoutes.of[F] {
                        case POST -> Root / "notes" => Ok("Hello")
                      }
                      override def getNotes = HttpRoutes.of[F] {
                        case GET -> Root / "notes" => Ok("World")
                      }
                    }
                    

Back to our API

And that's how we use it


          def startApp[F[_]: ConcurrentEffect: Timer]
            (controller: NotesController[F]): F[ExitCode] = {
            val services      = controller.getNotes <+> controller.saveNote
            val httpApp       = Router("/" -> services).orNotFound
            val serverBuilder = BlazeServerBuilder[F]
                .bindHttp(8080, "localhost")
                .withHttpApp(httpApp)
            serverBuilder.serve.compile.drain
              .as(ExitCode.Success)
          }
        
Composed via cats.SemigroupK!
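
What "composed" means here can be sketched without http4s at all. The block below is a stdlib-only model, assuming a route is just a function that may decline a request; `Request`, `Response`, `Routes`, `combine` and `orNotFound` are hypothetical names for illustration, not the http4s types.

```scala
// Stdlib-only model of route composition (hypothetical names, not http4s).
object RoutesSketch {
  final case class Request(method: String, path: String)
  final case class Response(status: Int, body: String)

  // A route may decline a request by returning None.
  type Routes = Request => Option[Response]

  val getNotes: Routes = {
    case Request("GET", "/notes") => Some(Response(200, "World"))
    case _                        => None
  }

  val saveNote: Routes = {
    case Request("POST", "/notes") => Some(Response(200, "Hello"))
    case _                         => None
  }

  // Try the first route; fall back to the second if it declines.
  def combine(a: Routes, b: Routes): Routes =
    req => a(req).orElse(b(req))

  // Turn "no route matched" into a 404 response.
  def orNotFound(routes: Routes): Request => Response =
    req => routes(req).getOrElse(Response(404, "Not found"))

  val app: Request => Response = orNotFound(combine(getNotes, saveNote))
}
```

The real `<+>` does the same "first match wins" fallback, only inside the effect type.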

But where are our jsons?

The second hard choice.

Json (De) Serialization

Options

  • circe
  • argonaut
  • spray-json
  • upickle
  • json4s
  • play-json
  • jsoniter-scala
  • jackson
Possible stacks: 6 x 8 = 48

Meet Circe

Circe is pure

No exceptions, no reflection

                        import io.circe._
                        val json: Either[ParsingFailure, Json]  = parser.parse("{}")
                        val data: Either[io.circe.Error, Note]  = json.flatMap(_.as[Note])
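
"No exceptions" means failures are ordinary values. A minimal stdlib-only sketch of that idea, assuming the input is already a flat key/value map; `field` and `decodeNote` are hypothetical helpers, not circe API.

```scala
// Failures as values: decoding returns Either instead of throwing.
object EitherSketch {
  final case class Note(title: String, text: String)

  // Look up a required field, reporting a missing one as a Left.
  def field(data: Map[String, String], name: String): Either[String, String] =
    data.get(name).toRight(s"missing field: $name")

  // Stops at the first missing field and returns its error message.
  def decodeNote(data: Map[String, String]): Either[String, Note] =
    for {
      title <- field(data, "title")
      text  <- field(data, "text")
    } yield Note(title, text)
}
```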
                    

Circe is easy


                        import io.circe.generic.JsonCodec

                        @JsonCodec
                        case class Note(title: String, text: String)
                    

Does it integrate?

Sure it does!

Integration 1

http4s-circe


                      import org.http4s.circe.CirceEntityDecoder._
                      import org.http4s.circe.CirceEntityEncoder._

                      override def saveNote: HttpRoutes[F] = HttpRoutes.of[F] {
                        case req @ POST -> Root / "notes" =>
                          for {
                            note <- req.as[Note]
                            resp <- Ok(s"Saved")
                          } yield resp
                      }

                      override def getNotes: HttpRoutes[F] = HttpRoutes.of[F] {
                        case GET -> Root / "notes" =>
                          for {
                            resp  <- Ok(List(Note("title", "text")))
                          } yield resp
                      }
                    

It's time for persistence

And the third hard choice.

Database Persistence

Options

  • doobie
  • quill
  • slick
  • scalikejdbc
  • anorm
  • skunk
Possible stacks: 6 x 8 x 6 = 288

Meet doobie & quill

But before we go...

                        trait NotesRepository[F[_]] {

                          def initialize(): F[Unit]

                          def save(note: Note): F[Unit]

                          def getAll(): F[List[Note]]

                        }
                    
Doobie is a real dope
Effect-polymorphic via cats.effect.Bracket!

                        class DoobieNotesRepository[F[_]: Bracket[?[_], Throwable]](xa: Transactor[F]) extends NotesRepository[F] {
                          import doobie.implicits._
                          import cats.syntax.all._

                          def initialize(): F[Unit] = {
                            val q = sql"create table if not exists notes_v1 ( title VARCHAR, text VARCHAR)".update
                            q.run.transact(xa).as(())
                          }

                          override def save(note: Note): F[Unit] = {
                            val q = sql"insert into notes_v1 (title, text) values (${note.title}, ${note.text})".update
                            q.run.transact(xa).as(())
                          }

                          override def getAll(): F[List[Note]] = {
                            val q = sql"select title, text from notes_v1".query[Note]
                            q.to[List].transact(xa)
                          }
                        }
                    

SQL is great

But often it's not

quill

To the rescue

Quill generates sql

At compile time

It really does


                        import io.getquill._

                        val ctx = new SqlMirrorContext(MirrorSqlDialect, Literal)
                        import ctx._
                    

                        case class Point(x: Int, y: Int)
                    

                        val q = quote {
                          query[Point].filter(p => p.x > 10)
                        }
                        ctx.run(q)
                        // select x, y from point where x > 10
                    
No more running your app to check what query gets executed

Does it mean saying no to doobie?

Not at all

Integration 2

doobie-quill


                        class QuillNotesRepository[F[_]: Bracket[?[_], Throwable]]
                            (xa: Transactor[F], ctx: DoobieContext.H2[SnakeCase.type])
                            extends NotesRepository[F] {
                          import ctx._
                          private implicit val notesSchemaMeta = schemaMeta[Note]("notes_v1")

                          def initialize(): F[Unit] = {
                            val q = sql"create table if not exists notes_v1 ( title VARCHAR, text VARCHAR)".update
                            q.run.transact(xa).as(())
                          }

                          override def save(note: Note): F[Unit] = {
                            val q = quote { query[Note].insert(lift(note)) }
                            run(q).transact(xa).as(())
                          }

                          override def getAll(): F[List[Note]] = {
                            val q = quote { query[Note] }
                            run(q).transact(xa)
                          }
                        }
                    

Bonus

doobie + postgres + circe =


                      import doobie.postgres.circe.json.implicits._

                      sql"select '{}' :: json".query[Json]

                      sql"select ${Json.obj()} :: json".query[Json]
                    

But where is the Transactor?


                          class DoobieNotesRepository[...](xa: Transactor[F])
                    

                          class QuillNotesRepository[...](xa: Transactor[F], ctx: DoobieContext.H2[SnakeCase.type])
                    

                          Transactor.fromDriverManager[IO](
                            driver = "org.h2.Driver",
                            url    = "jdbc:h2:mem:MyDatabase;DB_CLOSE_DELAY=-1",
                          )
                    

Not good enough!

We need config!

Configuration

Options

  • typesafe-config
  • pureconfig
  • ciris
  • clear-config
  • sconfig
Possible stacks: 6 x 8 x 6 x 5 = 1440

Meet ciris

Ciris 101

Extract


                        val port: ConfigValue[Int] =
                            env("MAGIC_API_PORT")
                              .or(prop("magic.api.port"))
                              .as[Int]
                              .default(8080)
                    

Ciris 101

Join


                        case class Api(address: String, port: Int)

                        val address: ConfigValue[String] = ???

                        val apiConfig: ConfigValue[Api] =
                            (address, port).parMapN(Api)
                    
Composed via cats.Parallel!

Ciris 101

Load


                        val config: IO[Api] = apiConfig.load[IO]
                    
Effect-polymorphic via cats.effect.Async!
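
The extract / join / load steps can be sketched with the standard library only. This is a simplified model, not ciris: the source is a plain `Map`, `lookup`, `parsePort` and `loadApi` are hypothetical helpers, and the address keys `MAGIC_API_ADDRESS` / `magic.api.address` are assumed for illustration.

```scala
import scala.util.Try

// Simplified config loading: first key, then fallback key, then default.
object ConfigSketch {
  final case class Api(address: String, port: Int)

  // Return the value of the first key that is present, else the default.
  def lookup(source: Map[String, String], keys: List[String], default: String): String =
    keys.flatMap(k => source.get(k)).headOption.getOrElse(default)

  // Decoding can fail, so it returns Either rather than throwing.
  def parsePort(raw: String): Either[String, Int] =
    Try(raw.toInt).toOption.toRight(s"not a number: $raw")

  def loadApi(source: Map[String, String]): Either[String, Api] = {
    val address = lookup(source, List("MAGIC_API_ADDRESS", "magic.api.address"), "localhost")
    val rawPort = lookup(source, List("MAGIC_API_PORT", "magic.api.port"), "8080")
    parsePort(rawPort).map(port => Api(address, port))
  }
}
```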

Do you remember this?


                        def getNotes(): F[List[Note]]
                    

All data in memory... isn't it a little bit stupid?

It is

Streaming steps in

And brings another choice to make

Stream processing

Options

  • monix-observable
  • akka-streams
  • fs2
  • zio-streams
Possible stacks:
6 x 8 x 6 x 5 x 4 = 5760

Meet fs2

Fs2 is magic of its own

Powerful, pure and functional

But don't be afraid

It's like a collection, but deferred and effectful

                      val wordCountS: Stream[IO, Int] =
                        Stream.emits(urls)
                            .evalMap(client.fetch)
                            .flatMap(_.body)
                            .through(fs2.text.utf8Decode)
                            .flatMap(body => Stream.emits(body.split(" ")))
                            .foldMap(_ => 1)
                      // nothing got executed

                      val wordCountIO: IO[Int] = wordCountS.compile.lastOrError
                      // nothing got executed still

                      wordCountIO.unsafeRunSync()
                      // now it got executed
                    
And effect-polymorphic!
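
The "nothing got executed" part is the key idea, and it can be shown without fs2. A minimal sketch, assuming a hand-rolled `Task` wrapper (hypothetical, much simpler than `IO` or `Stream`): building and transforming the description runs nothing, only `unsafeRun()` does.

```scala
// Description vs. execution: a tiny deferred computation.
object DeferredSketch {
  // Wraps a thunk; nothing runs until unsafeRun() is called.
  final class Task[A](thunk: () => A) {
    def map[B](f: A => B): Task[B] = new Task(() => f(thunk()))
    def unsafeRun(): A = thunk()
  }

  // Counts how often the underlying effect actually ran.
  var executions = 0

  val words: Task[List[String]] = new Task(() => {
    executions += 1
    "it is like a collection".split(" ").toList
  })

  // Still only a description: the counter stays at zero.
  val wordCount: Task[Int] = words.map(_.size)
}
```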

And bracketing

Because resource-safety!

                      fs2.Stream[F, InputStream]
                      fs2.Stream[F, ConnectionPool]
                      fs2.Stream[F, ThreadPool]
                    


                      val resource: Resource[F, ConnectionPool] =
                        connectionPoolStream
                          .compile
                          .resource
                          .lastOrError
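
Resource safety boils down to "release always runs". A stdlib-only sketch of that guarantee, assuming a synchronous setting; `bracket` and `demo` here are hypothetical helpers, not the cats-effect or fs2 API.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

object BracketSketch {
  // Acquire a resource, use it, and always release it, even if `use` throws.
  def bracket[R, A](acquire: => R)(use: R => A)(release: R => Unit): A = {
    val resource = acquire
    try use(resource)
    finally release(resource)
  }

  // Records the order of events when `use` fails.
  def demo(): (List[String], Boolean) = {
    val log = ListBuffer.empty[String]
    val result = Try {
      bracket[String, Int] {
        log += "acquire"
        "pool"
      } { _ =>
        log += "use"
        throw new IllegalStateException("boom")
      } { _ =>
        log += "release"
        ()
      }
    }
    (log.toList, result.isFailure)
  }
}
```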
                    

Integration 3

Doobie's got your back


                        trait StreamingNotesRepository[F[_]] {
                          def getAllStream(): fs2.Stream[F, Note]
                        }
                    

                        import ctx._
                        def getAllStream(): fs2.Stream[F, Note] = {
                          val q = quote { query[Note] }
                          // run(q).transact(xa)
                          stream(q).transact(xa)
                        }
                    

But what about http layer?

Don't worry son...

Integration 4

Http4s' got your back too


                      override def getNotes: HttpRoutes[F] = HttpRoutes.of[F] {
                        case GET -> Root / "notes" =>
                          val notes: fs2.Stream[F, Note] = repository.getAllStream()
                          Ok(notes)
                      }
                    

Now slow down

And write a client

Console
client App

Options

  • decline
  • scopt
  • case-app
  • scallop
Possible stacks:
6 x 8 x 6 x 5 x 4 x 4 = 23040

Meet decline

Decline

Functional
command line
parser


                        magic get-notes --url localhost --port 8080
                        magic add-note --title my --text note
                        magic --help
                    

How to use it?

Step 0: Define command datatypes (optional)

                        sealed trait MyCommand

                        object MyCommand {
                          case class GetNotes(url: String, port: Int)            extends MyCommand
                          case class AddNote(url: String, port: Int, note: Note) extends MyCommand
                        }
                    

How to use it?

Step 1: Define an Opt

                        val urlOpt: Opts[String] = Opts
                          .option[String]("url", help = "Server url")
                          .orElse(Opts.env[String]("MAGIC_URL", "Server url"))
                          .withDefault("localhost")
                    

How to use it?

Step 2: Join Opts together

                        val urlOpt: Opts[String] = ???
                        val portOpt: Opts[Int] = ???
                        val noteOpt: Opts[Note] = {
                          val titleOpt: Opts[String] = ???
                          val textOpt: Opts[String] = ???
                          (titleOpt, textOpt).mapN(Note)
                        }
                        val addNoteCmdOpt: Opts[MyCommand] = Opts
                            .subcommand[MyCommand]("add-note", "Add note")(
                                (urlOpt, portOpt, noteOpt).mapN(MyCommand.AddNote)
                            )
                        val getNotesCmdOpt: Opts[MyCommand] = ???

                        getNotesCmdOpt orElse addNoteCmdOpt
                    
Composed via cats.Semigroupal!
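
Joining independent options is the whole trick behind `mapN`. A stdlib-only sketch, assuming arguments are already parsed into a `Map`; `Opt`, `option` and `map2` are hypothetical stand-ins, not decline's API.

```scala
// Combining independent option parsers, in the spirit of mapN.
object OptsSketch {
  final case class Note(title: String, text: String)

  type Args   = Map[String, String]
  type Opt[A] = Args => Either[String, A]

  // A required named option.
  def option(name: String): Opt[String] =
    args => args.get(name).toRight(s"Missing expected flag --$name")

  // Run both parsers on the same arguments and merge their results.
  def map2[A, B, C](a: Opt[A], b: Opt[B])(f: (A, B) => C): Opt[C] =
    args =>
      for {
        x <- a(args)
        y <- b(args)
      } yield f(x, y)

  val noteOpt: Opt[Note] = map2(option("title"), option("text"))(Note.apply)
}
```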

How to use it?

Step 3: Use as entrypoint

                                  val command = Command(
                                        name = "magic",
                                        header = "Notes manager"
                                    )(Command.opts)
                                


                                  def main(args: List[String]): Unit = {
                                    command.parse(args, sys.env) match {
                                      case Left(help)  =>
                                        System.err.println(help)
                                      case Right(args) =>
                                        doSth(args)
                                    }
                                  }
                                

                                  object Main extends CommandApp(
                                    name   = "magic",
                                    header = "Notes manager",
                                    main = Command.opts.map(doSth),
                                  )
                                

                                    object Main extends CommandIOApp(
                                        name   = "magic",
                                        header = "Notes manager",
                                    ) {
                                        override def main:
                                                Opts[IO[ExitCode]] =
                                            Command.opts.map(doSth)
                                    }
                                
Cats-effect compatible!

Decline is helpful


                        Usage:
                            magic get-notes
                            magic add-note

                        Notes manager

                        Options and flags:
                            --help
                                Display this help text.

                        Subcommands:
                            get-notes
                                Get notes
                            add-note
                                Add note
                    

Interface is prepared

Time to consume the API

Http client

Options

  • http4s
  • play
  • sttp
  • akka-http
  • scalaj-http
Possible stacks:
6 x 8 x 6 x 5 x 4 x 4 x 5 = 115200
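
The running total is just the product of the option counts listed on each "Options" slide so far. A quick check (the category labels are shorthand chosen here):

```scala
// Multiply the number of options per category, in order of appearance.
object StackCount {
  val options: List[(String, Int)] = List(
    "http server"   -> 6,
    "json"          -> 8,
    "database"      -> 6,
    "configuration" -> 5,
    "streaming"     -> 4,
    "console app"   -> 4,
    "http client"   -> 5
  )

  val possibleStacks: Int = options.map(_._2).product // 115200
}
```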

Meet sttp

Api strikes back


                      trait ApiClient[F[_]] {
                        def addNote(note: Note): F[Unit]
                        def getNotes(): F[List[Note]]
                      }
                    

Sttp is generic


                        trait SttpBackend[R[_], -S] {
                          def send[T](request: Request[T, S]): R[Response[T]]
                        }
                    

Sttp is pleasant to use


                      override def addNote(note: Note): F[Unit] = {
                        import com.softwaremill.sttp.circe._
                        sttp
                          .post(uri"$url/notes")
                          .body(note)
                          .send()
                          .map(_.body.leftMap(new RuntimeException(_)))
                          .rethrow
                          .as(())
                      }
                    
Integrates with circe!

Integration 5 & 6

Sttp + cats-effect + fs2 =


                        implicit val backend: SttpBackend[IO, fs2.Stream[IO, ByteBuffer]] =
                            AsyncHttpClientFs2Backend[IO]()
                    
Effect-polymorphic!

How about streaming?

Yet again

Integration 7

Sttp + fs2 + circe =


                        override def getNotesStream(): fs2.Stream[F, Note] = {
                            val response: F[fs2.Stream[F, ByteBuffer]] = sttp
                              .get(uri"$url/notes")
                              .response(asStream[fs2.Stream[F, ByteBuffer]])
                              .send()
                            ...
                          }
                    

                        override def getNotesStream(): fs2.Stream[F, Note] = {
                            ...
                            fs2.Stream
                              .eval(response)
                              .flatten
                              .flatMap(b => fs2.Stream.emits(b.array()))
                              .through(io.circe.fs2.byteStreamParser[F])
                              .map(_.as[Note])
                              .rethrow
                          }
                    

You don't like FP?

No worries, you can still use sttp

                        implicit val backend: SttpBackend[Id, Nothing] =
                            HttpURLConnectionBackend()

                        val response: Response[String] = request.send()
                    

No F[_]!

What else?

115200 different stacks and we're not even finished

Single API definition:
Endpoints/tapir

Share your endpoints (URL, method, body format) between client and server, and generate docs
Works with http4s, sttp and more

Text parsing: atto


                        val ip: Parser[IP] =
                          for {
                            a <- ubyte
                            _ <- char('.')
                            b <- ubyte
                            _ <- char('.')
                            c <- ubyte
                            _ <- char('.')
                            d <- ubyte
                          } yield IP(a, b, c, d)
                    
Parser is a (cats) monad!
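
The same for-comprehension shape works with any monad. A stdlib-only sketch using `Option`, assuming the input is split on dots up front (so this is not an incremental parser like atto's); `ubyte` and `parseIp` are hypothetical helpers.

```scala
// Monadic sequencing with Option: any failing segment fails the whole parse.
object IpSketch {
  final case class IP(a: Int, b: Int, c: Int, d: Int)

  // Parse one segment as an unsigned byte (0 to 255).
  def ubyte(s: String): Option[Int] =
    if (s.nonEmpty && s.length <= 3 && s.forall(c => c >= '0' && c <= '9'))
      Some(s.toInt).filter(_ <= 255)
    else None

  def parseIp(input: String): Option[IP] =
    input.split('.') match {
      case Array(a, b, c, d) =>
        for {
          w <- ubyte(a)
          x <- ubyte(b)
          y <- ubyte(c)
          z <- ubyte(d)
        } yield IP(w, x, y, z)
      case _ => None
    }
}
```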

Text parsing: atto-fs2


                        myCharsStream.through(atto.fs2.Pipes.parseN(myParser))
                    
Glory to incremental parsing!

Binary codecs: scodec


                            case class Point(x: Int, y: Int, z: Int)

                            val pointCodec = (int8 :: int8 :: int8).as[Point]

                            pointCodec.encode(Point(-5, 10, 1))
                            // Successful(BitVector(24 bits, 0xfb0a01))

                            pointCodec.decode(hex"0xfb0a01".bits)
                            // Successful(DecodeResult(Point(-5, 10, 1), BitVector(empty)))
                    
Works with fs2 and more!
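
Where does `0xfb0a01` come from? Each coordinate becomes one signed byte. A stdlib-only sketch of that layout, assuming 8-bit two's complement per field; `encode`, `decode` and `hex` are hypothetical helpers, not scodec API.

```scala
// One signed byte per coordinate, mirroring int8 :: int8 :: int8.
object CodecSketch {
  final case class Point(x: Int, y: Int, z: Int)

  // Values outside -128..127 do not fit in 8 bits and are rejected.
  def encode(p: Point): Either[String, Vector[Byte]] = {
    val fields = Vector(p.x, p.y, p.z)
    if (fields.forall(v => v >= -128 && v <= 127)) Right(fields.map(_.toByte))
    else Left("coordinate does not fit in 8 bits")
  }

  def decode(bytes: Vector[Byte]): Either[String, Point] = bytes match {
    case Vector(x, y, z) => Right(Point(x.toInt, y.toInt, z.toInt))
    case _               => Left(s"expected 3 bytes, got ${bytes.size}")
  }

  // Render bytes as lowercase hex, two digits per byte.
  def hex(bytes: Vector[Byte]): String =
    bytes.map(b => "%02x".format(b & 0xff)).mkString
}
```

Here -5 is `fb`, 10 is `0a` and 1 is `01`, which matches the bit vector on the slide.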

Kafka: fs2-kafka


                            val stream =
                              consumerStream[IO]
                                .using(consumerSettings)
                                .evalTap(_.subscribeTo("topic"))
                                .flatMap(_.stream)
                                .mapAsync(25) { committable =>
                                  processRecord(committable.record)
                                    .map { case (key, value) =>
                                      val record = ProducerRecord("topic", key, value)
                                      ProducerRecords.one(record, committable.offset)
                                    }
                                }
                                .through(produce(producerSettings))
                                .map(_.passthrough)
                                .through(commitBatchWithin(500, 15.seconds))
                    
Built on top of fs2 and cats-effect

gRPC: fs2-grpc


                        val helloService: ServerServiceDefinition = MyFs2Grpc.bindService(new MyImpl())
                        ServerBuilder
                          .forPort(9999)
                          .addService(helloService)
                          .addService(ProtoReflectionService.newInstance())
                          .stream[IO] // or for any F: Sync
                          .evalMap(server => IO(server.start())) // start server
                          .evalMap(_ => IO.never) // server now running
                    
Built on top of fs2 and cats-effect

Caching: scalacache


                        import scalacache.modes.try_._
                        implicit val catsCache: Cache[Cat] = MemcachedCache("localhost:11211")

                        def getCat(id: Int): Try[Cat] = memoize(Some(10.seconds)) {
                          // Retrieve data from a remote API here ...
                          Cat(id, s"cat ${id}", "black")
                        }
                    
Works with cats-effect, Future, Try, and Id!

More typesafety: refined


                    val x: Int Refined Positive = -1 // compilation error
                

Integrates with:
  • doobie
  • circe
  • cats
  • decline

  • scodec
  • atto

and MUCH more

Enums: enumeratum


                                sealed trait Greeting extends EnumEntry
                                object Greeting extends Enum[Greeting] {
                                  case object Hello   extends Greeting
                                  case object GoodBye extends Greeting
                                  case object Hi      extends Greeting
                                  case object Bye     extends Greeting

                                  val values = findValues
                                }
                            
Integrates with:
  • doobie
  • quill
  • circe

  • decline
  • scodec
  • atto

and MUCH more

Purely impressive

And there is even more!

  • monocle
  • finagle
  • cats-retry
  • console4cats
  • log4cats
  • fuuid
and MORE

How is this possible?

How is this possible?

Pillar one: typeclasses

Because
INTEGRATION of libs A & B in lib C

How is this possible?

Pillar two: cats

Because
COMPOSITION

How is this possible?

Pillar three: cats-effect

Because
SIDE EFFECTS

How is this possible?

Pillar four: fs2

Because
(side-effectful)
STREAMS

How is this possible?

Pillar zero: People

Their time and effort

Why is this important?

Why is this important?

Reuse of code

Why is this important?

Reuse of knowledge

Reuse of knowledge


Composition & transformation:
  • http4s HttpRoutes & EntityDecoder
  • circe Encoder & Decoder
  • doobie Read & Write
  • ciris ConfigValue & ConfigDecoder
  • decline Opts & Argument
  • atto Parser

Why is this important?

Similar programming model

Similar programming model


  • No side effects
  • Effect-polymorphic
  • Lawful composition
  • Safe resource acquisition

Other ecosystems

Akka | ZIO | Li Haoyi

Thanks!

Wojtek Pituła - @Krever01

w.pitula.me/presentations
You can leave feedback there ^