Cookbook

This section contains a cook book of useful topics.

Retries

Retries can be used when your initial request fails to complete and you wish to automatically retry the request. dispatch has direct support for retries independent of the underlying technology. async-http-client also has builtin retry capabilities (.setMaxRequestRetry) and you can adjust which exceptions trigger a retry although this aspect of configuration applies to all http clients. async-http-client can also implement a per-request retry strategy by using the onFailure method in the handler interface. dispatch's retry mechanism can be specified on a per request basis regardless of the client.

A good blog on the dispatch retry approach is here. Contrary to the blog, it's not true that your future has to return an Option or Either, it could return anything that returns a boolean where false indicates an error and true that it returned successfully. There are some implicit definitions that make Option and Either convenient though which is why the author mentions returning an Option or Either in the blog.

The essential idea is that since a Future cannot be restarted once finished so you need to provide the retry framework a function that returns a Future (think "call by name") that creates the "same" request. Once you have a function that generates a new Future, the retry framework can call this function to generate a new request if the previous request fails.

Retries can be based on directly trying a retry if there is a failure, retrying up to a specified number of times, retrying with a specified pause between retries and exponential backup retrying pauses.

Since the dispatch retry framework really just uses scala Future's you can use the framework anywhere you use a future. Of course, you could also use Futures recover and recoverWith to handle retry logic.

Here's a quick example of how to implement retry using dispatch's mechanism. You'll soon realize, however, that this forces you to alter your calling pattern a bit in order to use the API.

First, we can show that the dispatch retry mechanism is independent of http calls using amm. In our first example, we have a function that fails every time by returning None (remember dispatch expects the future to return Either or None although you can get around this). We will do 5 retries and each retry should call doit and hence, print out doit! so that we know the call occurred:

import retry.Succcess._
@ def doit() = { println("doit!"); Future(None) } 
defined function doit
@ doit() 
doit!
res15: Future[None.type] = Success(None)
@ retry.Directly(5)(doit _) 
doit!
doit!
doit!
doit!
doit!
doit!
res16: Future[None.type] = Success(None)

Now we will rewrite doit to always succeeed:

@ def doit() = { println("doit!"); Future(Some(30)) } 
defined function doit
@ retry.Directly(5)(doit _) 
doit!
res18: Future[Some[Int]] = Success(Some(30))

which only needs to call doit once and we see that in the output.

Now we will re-use our example-server of delaying a response. See the other section on the changes needed to example-server. The changes allow us to delay a response based on the URL. Fore example, running:

curl http://localhost:9000/delay/30000

causes the response be delayed by 30 seconds. If we set the timeout in async-http-client to 5 seconds and call the delay URL, we should get a timeout and we can then force a retry.

We need to setup the function to call. Note that the actual http call is cast to an either then projected to the right since our assumption is that the call should succeed. Then, if it were to succeed, we convert the returned value to a long just for fun. Of course, we are going to force every http call to fail to demonstrate dispatch's retry capability. Note also that we imported retry.Success._ so that the retry mechanism knows how to convert our return value to a "signal" that a retry should occur. You could pass in your own "predicate"

load.ivy("org.scala-lang.modules" %% "scala-async" % "latest.release")
load.ivy("net.databinder.dispatch" %% "dispatch-core" % "0.11.3")

import scala.async.Async._
import dispatch._, Defaults._
import scala.concurrent.Await
import scala.concurrent.duration._

import retry.Success._

val myhttp = Http.configure(_.setRequestTimeout(2000))

def callit(n: Long) = {
  println("callit!")
  myhttp(url(s"http://localhost:9000/delay/$n") OK as.String).either.right.map(x => x.toLong)
}

Then call the dispatch retry mechanism:

retry.Directly(5)(() => callit(30000))

Our output should be the initial "callit!" output then 5 more println as the retry occurs. Because we have set the request time to 2 seconds in the async-http-client layer but we request a 30 second delay, every call will fail. Here is the output:

@ retry.Directly(5)(() => callit(30000)) 
callit!
res78: Future[Either[Throwable, Long]] = List()
@ callit!
callit!
callit!
callit!
callit!
callit!
@  
@ res78.isCompleted 
res79: Boolean = true
@ res78 onComplete println 
Success(Left(java.util.concurrent.TimeoutException: Request timed out to localhost/127.0.0.1:9000 of 2000 ms))

The returned value or error will be that which is generated on the last "retry." The other retry calls work similarly e.g. retry.Backoff and retry.Pause.

Dispatch contains the retry mechanism is a package `retry` but you can use your own. You will find that the retry capabilities of Dispatch are a slimmer version of what you can find in the following two projects that include retry functions that can also contain jitter.

The API is very similar (!) so its mostly a drop-in replacement.

Response Body Access When Errors Occur

When your call results in an error, say you are expecting 200 but get a 400 (bad request), you want to be able to inspect the body of the returned response in addition to throwing an exception. Throwing an exception is handled in the Future that is returned when you dispatch a request, but if an exception occurs, you might wonder how you get the response body to inspect and understand any additional error information that may have been returned by the server?

This link, http://stackoverflow.com/questions/25233422/how-to-get-response-headers-and-body-in-dispatch-request, gives you an approach and also reminds you that you can write your own customized "ok" handler specific to your application needs.

First you need to recognize that dispatch is a thin layer on top of http-async-client. The underlying library uses an AsyncHandler to handle responses. Dispatch includes an OK default handler that conforms to the AsyncHandler interface:

/**
 * Builds tuples of (Request, AsyncHandler) for passing to Http#apply.
 * Implied in dispatch package object
 */
class RequestHandlerTupleBuilder(req: Req) {
  def OK [T](f: Response => T) =
    (req.toRequest, new OkFunctionHandler(f))
  def > [T](f: Response => T) =
    (req.toRequest, new FunctionHandler(f))
  def > [T](h: AsyncHandler[T]) =
    (req.toRequest, h)
}

case class StatusCode(code: Int)
extends Exception("Unexpected response status: %d".format(code))

class FunctionHandler[T](f: Response => T) extends AsyncCompletionHandler[T] {
  def onCompleted(response: Response) = f(response)
}

class OkFunctionHandler[T](f: Response => T)
extends FunctionHandler[T](f) with OkHandler[T]

trait OkHandler[T] extends AsyncHandler[T] {
  abstract override def onStatusReceived(status: HttpResponseStatus) = {
    if (status.getStatusCode / 100 == 2)
      super.onStatusReceived(status)
    else
      throw StatusCode(status.getStatusCode)
  }

If you recall, the dispatch client needs a (request, handler) tuple. The first class provides the OK definition. The handler in this case is the OkFunctionHandler. It takes a function to apply to the response, for example as.String or as.xml.Elem (which are functions that transform a response to the desired output type.

The OkHandler trait adheres to the AsyncHandler interface and defines a onStatusReceived method. Not shown, there is also a onCompleted method as well. We can use the same code structure with the onCompleted method to check the status code and if it is not 200, throw an exception of our choosing with the response body as a string. We could obviously use a higher order function that throws an exception type of our choice or do something else entirely, such as print out the response body to a log file. Here's the code, slightly changed from the blog:

/**
   * Allows you to use `Http(req OkWithBody as.xml.Elem)` to obtain
   * the result as an Elem if successful or a ApiHttpError, if an error
   * is thrown.
   */
  implicit class MyRequestHandlerTupleBuilder(req: Req) {
    def OKWithBody[T](f: Response => T) =
      (req.toRequest, new OkWithBodyHandler(f))
  }

}

import com.ning.http.client._

class OkWithBodyHandler[T](f: Response => T) extends AsyncCompletionHandler[T] {
  def onCompleted(response: Response) = {
    if (response.getStatusCode / 100 == 2) {
      f(response)
    } else {
      throw ApiHttpError(response.getStatusCode, response.getResponseBody)
    }
  }
}

/**
 * Exception that captures the response code and the response body as a string.
 * Having the response body may allow you to diagnose the bad response
 * faster and easier.
 */
case class ApiHttpError(code: Int, body: String)
  extends Exception("Unexpected response status: %d".format(code))

To use this we simply:

Http(myRequest OKWithBody as.xml.Elem)

If we get an exception, we can get access to the response body. But let's give this a try to make sure. First lets modify our example server so it returns a status code of 400 when we want it to:

~ path("400") { 
        get { 
          complete(StatusCodes.BadRequest, "Bad request!")
        }
      }

which if we run a request to this URL gives us the error we expect:

$ curl http://localhost:9000/400
Bad request!

So now we can try this with our program:

@ val x = Http(url("http://localhost:9000/400") OKWithBody as.String)
x: dispatch.Future[String] = List()

@ x
res4: dispatch.Future[String] = Failure(java.util.concurrent.ExecutionException: ApiHttpError: Unexpected response status: 400)

Which is not what we expected. The inner exception is our ApiHttpError but its wrapped in an ExceutionException exception. This wrapping would make it very hard to pattern match on in a recover statement or otherwise in our program. In fact, this would not work:

dispatchFuture recover { 
 case ApiHttpError(msg, body) => println("400: error body: " + body)
 case NonFatal(e) => println("Error in app")
}

Because the actual exception value is wrapped, you would never match on the case ApiHttpError portion. But we could use dispatch's either:

@ val x = Http(url(...) OkWithBody as.String).either

@ x.map(_.left.get.asInstanceOf[crm.ApiHttpError].body)
scala.concurrent.Future[String] = Success(Bad request!)

The string "Bad Request!" is the body of the 400 response from our example server. That last line was very ugly but I did not want to write any pattern matchers to extract out the value to demonstrate my point.

You may wonder how the .either can pick out the inner exception. That's straight forward from the disptach code and in fact mirrors what we would have to do:

underlying.map { res => Right(res) }.recover {
      case exc: ExecutionException => Left(exc.getCause)
      case throwable => Left(throwable)
    }

If the Future from Http succeeds its wrapped in the Right and if its an exception recover is called to extract out the inner exception and wrap it in a Left. It is not dispatch's fault that its so goofy coming from the java layer which is the layer where the exception is caught when it is thrown in the OkWithBody handler. And since its in the java layer, the direct type information is really lost.

Perhaps the best way to handle these types of problems is to really avoid them altogether and always ask dispatch to give you back the raw response object from async-http-client and process the response object directly using your own framework, for example, similar to way that play-json handles parsing json conten.

In reality, whether you define your own result object (with a good result and a bad result variant) or use Either (which is already available to you), its much the same thing e.g. you are using a co-product data structure to contain the error or the result value and in fact you could pattern match on the error if you wanted to since you know that the Future must have a valid value (an Either) so you could call dispatch's Future enhancement () on the Future and obtain the value to get:

x() match { 
  case Right(result) => println("Result is "  + result)
  case Left(x: ApiHttpError) => println("400 error: Response body: " + x.body)
  case Left(e) => println("Unknown error: " + e)
}

But beware, the () (apply operator in scala) for enhanced futures in Dispatch calls Await which is generally considered to be bad form in a reactive application.

If you just want to unwrap an ExecutionException so you can pattern match using standard scala Future methods, then you could write an unwrap implicit class (scala 2.11+) that unwraps only that exception:

implicit class EnhancedFuture[A](fut: scala.concurrent.Future[A]) { 
    /**
     * Unwrap an exception in the Future if its an ExecutionException,
     * otherwise leave the exception alone.
     */
    def unwrapEx: Future[A] =
      fut.recoverWith { 
        case x: java.util.concurrent.ExecutionException => Future.failed(x.getCause)
        case x@_ => Future.failed(x)
      }
  }

then use it:

Http(req OkWithBody as.String).unwrapEx.map{ .... }.recover {
 case ApiHttpError(status, body) => println("Error response body is: " + body)
 case NonFatal(e) => println("Some other error occurred....")

}

You can call unwrapEx anywhwere in the chain but make sure it occurs before the recover part of the statement.

By hiding the parsing down in the handler, you can make it easier to parse results at the next level up, with the loss of composability somewhat. Let's say that we are working with SOAP calls and we always get an XML response. And let's use the xtract library for extracting values from XML objects. We know we can have only a limited number of errors, aside from catastrophic errors:

import com.ning.http.client.Response
  import com.lucidchart.open.xtract._
  import com.lucidchart.open.xtract.{ XmlReader, __ }

  sealed abstract class ResponseError(raw: Response) {
    def body: String = raw.getResponseBody
    def logMsg = {
      import scala.collection.JavaConverters._
      val headers = raw.getHeaders().keySet().asScala.map(k => s"'$k' -> '${raw.getHeader(k)}'").mkString("\n")
      s"""Error during processing.
Headers:
$headers    
ResponseBody:
${raw.getResponseBody}
"""
    }

    def log(logger: Logger): Unit
  }
  case class UnexpectedStatus(raw: Response, code: Int, msg: Option[String] = None) extends ResponseError(raw) {
    def log(logger: Logger) = {
      logger.error("Unexpected status: $code")
      msg.foreach(logger.error(_))
      logger.error(logMsg)
    }
  }
  case class UnknonwnResponseError(raw: Response, msg: String, ex: Option[Throwable] = None) extends ResponseError(raw) {
    def log(logger: Logger) = {
      logger.error(s"Unknown error: $msg")
      ex.foreach(logger.error(_)("Exception thrown."))
      logger.error(logMsg)
    }
  }
  case class XmlParseError[T](raw: Response, parseError: ParseResult[T], msg: Option[String] = None) extends ResponseError(raw) {
    def log(logger: Logger) = {
      logger.error(s"Parse error: $parseError")
      logger.error(logMsg)
    }
  }

  def toUserMessage(error: ResponseError): String = error match {
    case UnexpectedStatus(r, c, msg) => s"Unexpected response from server."
    case UnknonwnResponseError(r, m, ex) => s"Unexpected error occurred."
    case XmlParseError(r, e, m) => s"Error parsing response from server."
  }

We should have used a typeclass for the logging but let's be lazy for the moment. Now, like before, we need an AsyncHandler to receive the response, check the status code, parse the body then return th appropriate object. Let's use cats`Xor class to handle the disjunction:

/**
   * Parse the body as XML using the implicit XmlReader. You could map into the result
   * of course. Parsing occurs on the response processing thread. If you want to control
   * the execution context, map into the future that contains the raw response.
   */
  class OkThenParse[T](implicit reader: XmlReader[T]) extends AsyncCompletionHandler[ResponseError Xor T] {
    def onCompleted(response: Response) = {
      if (response.getStatusCode / 100 == 2) {
        try {
          val body = scala.xml.XML.loadString(response.getResponseBody)
          reader.read(body) match {
            case ParseSuccess(a) => Xor.right(a)
            case PartialParseSuccess(a, errs) => Xor.right(a)
            case x@ParseFailure(errs) => Xor.left(XmlParseError(response, x, None))
          }
        } catch {
          case util.control.NonFatal(e) => Xor.left(UnknonwnResponseError(response, "Unknown error occurred", ex = Some(e)))
        }
      } else {
        Xor.left(UnexpectedStatus(response, response.getStatusCode, None))
      }
    }
  }

  implicit class MyParsingRequestHandlerTupleBuilder(req: Req) {
    def OKThenParse[T](implicit reader: XmlReader[T]) =
      (req.toRequest, new OkThenParse())
  }

Now instead of worrying about ApiHttpError we just have values representing errors, whwich is more functional. In our calling program, let's assume we have a reader:

objec MyDomainObject {
  implicit val myReader: MyDomainObject = ((__  \ "name").read[String] and ...)(MyDomainObject)
 ...
}

We can use our handler in our dispatch call and handle how error messages are reported to the user (if that's important for your application):

val fut2 = Http(req OKThenParse (MyDomainObject.myReader).
          andThen {
            case Success(Xor.Right(value)) => println(s"Result: $value")
            case Success(Xor.Left(error)) =>
              println(s"Error during processing: ${toUserMessage(error)}")
              error.log(logger)
            case Failure(ex) =>
              // Wi could still have a failure exception because async-http-clent is not functional underneath...
              println("Error processing query. See log.")
              logger.error(ex)("Error processing query.")

          }

Or something like that...it may be easier to setup a set of streams and stream the left or right side through different processors to handle the values...it's up to you.

Last updated