Service Composition

Service composition helps you create results by combining dependent information from underlying services. You can compose well or poorly, depending on how you manage the underlying services and calls to those services.

A few libraries have been created to help you compose calls to services and combine the results. Generally, the more recent service composition libraries are based on simple ideas such as managing the creation of the request separate from executing the requests. Of course, this is the basic idea behind the IO monad found in haskell and a variety of other languages that allow you to control evaluation. We are also concerned about how to catch and manage error information once errors do occur.

First we can address service composition using fetch. It is interesting to note that the scala async macro helps you compose Future's in simple scenarios that match what fetch does when you use Future's with fetch.

fetch

Fetch composes service calls and combines the results together. Here's the main documentation site. It handles batching, request combining and caching. While there may be better ways to handle caching, it allows you to address caching at the fetch library level versus an infrastructure level.

First, we need to enhance our example-server to serve up some data. You can easily create a contrived customer micro-service with akka-http:

~ pathPrefix("customers") {
        path(IntNumber) { n =>
          get {
            onSuccess(customers.find(n)) {
              case Some(customer) => complete(customer)
              case None => complete(StatusCodes.NotFound)
            }
          }
        } ~ path("posts-from" / IntNumber) { n =>
          get {
            onSuccess(msgs.getMsgsFrom(n)) { m =>
              complete(m)
            }
          }
        } ~ path("posts-to" / IntNumber) { n =>
          get {
            onSuccess(msgs.getMsgsTo(n)) { m =>
              complete(m)
            }
          }
        }
      }
    }

which is powered by some new modules:

package example

import scala.annotation.implicitNotFound
import scala.annotation.migration
import scala.concurrent.Future
import scala.language._

import spray.json.DefaultJsonProtocol

case class Customer(id: Int, name: String, address: String)

// I use sleep which probably does not do what I want
// exactly given how actors work but is good enough for this example.
trait CustomerRespository extends DefaultJsonProtocol {

  private val r = scala.util.Random

  private val customers = Map(
    1 -> Customer(1, "John", "123 Play St."),
    2 -> Customer(2, "Mary", "495 Norm St."),
    3 -> Customer(3, "Beth", "391 Happy St."))

  /** Get a customer, pretend it has latency by using a Future ... */
  def find(id: Int)(implicit ec: concurrent.ExecutionContext) = Future {
    Thread.sleep(r.nextInt(4000))
    customers.get(id)
  }

  /** Return all the IDs known in the customer database ... */
  def ids(implicit ec: concurrent.ExecutionContext) = Future {
    Thread.sleep(r.nextInt(2000))
    customers.keys.toList
  }

  implicit val customerFormat = jsonFormat3(Customer)
  implicit val customerListFormt = immSeqFormat[Customer]
}

case class Msg(from: Int, to: Int, content: String)

trait MsgRepository extends DefaultJsonProtocol {

  private val r = scala.util.Random

  private val msgs = collection.immutable.Seq(
    Msg(1, 3, "yt?"),
    Msg(1, 1, "hi me!"),
    Msg(2, 1, "can't talk now"),
    Msg(2, 3, "how's it going?"),
    Msg(2, 3, "let's do it!"))

  /** Return a list of posts mode by a customer. */
  def getMsgsFrom(id: Int)(implicit ec: concurrent.ExecutionContext) =
    Future {
      Thread.sleep(r.nextInt(3000))
      msgs.filter(_.from == id)
    }

  /** Return a list of posts from a customer. */
  def getMsgsTo(id: Int)(implicit ec: concurrent.ExecutionContext) = Future {
      Thread.sleep(r.nextInt(3000))
      msgs.filter(_.to == id)
    }

  implicit val msgsFormat = jsonFormat3(Msg)
  implicit val msgListFormat = immSeqFormat[Msg]
}

This allows us to call the service, say, using curl:

$ curl http://localhost:9000/customers/1
{
  "id": 1,
  "name": "John",
  "address": "123 Play St."
}
$ curl http://localhost:9000/customers/posts-from/1
[{
  "from": 1,
  "to": 3,
  "content": "yt?"
}, {
  "from": 1,
  "to": 1,
  "content": "hi me!"
}]
$ curl http://localhost:9000/customers/posts-to/1
[{
  "from": 1,
  "to": 1,
  "content": "hi me!"
}, {
  "from": 2,
  "to": 1,
  "content": "can't talk now"
  }]

and we add the modules to the main program file. Notice the use of spray json, which makes it easy to convert a string to a JSON object to a scala JVM object.

import spray.json._

object customers extends CustomerRespository
import customers._
object msgs extends MsgRepository
import msgs._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._

The last import brings in implicit converts that take the json reader/writers/formats and changes them into marshallers and unmarshallers suitable for use in akka routes. akka has a robust marshaller and unmarshaller framework.

Now we can use fetch to fetch results and combine them together. Since dispatch returns a scala Future, we need to use the Future monad composition in fetch. fetch's user documentation is here.

Since dispatch returns scala Futures, we need to hook this into fetch's asynchronous support:

object Example {
  type EntityId = Int

  case class Customer(id: EntityId, name: String, address: String)

  trait CustomerJsonProtocol extends DefaultJsonProtocol {
    implicit val customerFormat = jsonFormat3(Customer)
    implicit val customerListFormt = immSeqFormat[Customer]
  }

  case class Msg(from: EntityId, to: Int, content: String)

  trait MsgJsonProtocol extends DefaultJsonProtocol {
    implicit val msgsFormat = jsonFormat3(Msg)
    implicit val msgListFormat = immSeqFormat[Msg]
  }

  implicit object CustomerSource extends DataSource[EntityId, Customer] with CustomerJsonProtocol {
    override def fetchOne(id: EntityId): Query[Option[Customer]] = {
      Query.async((ok, err) => {
        val fut = Http(url(s"http://localhost:9000/customers/$id") OK as.String).
          map(body => spray.json.JsonParser(body).convertTo[Option[Customer]])
        import scala.util._
        fut.onComplete {
          case Success(c) => ok(c)
          case Failure(ex) => err(ex)
        }
      })
    }

    override def fetchMany(ids: NonEmptyList[EntityId]): Query[Map[EntityId, Customer]] =
      batchingNotSupported(ids)
  }

  // Or, Fetch(id)(CustomerSource) w/o implicit resolution
  def fetchCustomer(id: EntityId): Fetch[Customer] = Fetch(id)
}

This allows us to get a customer object using the Fetch API. Personally, I think the fetch API will evolve so as to not be so heavy on objects and traits, but that's another day.

We can now create a fetch object, re-use as much as we want, and get a customer object.

  val fetchCustomer1 = fetchCustomer(1)
    val result1 = Fetch.run[Future](fetchCustomer)
    result1 onComplete println

    println("Hit ENTER to end program...")
    scala.io.StdIn.readLine

Which prints out:

Fectching customers
Hit ENTER to end program...
Success(Customer(1,John,123 Play St.))

The result prints after the prompt because we artificially inserted a small delay into the server response on the server side.

Now we can compose our customer fetches together and use a slightly different syntax to run the Fetch object (e.g. .runA[Future] vs Fetch.run[Future]):

    import cats.syntax.cartesian._
    val result2 = (fetchCustomer(1) |@| fetchCustomer(2)).tupled.runA[Future]
    result2 onComplete println

to get

Success((Customer(1,John,123 Play St.),Customer(1,John,123 Play St.)))

So now we can sequence our requests, which we could have done with scala async of course, by using:

  val userAndMsgs = for {
      u <- fetchCustomer(2)
      from <- fetchMsgsFrom(u.id)
    } yield (u, from)
    val userMsgsFut = userAndMsgs.runA[Future]
    userMsgsFut onComplete println

which gives us:

Success((Customer(2,Mary,495 Norm St.),List(Msg(2,1,can't talk now), Msg(2,3,how's it going?), Msg(2,3,let's do it!))))

To retrieve a list back from the http server, we had to ensure that our result signature in the DataSource reflected a list of objects:

implicit object MsgToSource extends DataSource[EntityId, Seq[Msg]] with MsgJsonProtocol {
    override def fetchOne(id: EntityId): Query[Option[Seq[Msg]]] = {
      Query.async((ok, err) => {
        val fut = Http(url(s"http://localhost:9000/customers/posts-from/$id") OK as.String).
          map(body => spray.json.JsonParser(body).convertTo[Option[Seq[Msg]]])
        import scala.util._
        fut.onComplete {
          case Success(m) => ok(m)
          case Failure(ex) => err(ex)
        }
      })
    }

    override def fetchMany(ids: NonEmptyList[EntityId]): Query[Map[EntityId, Seq[Msg]]] =
      batchingNotSupported(ids)
  }
  def fetchMsgsFrom(id: EntityId): Fetch[Seq[Msg]] = Fetch(id)
}

The key is to ensure that you can marshal the result from dispatch to the proper case class type.

Last updated