Show me the error or shut up

Options, Try, Futures, oh my!  You can go read all the various articles on how the semantics of these are similar.  Here’s one; I highly recommend the cheat sheet as a reference, it comes in handy.  But here are some real-world examples to help as well.  I find myself very often needing to handle an error just to print out a log message.  So I started out with the normal try/catch as in Java.  It’s a bunch of brackets and kinda verbose.

Then once you switch to using futures, at some point you’ll forget to handle a failure, your root cause will just be gone, you’ll waste a bunch of time, and then you’ll start putting onComplete, onFailure, or recover everywhere.  At least that’s what I did; maybe I’m an idiot and you are smarter.
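Here is a minimal sketch of that silent-failure mode (copyData is a made-up stand-in): the exception is trapped inside the Future, and since nothing subscribes to the failure, no message is ever printed and the root cause just vanishes.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// copyData is a hypothetical stand-in for real work that blows up
def copyData(): Int = throw new RuntimeException("root cause")

// The throw is captured by the Future; with no onComplete/onFailure/recover
// attached, nothing below ever prints and the error is silently dropped.
Future(copyData()).map(rows => println(s"copied $rows rows"))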

Well, that’s just as verbose as the try/catch, even more so when you don’t want to handle one side of the equation.  What you can do, and this is the same for a Try or a Future, is use “.failed.map”.  Like this:

Try(someStuffThatMightFail).failed.map(e => println(e.getMessage))

This inverts the Try: failed() returns a Try[Throwable], which you can then map over to get your error.  getOrElse() and friends don’t work here because you want the error itself, and they won’t give it to you.  If there is no error, the map() simply does nothing, and you go on about your business.

So much cleaner.  Here are some examples:


import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
import scala.concurrent.ExecutionContext.Implicits.global

def fail = throw new RuntimeException("error now")
def failInFuture() = Future { throw new RuntimeException("error in future") }

// Plain Java-style try/catch: works, but it's all brackets
try {
  fail
} catch {
  case e: Exception => println(e.getMessage)
}

// Handling only the failure side of a Future still costs a full pattern match
failInFuture().onComplete {
  case Success(result) =>
  case Failure(t) => println("error reported in onComplete")
}

failInFuture().onFailure {
  case e => println("error reported in onFailure")
}

failInFuture().recover {
  case e => println("error reported in recover")
}

// The .failed.map trick: only the error side is handled, in one line each
Try(fail).failed.map(e => println(e.getMessage))
failInFuture().failed.map(e => println(e.getMessage))



scala> import scala.concurrent.Future
import scala.concurrent.Future
scala> import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Success, Try}
scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global
scala>
scala> def fail = throw new RuntimeException("error now")
fail: Nothing
scala>
scala> def failInFuture() = Future { throw new RuntimeException("error in future") }
failInFuture: ()scala.concurrent.Future[Nothing]
scala>
scala> try {
| fail
| } catch {
| case e: Exception => println(e.getMessage)
| }
error now
scala>
scala> failInFuture().onComplete {
| case Success(result) =>
| case Failure(t) => println("error reported in onComplete")
| }
error reported in onComplete
scala>
scala> failInFuture().onFailure {
| case e => println("error reported in onFailure")
| }
error reported in onFailure
scala>
scala> failInFuture().recover {
| case e => println("error reported in recover")
| }
res24: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@bd1111a
error reported in recover
scala>
scala>
scala> Try (fail).failed.map(e => println(e.getMessage))
error now
res25: scala.util.Try[Unit] = Success(())
scala> failInFuture().failed.map(e => println(e.getMessage))
error in future
res26: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@65753040


 

Async stormy weather

We’ve been using Apache Storm for 6 months or so.  We are at the point where we’ve proved it out, built a bunch of operational and release management practices around it.  It is starting to metastasize around our organization and become something of a gravity well in terms of heavy processing pipelines.

Then I went on vacation and all hell started breaking loose.

Most of our topologies are Kafka-initiated.  In one major job I started seeing the queue “get stuck”: it would do some work, and then nothing.  In this particular topology we are basically copying data from one data source into another, and we have a fairly large message timeout, as some of these copies can take quite a while.  What we were seeing was that after we restarted the worker, processing would start up again, we might pop a few messages off the queue, but after a bit we’d be “stuck” again.  No processing, yet you could see in the Kafka offsets that there were messages waiting that we weren’t picking up.

There was another problem: for one particular piece of data we were extracting, the counts were coming out wrong, and we were missing rows when we copied.  How could this happen?  It should have triggered a failure in our topology.  But it didn’t!

Well, I spent a bunch of time chasing problems I had already solved, like configurations in the Spout or various params in the topology.  My spidey sense told me it had something to do with an async callback.  Because we are using Scala as our language of choice in Storm, and Storm sorta predates Futures, something about this made me uncomfortable.  So I kept digging…

Let me explain the callback in a little more detail…

We are essentially writing rows into Cassandra using the async approach.  I have a wrapper that turns the Cassandra Java driver’s futures into Scala Futures, borrowed from here: https://github.com/eigengo/activator-akka-cassandra/blob/master/src/main/scala/core/cassandra.scala.  If you are using Cassandra from Scala and relying on the Java driver natively, I highly recommend this approach: you simply import that, your Cassandra Java futures turn into Scala Futures automagically, and everything becomes much more idiomatically Scala and happy.

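For reference, the heart of that wrapper is roughly the following sketch (not the exact file; the older two-argument Guava Futures.addCallback is assumed): it completes a Scala Promise from a callback on the driver’s ResultSetFuture, so an implicit conversion hands you a Future[ResultSet] wherever the driver gives you its Java future.

import com.datastax.driver.core.{ResultSet, ResultSetFuture}
import com.google.common.util.concurrent.{FutureCallback, Futures}
import scala.concurrent.{Future, Promise}
import scala.language.implicitConversions

object CassandraFutures {
  // Bridge the driver's Guava future into a Scala Future via a Promise
  implicit def resultSetFutureToScala(rsf: ResultSetFuture): Future[ResultSet] = {
    val promise = Promise[ResultSet]()
    Futures.addCallback(rsf, new FutureCallback[ResultSet] {
      def onSuccess(rs: ResultSet): Unit = promise.success(rs)
      def onFailure(t: Throwable): Unit = promise.failure(t)
    })
    promise.future
  }
}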
So from that, when all our inserts complete we end up in an onComplete block like this:


// Ack or fail the Storm tuple once the async Cassandra inserts finish
insertRows(data).onComplete {
  case Failure(e) =>
    collector.fail(tuple)
  case Success(rs) =>
    collector.ack(tuple)
}


Here lies the problem.  It turns out that the Storm OutputCollector is not thread-safe as of 0.8.  You can read about that in their forums; I found it via this article from Spotify.

This was the cause of both of my problems.  Acks were not always getting back because of this, so the topology never finished, hence getting “stuck”; I could see in the UI that this was the case.  It is also why the failures that were causing me to lose data never materialized.

So what’s the solution?  Don’t use async?  Nah, that would suck…

I followed Spotify’s approach of storing the completions in a LinkedBlockingQueue and then flushing them in my execute method.  This way, more data (or ticks) coming into your topology triggers the flushes once you are back on the right thread, and everything works.  I just wrote a quick case class to store the data, and then a flush, like this:


import java.util
import java.util.concurrent.LinkedBlockingQueue
import backtype.storm.task.{OutputCollector, TopologyContext}
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.topology.base.BaseRichBolt
import backtype.storm.tuple.Tuple
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._

// A finished tuple plus the error from the async write, if there was one
case class Completion(tuple: Tuple, exOpt: Option[Throwable] = None)

class SomeBolt extends BaseRichBolt {
  private val logger = LoggerFactory.getLogger(classOf[SomeBolt])
  var collector: OutputCollector = _
  // async callbacks only enqueue here; ack/fail happens on the executor thread
  val completions = new LinkedBlockingQueue[Completion]()

  override def prepare(conf: util.Map[_, _], context: TopologyContext, outputCollector: OutputCollector): Unit = {
    collector = outputCollector
  }

  def flushCompletions(): Unit = {
    if (completions.size() > 0) {
      val completionArr = new util.ArrayList[Completion]()
      completions.drainTo(completionArr)
      logger.debug("completing " + completionArr.size() + " tuples")
      completionArr.foreach { completion =>
        if (completion.exOpt.isEmpty) {
          collector.ack(completion.tuple)
        } else {
          collector.reportError(completion.exOpt.get)
          collector.fail(completion.tuple)
        }
      }
    }
  }

  override def execute(tuple: Tuple): Unit = {
    // every incoming tuple (or tick) drains the queue on the executor thread
    flushCompletions()
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {}
}

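For completeness, here is what the producer side of that queue looks like in this scheme (a sketch reusing the names from the snippets above: insertRows, data, tuple, completions).  The Future callback no longer touches the collector at all; it only enqueues a Completion for the next execute call to flush.

import scala.util.{Failure, Success}

// Runs on the Future's thread, but only enqueues; the collector is
// touched later by flushCompletions() on the Storm executor thread.
// (An implicit ExecutionContext is assumed in scope, as in the earlier snippets.)
insertRows(data).onComplete {
  case Success(_)  => completions.offer(Completion(tuple))
  case Failure(ex) => completions.offer(Completion(tuple, Some(ex)))
}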

Now everything is awesome.  Hope this helps someone else.  I was about ready to throw in the towel on Storm, or update my resume and move on… which is typically about the time I break through 🙂