Service composition helps you create results by combining dependent information from underlying services. You can compose well or poorly, depending on how you manage the underlying services and calls to those services.
A few libraries have been created to help you compose calls to services and combine the results. Generally, the more recent service composition libraries are based on simple ideas such as managing the creation of the request separate from executing the requests. Of course, this is the basic idea behind the IO monad found in haskell and a variety of other languages that allow you to control evaluation. We are also concerned about how to catch and manage error information once errors do occur.
First we can address service composition using fetch. It is interesting to note that the scala async macro helps you compose Future's in simple scenarios that match what fetch does when you use Future's with fetch.
fetch
Fetch composes service calls and combines the results together. the main documentation site. It handles batching, request combining and caching. While there may be better ways to handle caching, it allows you to address caching at the fetch library level versus an infrastructure level.
First, we need to enhance our example-server to serve up some data. You can easily create a contrived customer micro-service with akka-http:
~ pathPrefix("customers") {
path(IntNumber) { n =>
get {
onSuccess(customers.find(n)) {
case Some(customer) => complete(customer)
case None => complete(StatusCodes.NotFound)
}
}
} ~ path("posts-from" / IntNumber) { n =>
get {
onSuccess(msgs.getMsgsFrom(n)) { m =>
complete(m)
}
}
} ~ path("posts-to" / IntNumber) { n =>
get {
onSuccess(msgs.getMsgsTo(n)) { m =>
complete(m)
}
}
}
}
}
which is powered by some new modules:
package example
import scala.annotation.implicitNotFound
import scala.annotation.migration
import scala.concurrent.Future
import scala.language._
import spray.json.DefaultJsonProtocol
case class Customer(id: Int, name: String, address: String)
// I use sleep which probably does not do what I want
// exactly given how actors work but is good enough for this example.
trait CustomerRespository extends DefaultJsonProtocol {
private val r = scala.util.Random
private val customers = Map(
1 -> Customer(1, "John", "123 Play St."),
2 -> Customer(2, "Mary", "495 Norm St."),
3 -> Customer(3, "Beth", "391 Happy St."))
/** Get a customer, pretend it has latency by using a Future ... */
def find(id: Int)(implicit ec: concurrent.ExecutionContext) = Future {
Thread.sleep(r.nextInt(4000))
customers.get(id)
}
/** Return all the IDs known in the customer database ... */
def ids(implicit ec: concurrent.ExecutionContext) = Future {
Thread.sleep(r.nextInt(2000))
customers.keys.toList
}
implicit val customerFormat = jsonFormat3(Customer)
implicit val customerListFormt = immSeqFormat[Customer]
}
case class Msg(from: Int, to: Int, content: String)
trait MsgRepository extends DefaultJsonProtocol {
private val r = scala.util.Random
private val msgs = collection.immutable.Seq(
Msg(1, 3, "yt?"),
Msg(1, 1, "hi me!"),
Msg(2, 1, "can't talk now"),
Msg(2, 3, "how's it going?"),
Msg(2, 3, "let's do it!"))
/** Return a list of posts mode by a customer. */
def getMsgsFrom(id: Int)(implicit ec: concurrent.ExecutionContext) =
Future {
Thread.sleep(r.nextInt(3000))
msgs.filter(_.from == id)
}
/** Return a list of posts from a customer. */
def getMsgsTo(id: Int)(implicit ec: concurrent.ExecutionContext) = Future {
Thread.sleep(r.nextInt(3000))
msgs.filter(_.to == id)
}
implicit val msgsFormat = jsonFormat3(Msg)
implicit val msgListFormat = immSeqFormat[Msg]
}
This allows us to call the service, say, using curl:
and we add the modules to the main program file. Notice the use of spray json, which makes it easy to convert a string to a JSON object to a scala JVM object.
The last import brings in implicit converts that take the json reader/writers/formats and changes them into marshallers and unmarshallers suitable for use in akka routes. akka has a robust marshaller and unmarshaller framework.
Since dispatch returns scala Futures, we need to hook this into fetch's asynchronous support:
object Example {
type EntityId = Int
case class Customer(id: EntityId, name: String, address: String)
trait CustomerJsonProtocol extends DefaultJsonProtocol {
implicit val customerFormat = jsonFormat3(Customer)
implicit val customerListFormt = immSeqFormat[Customer]
}
case class Msg(from: EntityId, to: Int, content: String)
trait MsgJsonProtocol extends DefaultJsonProtocol {
implicit val msgsFormat = jsonFormat3(Msg)
implicit val msgListFormat = immSeqFormat[Msg]
}
implicit object CustomerSource extends DataSource[EntityId, Customer] with CustomerJsonProtocol {
override def fetchOne(id: EntityId): Query[Option[Customer]] = {
Query.async((ok, err) => {
val fut = Http(url(s"http://localhost:9000/customers/$id") OK as.String).
map(body => spray.json.JsonParser(body).convertTo[Option[Customer]])
import scala.util._
fut.onComplete {
case Success(c) => ok(c)
case Failure(ex) => err(ex)
}
})
}
override def fetchMany(ids: NonEmptyList[EntityId]): Query[Map[EntityId, Customer]] =
batchingNotSupported(ids)
}
// Or, Fetch(id)(CustomerSource) w/o implicit resolution
def fetchCustomer(id: EntityId): Fetch[Customer] = Fetch(id)
}
This allows us to get a customer object using the Fetch API. Personally, I think the fetch API will evolve so as to not be so heavy on objects and traits, but that's another day.
We can now create a fetch object, re-use as much as we want, and get a customer object.
val fetchCustomer1 = fetchCustomer(1)
val result1 = Fetch.run[Future](fetchCustomer)
result1 onComplete println
println("Hit ENTER to end program...")
scala.io.StdIn.readLine
Which prints out:
Fectching customers
Hit ENTER to end program...
Success(Customer(1,John,123 Play St.))
The result prints after the prompt because we artificially inserted a small delay into the server response on the server side.
Now we can compose our customer fetches together and use a slightly different syntax to run the Fetch object (e.g. .runA[Future] vs Fetch.run[Future]):
Success((Customer(1,John,123 Play St.),Customer(1,John,123 Play St.)))
So now we can sequence our requests, which we could have done with scala async of course, by using:
val userAndMsgs = for {
u <- fetchCustomer(2)
from <- fetchMsgsFrom(u.id)
} yield (u, from)
val userMsgsFut = userAndMsgs.runA[Future]
userMsgsFut onComplete println
which gives us:
Success((Customer(2,Mary,495 Norm St.),List(Msg(2,1,can't talk now), Msg(2,3,how's it going?), Msg(2,3,let's do it!))))
To retrieve a list back from the http server, we had to ensure that our result signature in the DataSource reflected a list of objects:
implicit object MsgToSource extends DataSource[EntityId, Seq[Msg]] with MsgJsonProtocol {
override def fetchOne(id: EntityId): Query[Option[Seq[Msg]]] = {
Query.async((ok, err) => {
val fut = Http(url(s"http://localhost:9000/customers/posts-from/$id") OK as.String).
map(body => spray.json.JsonParser(body).convertTo[Option[Seq[Msg]]])
import scala.util._
fut.onComplete {
case Success(m) => ok(m)
case Failure(ex) => err(ex)
}
})
}
override def fetchMany(ids: NonEmptyList[EntityId]): Query[Map[EntityId, Seq[Msg]]] =
batchingNotSupported(ids)
}
def fetchMsgsFrom(id: EntityId): Fetch[Seq[Msg]] = Fetch(id)
}
The key is to ensure that you can marshal the result from dispatch to the proper case class type.
Now we can use fetch to fetch results and combine them together. Since dispatch returns a scala Future, we need to use the Future monad composition in fetch. fetch's user documentation is .