We’ve been using Apache Storm for 6 months or so. We are at the point where we’ve proved it out, built a bunch of operational and release management practices around it. It is starting to metastasize around our organization and become something of a gravity well in terms of heavy processing pipelines.
Then I went on vacation and all hell started breaking loose.
Most of our topologies are Kafka initiated. I started seeing in one major job, the queue “get stuck”. It would do some work, and then nothing. In this particular topology we are basically copying data from one data source into another, we have a fairly large message timeout, as some of these can take quite awhile. What we were seeing was after we would restart the worker, processing would start up again, we might pop a few messages of the queue, but then after a bit we’d be “stuck” again. No processing, but you can see in the Kafka offsets, that there are messages waiting that we aren’t picking up.
There was another problem, in one particular piece of data we were extracting the counts were coming out wrong, we were missing rows when we copied. How could this happen this should have triggered a failure in our topology? But it didn’t!
Well I spent a bunch of time chasing problem that I already solved, like configurations in the Spout or various params in the topology. My spidey sense told me it has something to do with an async callback. Because we are using Scala as our language of choice in Storm, and Storm sorta predates Futures, something about this made me uncomfortable. So I kept digging…
Let me explain the callback in a little more detail…
We are essentially writing rows into Cassandra using the async approach. I have a wrapper that turns the Cassandra java driver into Scala Futures that I borrowed from here: https://github.com/eigengo/activator-akka-cassandra/blob/master/src/main/scala/core/cassandra.scala. If you are using Cassandra in Scala and relying on the Java driver natively, I highly recommend this approach, you simply import that, and your Cassandra java futures turn into Scala futures automagically, makes things much more idiomatically Scala and happy.
So from that, when all our inserts complete we end up in an onComplete block like this:
Here lies the problem. It turns out that the Storm OutputCollector is not thread safe as of 0.8. You can read about that in their forums, I found it from this article from Spotify.
This was the cause of both of my problems. Acks were not always getting back because of this, so the topology never finished, hence getting “stuck”. I could see in the UI this was the case. Also, this is why the failures that were causing me to loose data never materialized.
So what’s the solution ? Don’t use async? Nah, that would suck…
I followed Spotify’s approach of storing the completions in a LinkedBlockingQueue. Then in my execute method I flush them. In this way, more data (or ticks) coming into your topology will trigger the flushs once you are back on the right thread and everything works. I just wrote a quick case class to store the data, and then a flush, like this:
Now everything is awesome. Hopes this help someone else. I was about ready to throw in the towel on Storm, or update my resume and move on… about the same time I break through typically 🙂