Scala scripting?!

So I have a lot of little data fix-up scripts to write to maintain our large and growing Elasticsearch database.  Typically I use the shell stack (bash/jq/curl) for such tasks.  When things get a little bigger I switch to Python.  I'm a big fan of jq/curl, but for anything that isn't small it gets nasty.  I'm a big fan of Python too, but I'm still a baby using it, so when things get bigger or more complicated I'm not very efficient.

Anyway, a lot of these tasks end up doing a scroll/scan or search in ES and then feeding data back in, or out to message queues, or to various endpoints to do work.  They are often long-lived: code them up, throw them in rundeck, and let them run for hours or days.

One frustration is that these scripts simply don't queue the work fast enough: my Storm jobs doing the processing go way faster than the input feeding them when I use these scripting options.  I know I could learn how to do async in Python, or try to shard things up in the shell stack and use 'parallel', or find some other solution.  But since I already have a lot of business logic in my runtime Scala code, it would be nice to just use that, without the headache of builds and deployments; something faster and lighter weight that I can still just dump into rundeck.  I know how to control async, threads, and concurrency in this environment, and I know I'm not limited by the toolset.

I looked into this once before but gave up at the dependency part of it.  Then I discovered this blog.

Basically, using a hacked-up version of sbt, you can write a single-file Scala script that will download dependencies, compile, and launch, with no fuss.  I'll show you how I got it hooked up; I mostly followed that blog, plus a few other things I found helpful.  Here are the steps I followed to get it set up on my rundeck Linux box:

Install conscript

curl https://raw.githubusercontent.com/foundweekends/conscript/master/setup.sh | sh

Install the sbt runner:

cs sbt/sbt --branch 0.13

Put scalas in your path:

export CONSCRIPT_HOME="$HOME/.conscript"
export CONSCRIPT_OPTS="-Dfile.encoding=UTF-8"
export PATH=$CONSCRIPT_HOME/bin:$PATH

Create a script:
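Following that blog, the script itself (call it script.scala) is a single file: a shebang that points at the scalas launcher conscript just installed, and a /*** comment that holds sbt settings.  A minimal sketch looks like this:

#!/usr/bin/env scalas

/***
scalaVersion := "2.11.7"
*/

println("hello")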

$ chmod +x script.scala 
$ ./script.scala
hello

Ok, so now you can add the location of your artifacts and your dependencies inside the /*** comment, like this:

/***
scalaVersion := "2.11.7"
 
resolvers += Resolver.url("typesafe-ivy-repo", url("http://repo.typesafe.com/typesafe/releases"))(Resolver.ivyStylePatterns)

resolvers += "Your Repo" at "http://artifactory.yourplace.com/artifactory/repo"

resolvers += Resolver.mavenLocal

libraryDependencies += "org.scala-sbt" % "io" % "0.13.11"

libraryDependencies += "ch.qos.logback" %  "logback-classic" % "1.1.7"

libraryDependencies += "ch.qos.logback" %  "logback-core" % "1.1.7"

libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.21"

libraryDependencies += "com.trax.platform" % "trax-elasticsearch-loader"  % "1.3.43"

libraryDependencies += "com.trax.platform" % "trax-platform-utils" % "1.3.7"
*/

When you run it, it will automatically download dependencies, compile, and run your script.  Everything it pulls from the internet lands in $CONSCRIPT_HOME/boot.

You can also bring in a logger and control it programmatically.  It starts to look a lot like the Python script, at least with regard to how simple it is to configure logging, but with access to all your fancy Scala tools.  When was the last time you could do tail recursion in a script and not worry about a stack overflow?  @tailrec to the rescue!
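For example, here is a tiny hypothetical drain loop, script-style, where fetchBatch stands in for pulling one page of a scroll/scan; it will never blow the stack no matter how long it runs:

import scala.annotation.tailrec

// hypothetical stand-in for pulling one page of a scroll/scan
def fetchBatch(offset: Int): Seq[String] =
  if (offset >= 30) Seq.empty else Seq(s"doc-$offset", s"doc-${offset + 1}")

@tailrec
def drain(offset: Int = 0, processed: Int = 0): Int = {
  val batch = fetchBatch(offset)
  if (batch.isEmpty) processed
  else drain(offset + batch.size, processed + batch.size)
}

println(s"processed ${drain()} docs")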

Final Thoughts

Scripting with Scala is not good for everything, or everyone.  The compile time stinks when you have a small task, but if the job is going to run for hours or days, and run considerably faster because you can do it concurrently, the few extra seconds to compile are worth it.

Also, if you already have business logic wrapped up in the Java ecosystem, you may find this an easy way to unlock some of it quickly, without having to put things behind a REST interface or a message queue, or what have you.

It is also an easy way to explore Scala on some real tasks.  So if you are curious and want to dabble with it a bit, without having to bet the farm on some new tech that nobody knows or is willing to invest in heavily, give it a go.

LogbackUtils

Here's the body of the little log util I use for controlling logback without any config files.  It's something I found on Stack Overflow, and it's good enough for command-line tools and simple scripts.  Don't get me started on how logging sucks in the Java ecosystem; many a day wasted attempting to do things that should be easy…
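I won't swear this is character for character what I use, but the standard Stack Overflow shape of it is a small object that reaches through slf4j to logback's Logger and sets the level in code:

import ch.qos.logback.classic.{Level, Logger}
import org.slf4j.LoggerFactory

object LogbackUtils {

  // Set the level ("DEBUG", "INFO", "WARN", ...) for a named logger;
  // pass null or "" to adjust the root logger instead.
  def setLogLevel(loggerName: String, level: String): Unit = {
    val name =
      if (loggerName == null || loggerName.isEmpty) org.slf4j.Logger.ROOT_LOGGER_NAME
      else loggerName
    // logback's Logger is the implementation behind the slf4j facade, so the cast is safe here
    LoggerFactory.getLogger(name).asInstanceOf[Logger].setLevel(Level.toLevel(level, Level.INFO))
  }
}

// e.g. quiet some chatty client library down in a script:
// LogbackUtils.setLogLevel("org.elasticsearch", "WARN")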

Show me the error or shut up

Options, Try, Futures, oh my!  You can go read all the various articles on how the semantics of these are similar.  Here's one; I highly recommend the cheat sheet as a reference, it comes in handy.  But here is a real-world example to help as well.  I find myself very often needing to handle an error just to print out a log message.  So I started out with the normal try/catch as in Java.  It's a bunch of brackets and kind of verbose.

Then, once you switch to using Futures, at some point you'll forget to handle a failure, your root cause will just be gone, you'll waste a bunch of time, and then you'll start putting onComplete, onFailure, or recover everywhere.  At least that's what I did; maybe I'm an idiot and you are smarter.

Well, that's just as verbose as the try/catch, even more so when you don't want to handle one side of the equation.  What you can do, and this is the same for a Try or a Future, is use ".failed.map".  Like this:

Try(someStuffThatMightFail).failed.map(e => println(e.getMessage))

This inverts the Try: failed() hands you a Try[Throwable], which you can then map over to get at your error.  getOrElse() and friends don't work here because you want the error, and they won't give it to you.  If there is no error, the map() simply does nothing and you go on about your business.

So much cleaner.  Here are a couple of examples:
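The names below are stand-ins rather than anything from a real app, but they show the same .failed.map shape working for both a Try and a Future:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

object FailedMapExamples extends App {

  // hypothetical stand-ins for real work
  def parsePort(s: String): Int = s.toInt
  def fetchDoc(id: String): String = throw new RuntimeException(s"doc $id not found")

  // Try: only the failure side gets handled; a success just passes through untouched
  Try(parsePort("not-a-number")).failed.map(e => println(s"parse failed: ${e.getMessage}"))

  // Future: .failed projects the failure, so map hands you the Throwable the same way
  Future(fetchDoc("42")).failed.map(e => println(s"fetch failed: ${e.getMessage}"))

  Thread.sleep(500) // give the future's callback a moment before the process exits
}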


Async stormy weather


We've been using Apache Storm for six months or so.  We are at the point where we've proved it out and built a bunch of operational and release-management practices around it.  It is starting to metastasize around our organization and become something of a gravity well for heavy processing pipelines.

Then I went on vacation and all hell started breaking loose.

Most of our topologies are Kafka-initiated.  In one major job I started seeing the queue "get stuck": it would do some work, and then nothing.  In this particular topology we are basically copying data from one data source into another, with a fairly large message timeout, since some of these copies can take quite a while.  What we saw was that after restarting the worker, processing would start up again and we might pop a few messages off the queue, but after a bit we'd be "stuck" again.  No processing, yet you could see from the Kafka offsets that there were messages waiting that we weren't picking up.

There was another problem: in one particular piece of data we were extracting, the counts were coming out wrong; we were missing rows when we copied.  How could this happen?  It should have triggered a failure in our topology!  But it didn't.

Well, I spent a bunch of time chasing problems I had already solved, like configurations in the Spout or various params in the topology.  My spidey sense told me it had something to do with an async callback.  Because we are using Scala as our language of choice in Storm, and Storm sort of predates Futures, something about this made me uncomfortable.  So I kept digging…

Let me explain the callback in a little more detail…

We are essentially writing rows into Cassandra using the async approach.  I have a wrapper that turns the Cassandra Java driver's futures into Scala Futures, which I borrowed from here: https://github.com/eigengo/activator-akka-cassandra/blob/master/src/main/scala/core/cassandra.scala.  If you are using Cassandra from Scala and relying on the Java driver natively, I highly recommend this approach: you simply import that, and your Cassandra Java futures turn into Scala Futures automagically, which makes things much more idiomatically Scala and happy.

So from that, when all our inserts complete we end up in an onComplete block like this:
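Here's a trimmed-down, hypothetical version of it; the real bolt extends BaseRichBolt and gets its collector in prepare(), but the important part is that ack/fail happen inside the future's callback:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import backtype.storm.task.OutputCollector
import backtype.storm.tuple.Tuple

// The inserts run async, so ack/fail fire on the future's callback thread,
// not on the executor thread that called execute(). That turns out to be the bug.
class CopyBolt(collector: OutputCollector) {

  def insertRows(tuple: Tuple): Future[Unit] = Future { /* write rows to Cassandra */ }

  def execute(tuple: Tuple): Unit =
    insertRows(tuple).onComplete {
      case Success(_) => collector.ack(tuple)   // wrong thread
      case Failure(e) => collector.fail(tuple)  // wrong thread
    }
}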

Here lies the problem.  It turns out that the Storm OutputCollector is not thread-safe as of 0.8.  You can read about that in their forums; I found it via this article from Spotify.

This was the cause of both of my problems.  Acks were not always getting back because of this, so the topology never finished, hence getting "stuck"; I could see in the UI that this was the case.  It is also why the failures that were causing me to lose data never materialized.

So what's the solution?  Don't use async?  Nah, that would suck…

I followed Spotify's approach of storing the completions in a LinkedBlockingQueue, then flushing them in my execute method.  That way, more data (or ticks) coming into your topology triggers the flushes once you are back on the right thread, and everything works.  I just wrote a quick case class to store the data, and then a flush, like this:
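This is a sketch with made-up names rather than the production code, but it's the same idea:

import java.util.concurrent.LinkedBlockingQueue
import backtype.storm.task.OutputCollector
import backtype.storm.tuple.Tuple

// A completed unit of async work, recorded off-thread and acked/failed later
case class Completion(tuple: Tuple, error: Option[Throwable] = None)

class SafeCopyBolt(collector: OutputCollector) {

  private val completions = new LinkedBlockingQueue[Completion]()

  // called from the future's onComplete; never touches the collector
  def completed(c: Completion): Unit = completions.put(c)

  // called at the top of execute() (ticks keep it moving even when input is slow),
  // so ack/fail always happen on the executor thread
  def flushCompletions(): Unit = {
    var c = completions.poll()
    while (c != null) {
      c.error match {
        case None    => collector.ack(c.tuple)
        case Some(_) => collector.fail(c.tuple)
      }
      c = completions.poll()
    }
  }
}

The future's onComplete now only ever calls completed(), and execute() calls flushCompletions() first thing, so the collector is only ever touched on the right thread.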

Now everything is awesome.  Hope this helps someone else.  I was about ready to throw in the towel on Storm, or update my resume and move on… which is about when I typically break through 🙂


SQL slow query log

Here's how I made my own slow query log with a Kibana dashboard.  See ya, AppDynamics and SolarWinds.

First of all, if you are using SQL from Scala, take a look at Scalikejdbc.  I've been around the block with various ORM solutions and I always come back to wanting something closer to SQL: it's easier to tune, and easier to communicate about with DBAs and other parts of your organization.  SQL is the interface to relational databases, IMHO; arbitrary DSLs invented simply for syntax sugar are usually easy to start and hard to finish.  Scalikejdbc is a great blend of SQL with efficiency tools that make binding and result-set manipulation easy and idiomatic.

So the first part of the puzzle is leveraging the built-in mechanism that lets you turn on SQL logging.  See the docs for more help.
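That boils down to flipping on GlobalSettings.loggingSQLAndTime somewhere at startup; a sketch, with the thresholds set to whatever you consider slow:

import scalikejdbc._

object SqlLogging {

  // call once at startup; scalikejdbc then logs statements, and anything
  // over the warning threshold gets logged at warn level
  def init(): Unit = {
    GlobalSettings.loggingSQLAndTime = LoggingSQLAndTimeSettings(
      enabled = true,
      singleLineMode = true,
      logLevel = 'debug,
      warningEnabled = true,
      warningThresholdMillis = 1000L,   // "slow" = anything over a second
      warningLogLevel = 'warn
    )
  }
}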


Now you can create a function that gets called when your SQL query completes, using GlobalSettings.taggedQueryCompletionListener.  You'll notice I'm using the mapped diagnostic context; you can read more about that here.  I'll explain how I get things into it in a bit, but it's basically the other contextual info that I want to log, like which database, driver, etc.
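Here's a sketch of that listener; the "SlowQueryLog" logger name, the tag-to-MDC key naming, and the one-second threshold are just my placeholders:

import org.slf4j.{LoggerFactory, MDC}
import scalikejdbc._

object SlowQueryListener {

  private val log = LoggerFactory.getLogger("SlowQueryLog")

  def init(thresholdMillis: Long = 1000L): Unit = {
    GlobalSettings.taggedQueryCompletionListener =
      (sql: String, params: Seq[Any], millis: Long, tags: Seq[String]) => {
        if (millis >= thresholdMillis) {
          // contextual info (database, driver, ...) travels in via the tags;
          // push it into the MDC so the appender can emit it as separate fields
          tags.zipWithIndex.foreach { case (tag, i) => MDC.put(s"tag$i", tag) }
          MDC.put("millis", millis.toString)
          log.warn(s"slow query ($millis ms): $sql params=${params.mkString(", ")}")
          MDC.clear()
        }
      }
  }
}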


Ok, next up.  This is how I shoved the contextual info into the tags so that I could pull it out in the log.  Most of my queries look something like this:
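Something along these lines, with a made-up pool name and table:

import scalikejdbc._

object Queries {

  // 'reporting and the accounts table are made up; the real queries have the same shape
  def activeAccounts(): List[(Long, String)] =
    NamedDB('reporting) readOnly { implicit session =>
      sql"select id, name from accounts where active = true"
        .map(rs => (rs.long("id"), rs.string("name")))
        .list
        .apply()
    }
}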

I have a number of different databases I connect to, and I use NamedDB to look them up and get a session.  I noticed that the DBSession object in Scalikejdbc has tags.  You can also put tags on individual queries, but I didn't want to have to do that to all my existing code, and I'm pretty sure I'd forget to do it on the exact query that was causing all my pain; I needed a better way.  So I created a new NamedDB case class that would do it for me:


I still had to update my existing code, but a simple search-and-replace of NamedDB to TaggedDB, done!

I had to do some hacking since Scala doesn't like you extending case classes, and it's not fully implemented, but you get the idea: you can put tags into the session.  My dbName is like "server/databaseName", so I split that out and then grab some other stuff from the connection that I'm interested in logging.  You could do more here if you need it.

Ok, so now that's done.  Let's talk about how I got my logs to go into a special index in Elasticsearch.  I decided to bypass Logstash once I found this logback Elasticsearch appender.  Basically you just wire that into your logback config and you are done.  It seems performant; it makes its calls in a future, etc.

The trick to the configuration is that logger from the top of this post; that's the one you want to connect up.  That way all your existing logs keep going where they already go, but this slow query log stuff gets sent to Elasticsearch.


Every property that you put in as a "tag" needs to be wired up in the logback config to get it output into the ES JSON.  This seems a bit verbose, and maybe now I'm seeing why the JVM has 40k loggers: people just simply get these APIs wrong.  But in any event, there is a way.

That's it.  Now you'll end up with a new index in ES holding your slow query data, including the SQL, the parameters, how long the query took, and all the contextual info you need to root-cause your issues.  Wire it up in a Kibana dashboard and make pretty pictures, hook it up to alerts, etc.  And it didn't cost you a million dollars for some root-cause-detection tool.

I’d show you some data but then I’d have to kill you, so you are going to just use your imagination.  If you’ve read the blog this far, I know you are a smart person and have a bit of an imagination.

Latest Update 1-14-2016

Update: I found the reliance on Logback problematic when using this in other places that rely on different logging implementations; Apache Storm, for example, needs Log4j2.  So rather than get into the business of supporting every single Java logging impl (why are there so many?!), I decided to just take the writing to ES into my own hands.

The ES Java API is good for some things, but in this case I didn't want to bring in a bunch of dependencies.  So I borrowed this guy's thin HTTP client to Elasticsearch for Scala, https://github.com/gphat/wabisabi/blob/master/src/main/scala/wabisabi/Client.scala, and then queued stuff up and flushed it in batches (in a Future, of course).  This gets hammered in some pretty intense workflows and works great (millions of logs per day).  That assumes you've done the work to make your ES cluster perform well, which can be a real pain depending on what you are doing; a story for another time…
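The shape of it is roughly this; the actual HTTP call is abstracted here as postBulk (in my case, that thin wabisabi-style client linked above), and the index name and batch size are made up:

import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{ExecutionContext, Future}

class EsLogWriter(postBulk: String => Future[Unit], batchSize: Int = 500)
                 (implicit ec: ExecutionContext) {

  private val pending = new ConcurrentLinkedQueue[String]() // each entry: one JSON log doc

  def log(jsonDoc: String): Unit = {
    pending.add(jsonDoc)
    if (pending.size() >= batchSize) flush()
  }

  // drain whatever is queued and ship it to the _bulk endpoint in one go
  def flush(): Future[Unit] = {
    val batch = Iterator.continually(pending.poll()).takeWhile(_ != null).toList
    if (batch.isEmpty) Future.successful(())
    else {
      val action = """{"index":{"_index":"slow-query-log","_type":"log"}}"""
      val payload = batch.map(doc => action + "\n" + doc).mkString("", "\n", "\n")
      postBulk(payload)
    }
  }
}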


Typing Utopia in Elasticsearch

So if you are new to Elasticsearch, you may naively think it's schemaless.  You'll start jamming stuff in all willy-nilly, and then eventually, either at index time or at query time, you'll start running into some nasties:  NumberFormatException, MapperParsingException, or SearchParseException.

You'll trace these to mapping issues, which you can sometimes fix, if you reindex all your gobs of data.  Yuck!  If you don't know the types of problems I'm talking about, read this; that guy goes through some good examples that are representative of the kinds of things we've run into as well.  The skinny is that Elasticsearch isn't schemaless at all, quite the opposite.  It tries to infer your types for you implicitly, which is convenient, but when it gets it wrong you'll find that the only way to fix it is to be explicit, which doesn't sound very schemaless at all, does it?

A great example of how this can go wrong is with a float or double.  If the first value you send for a field in a doc without a mapping is a zero, Elasticsearch will type that field as an integer (a long).  Then a value of 1.2 comes along, and guess what: the type for that field has already been chosen, so you just lost precision as it gets truncated to 1 in that next doc.

We tried something similar to the Orchestrate solution, but what we found was that by moving everything into arrays you had to compromise on the aggregations you were able to do.  There are certain things you just can't do with arrays in Elasticsearch.  For our use case this was a bit of a showstopper.

So here is what we did.  It's a combination of multifields, dynamic mappings, templates, and ignore_malformed.

You can use multifields to control the different ways you might index a single field.  Commonly you'll see this used to store the raw non-analyzed data alongside the analyzed data.

So with a mapping like that you can refer to the field as “tweet” when you want to search using the analyzer, or use “tweet.raw” for aggregations, or sorting, or other times when you don’t want to analyze the field.

Dynamic mappings are a way to apply mapping rules to fields that haven't been mapped yet, based on name, type, or other rules.  In a sense they let you configure how the implicit type determination happens.

Templates are just a convenient place to store your mapping rules so that they auto-apply to new indices.  If you are creating a lot of indices this is a huge help, and it simplifies your client code bases, since they no longer have to create or alter the mappings for indices.

ignore_malformed is a way to tell Elasticsearch to swallow the error when a value doesn't fit the mapping, instead of blowing up.  This one was the real key to our solution, because it means you can try to cast every field into every type in a multifield at index time; if a cast doesn't work, that sub-field just isn't there, but nothing blows up.

So, putting that all together, you get something like this:

What this means is that you can essentially autocast any field to its raw value, or to a long, double, or date (add other types if you need them), in any query, filter, or aggregation:

tweet
tweet.long
tweet.double
tweet.date
tweet.raw

Why is this useful?  Well, aside from dealing with all the various mapping errors you will run into on gobs of data, it's really helpful in analysis.

One of our use cases for Elasticsearch is doing early ad hoc data analysis, to figure out what we have before we know what we have.  We don't know what types we might be getting, and in fact we are often looking for patterns of reliability or data quality in the data, so being able to quickly determine when we can and can't cast into specific types is very informative.  Using aggregations on these typed multifields lets us do that.  This one mapping handles arbitrarily nested docs and still allows you to run complicated aggs and filters on your nested data.

Reindexing a field in Elasticsearch with curl and jq

So now that I've been converted over to functional programming thanks to Scala, I've had the profound realization (profound to me, probably obvious to most) that Unix has been doing functional stuff with pipes since long before it was cool.

I've been working a lot with Elasticsearch over the past few years, and curl is a fantastic tool for interacting with REST endpoints.  But the problem is that most endpoints these days speak JSON, so in the past, if I had to manipulate the JSON in any way, I'd run to a real programming language and use a client API.  It turns out there's this great Unix tool called jq that has virtually everything you need to manipulate JSON and make it easy to pipe forward to other endpoints or whatever you are doing.  It's functional in nature and really concise.  It has map, reduce, and a ton of other built-in functions, plus a compact syntax for filtering JSON and selecting elements.  It's easy to work with arrays and objects and transform them, just about as easy as it is in Javascript, maybe even easier in some cases.

The first thing I used it for was adding a timestamp to the JSON returned from one Rest call, so I could feed it into Elasticsearch:

What’s going on here?

I wrote an endpoint that returns some information from Kafka, but I wanted this data in Elasticsearch so I could make some cool graphs in Kibana.  There wasn't a timestamp in my original design, though, and I really need one in Elasticsearch to do my time-based reports.

You can see that you can feed jq name/value pairs via --arg, so I used the unix date cmd to format the date I needed.  Then it's just a matter of '.timestamp = $timestamp' to add a new field; the dot is essentially a reference to the root of the JSON object.  Did that just reinvent half of Logstash in one line?  Don't tell them, they might get mad.

The next day, feeling pretty good about jq's coolness, I ran into another problem where I needed to reindex some data in certain documents in Elasticsearch.  Once you have gobs of data in Elasticsearch you need to be smart about updates and reindexing, as things can take a long time.  In this case we needed to modify an existing field and add a new multifield to it in order to improve some searches we were doing.  So first I used curl to close the index, adjusted the settings on the index to add my new analyzer, then reopened the index and added the new mapping.  That's basic stuff, all easily done with curl or whatever client API you like.

But now I want to update the docs that had the problem, and only update this one field.  Our docs are pretty big; we don't want to waste time reading and writing the whole thing.

Here’s the whole thing before I break it down:

So first we have the query to find all the affected docs, by looking for docs that are missing the new field I added and have the unique data issue (a bunch of padded zeros) that we want to index better.  If you don't know what Elasticsearch results look like, this will give you an idea:

Then jq comes in.

jq -c '.hits.hits[] | { "update" : {"_index" : ._index, "_id" : ._id, "_type" : ._type}}, { "doc" : {"FbKey": ._source.FbKey}}'

That one line builds up the bulk update request, in this case 500 updates at a time.  So let's look at what is going on here.  We grab the results from the search request as an array:

.hits.hits[]

Then we pipe that (all in jq) to an expression that builds up the bulk request.  The expressions like “._index” are actually selecting bits of data out of each hits.hits[] object as we pipe through it.

{ "doc" : {"FbKey": ._source.FbKey}}

That part is where we update the affected field, so that Elasticsearch will reindex with our new analyzer and mappings configured.

We then feed all this jq business to the next curl cmd to send it into Elasticsearch as a bulk request.  At the end “.items | length” looks at the result from the bulk cmd and tells us how many records we modified.

So we just keep running this until we run out of docs to update, meaning we’ve fixed everything.

Pretty sweet way to manage your data in Elasticsearch with just curl and jq!  Now go get all Unix'y with your next REST/JSON administrative task.

Maven Release Plugin Horrors with git

If you have spent any time trying to use the Maven release plugin, you probably hate it.  It wants to run your build twice; typically you wait 20 minutes to hours, depending on the size of your project, to find out that it barfed, then you try something else and it barfs again.  At some point you either dig in really hard or start looking for other solutions.

As is just about typical with every major block I have, just when I’m about to kill myself, my dog, or anyone that comes near me, I figure things out.

So my story is this: we have the Maven release plugin working great locally, but doing releases from laptops is not ideal, and of course on the weekend I needed to do a release and the VPN was too painful to run the test suite over.  I decided it was about time to get this thing working in Jenkins.  How hard could it be?  Throw it up there, wire in the Maven goals, bada bing, right?

Not so much.

Ran the release goals, everything is awesome, then it goes to push the thing back into git:

I did some googling and found out that Jenkins checks out git code into a detached HEAD, so you need to deal with that.  After a few failed attempts doing things that seemed much more complicated than they needed to be, I landed on this:

http://stackoverflow.com/questions/11511390/jenkins-git-plugin-detached-head

Basically you just need to pick the "check out to specific local branch" option and set your branch.  Grab a beer, wait another 25 minutes, and then bam!:

Well, this one got me stuck forever.  I was convinced it was some credential problem in Jenkins, trying all sorts of things, messing around with the git plugin, credentials, you name it; nothing helped.  Started pondering "this is why people hate Java"…

Then finally I was sitting there staring at that url,

could not read Username for 'https://github.com'

thinking about where I had seen that before somewhere else.  Oh yeah, it's in the Maven pom.xml:

I changed the git URL in the pom's scm section to the ssh form instead:

And hell yeah, it finally worked.  It makes sense if you think about how git is handling security: the http protocol isn't going to work here, since we are detached from the original git pull of the code.  Anyway, it was much pain and suffering, and I couldn't believe Stack Overflow and Google let me down; it took way too much time to figure this out.  That is why I'm blogging about it, so some other poor sorry soul won't have to kill their dog over it.

As a side note, I ran into this while looking for a solution:

http://axelfontaine.com/blog/final-nail.html

This guy's approach seems pretty interesting.  It eliminates the two builds and seems really simple and straightforward to implement.  I was just about to try it out when I got past this.  But for sure, when I find some time, or the next time this poops out, I might give it a go.

For the record, I have other builds using the SBT release plugin, and I went really deep on that too; I have battle scars from that one as well.  A tale for another day…