Embedded data meets big data – SQLite in Spark, are you crazy?

Every one of these new runtimes or distributed systems I go deep on takes me to the edge of sanity before I punch through.  My recent experience with Spark was no different, but I finally punched through.

We have an existing ETL system that pulls views from our databases.  Out of the top 10 list here, we use 8 of them.  Most of these views (95%) are 500,000 rows or less, but the top 5%, which get used the most, may have as many as 10,000,000 rows.  100 or more columns is normal as well.  These are wide, flattened tables.  People use them for downloads and offline analysis, and for simple introspection in a web view, but they're also starting to seed our data lake.

When we explored building the first generation of this system, we looked at the popular open source databases for speed at arbitrarily filtering, grouping, and sorting wide tables of up to 10 million records and 200 columns.  We looked at PostgreSQL, MySQL, and SQLite.  We use Elasticsearch quite a bit, but ruled it out for this use case due to cost (lots of RAM) and write time; those are things we maybe could have overcome, but we decided not to.  We wanted to avoid creating indices because we didn't know how users were going to look at the data.  Our API is similar to Elastic's in that you can filter, sort, and group on any dimension.  SQLite won the war: if you turn off transactions and journaling, you get amazing performance for what we needed, with sub-second response times across the board for nearly everything, even on commodity boxes.
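
For reference, the kind of tuning I mean looks something like this: a minimal sketch using the xerial sqlite-jdbc driver, with illustrative pragma values rather than our exact production settings.

import java.sql.DriverManager

// Open (or create) a local SQLite file and relax durability for a one-shot bulk load.
val conn = DriverManager.getConnection("jdbc:sqlite:/tmp/extract.db")
val stmt = conn.createStatement()
stmt.execute("PRAGMA journal_mode = OFF")   // no rollback journal
stmt.execute("PRAGMA synchronous = OFF")    // don't fsync on every write
stmt.execute("PRAGMA temp_store = MEMORY")  // keep temp structures out of the filesystem
stmt.close()
// ... create the table and bulk insert inside one big transaction ...
conn.close()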

The first version used Storm to extract the data into Cassandra.  The next version was going to use Spark to extract into Parquet and SQLite.  We'd have Spark simply prepare our SQLite files for us, so that we could drop them in S3 and have the API layer pick them up and do its thing.  This saved us the cost of running and managing Cassandra, and started seeding our data lake with Parquet files, which, from Spark to Presto, is where you want to be.

Spark cut the lines of code in half because it has drivers for all our data sources, and we can avoid the giant pattern matching blocks that dealt with typing issues, since the DataFrame just does this for you.  In addition, with clever partitioning of our extraction we were able to get more parallelism out of the source.  Extraction rates went from 1,500-3,000 rows per second to 8,000-30,000 rows per second depending on the width of the table, so we can get tens of millions of rows out in minutes.  Less code, fast, strategic for the future, good stuff!
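
The partitioned extraction is roughly this shape (assuming a SparkSession in scope as spark; the connection details, table, and bounds below are made up for illustration):

// Split the source read on a numeric key so several executors pull rows concurrently.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://source-host/warehouse")  // hypothetical source
  .option("dbtable", "some_wide_view")
  .option("partitionColumn", "id")   // numeric column to split on
  .option("lowerBound", "1")
  .option("upperBound", "10000000")
  .option("numPartitions", "20")     // 20 concurrent reads against the source
  .load()

df.write.mode("overwrite").parquet("s3://our-lake/some_wide_view/")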

Now the bad…

Everything was going swimmingly, until we started testing the larger datasets and the SQLite creation part started stalling in Spark.  No errors, the job would just hang forever and have to be killed.  Sometimes it would work, but usually not.  We got stuck.  Enter the tuning and runtime learning process…

When you submit a job to YARN, you start learning about executors and cores.  I found the vernacular here confusing, because an executor is a process and a core isn't a CPU, it's a task, or a thread.  So when you tell spark-submit you want --executor-cores 5 and --num-executors 4, you are requesting 4 JVM processes that will each run, basically, 5 threads.  Coming from my familiar Storm world, this translates to workers (executors) and tasks (cores).  There is also a driver process that is separate from the executors, more on that later…

I read everything I could about that and experimented with the number of each, but couldn't move the dial on my problem.  The size of the cluster in terms of nodes or CPUs or executors or cores didn't make any difference; the problem still occurred.

It felt like a memory problem, maybe the system was spinning in GC hell.  The Spark UI reports some of this kind of thing, but nothing glaring showed up there or in various metrics.  Regardless, I started experimenting with the --executor-memory setting.  But again, that didn't move the ball.

So let’s talk about dataframes and storage for a minute…

When you pull data into Spark into a DataFrame or RDD, you are basically pulling it into memory across your cluster, with pieces of the DataFrame spread out among your executors.  So your data is not all on one machine.  When you ask Spark to filter or group or otherwise transform your DataFrame, Spark farms the work out to the various executors so they can all work on the piece of the dataset they own.

When you run Spark in cluster mode you are relying on HDFS storage; if you use EMR in AWS like we do, you can have this end up in S3.  Regardless, in a cluster environment where you are writing files, you need some type of shared network storage solution.  Now come back to SQLite.  SQLite drivers don't understand HDFS; you can't talk to it directly, you need to write to local storage.

So the situation is that you need to bring all your data back to one node in order to write this local file.  No problem, Spark will let you do a collect() or a toLocalIterator() that brings all the data back to the driver.  This is the piece that would stall: the work that happened after the collect().  Well, here is where my stupidity came into play.  It turns out all my fudging with the executor memory was doing nothing for the driver memory, and the default driver memory is only 500MB.  That's when I realized omg, I'm an idiot.  There is another spark-submit option for the driver memory, which for whatever reason doesn't seem to end up in the examples or Stack Overflow answers on this subject (hence this blog…).  Once I set it to something large enough to handle the SQLite data, my problem went away; see the full example that follows.
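
Here's the shape of the thing with the driver memory called out.  The flag values and the SQLite write below are illustrative, not our exact job.

// spark-submit --num-executors 4 --executor-cores 5 \
//   --executor-memory 4g \
//   --driver-memory 8g \        // the setting that fixed the stall
//   --class com.example.Extract extract.jar

import java.sql.DriverManager
import org.apache.spark.sql.DataFrame

def writeToSqlite(df: DataFrame, path: String): Unit = {
  // Everything below runs on the driver, so the driver heap has to hold the
  // collected rows; size --driver-memory accordingly (the 500MB default won't cut it).
  val conn = DriverManager.getConnection(s"jdbc:sqlite:$path")
  conn.setAutoCommit(false)
  val insert = conn.prepareStatement("INSERT INTO extract VALUES (?)")  // illustrative one-column schema
  df.collect().foreach { row =>
    insert.setString(1, row.mkString("|"))  // stand-in for real per-column binding
    insert.addBatch()
  }
  insert.executeBatch()
  conn.commit()
  conn.close()
}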

So now I'm in the happy zone: my hundreds of views are happily extracting, and I can get onto the next step, which is incremental pulls from the source.  Then I will go from 10 million to 100 million rows and beyond.  I'll let you know how that part works out, it's only in my head right now…


Scala scripting?!

I have a lot of little data fix-up scripts I have to write to maintain our large and growing Elasticsearch database.  Typically I use the shell stack (bash/jq/curl) for such tasks.  When things get a little bigger I switch to Python.  I'm a big fan of jq/curl, but for anything that isn't small, it gets nasty.  I'm a big fan of Python too, but I'm still a baby using it, so when things get bigger or more complicated, I'm not very efficient.

Anyway, a lot of these tasks end up doing scroll/scan or search in ES, and then feed data back in, or to message queues, or to various endpoints to do work.  They are often long lived: code them up, throw them in Rundeck, and let them run for hours or days.

One frustration is that these scripting options simply can't queue the work fast enough; my Storm jobs, which do the processing, go way faster than the input to them.  I know I could learn how to do async in Python, or try to shard things up in the shell stack with 'parallel', or find some other solution.  But since I already have a lot of business logic in my runtime code in Scala, it would be nice to just use that, without the headache of builds and deployments: something faster and lighter weight that I can still just dump into Rundeck.  I know how to control async, threads, and concurrency in this environment, and I know I'm not limited by the toolset.

I looked into this once before but gave up at the dependency part of it.  Then I discovered this blog.

Basically, using a hacked-up version of sbt, you can write a single-file script in Scala that will download dependencies, compile, and launch, with no fuss.  I'll show you how I got it hooked up, mostly following that blog, plus a few other things I found helpful.  Here are the steps I followed to get it set up on my Rundeck Linux box:

Install conscript

curl https://raw.githubusercontent.com/foundweekends/conscript/master/setup.sh | sh

Install the sbt runner:

cs sbt/sbt --branch 0.13

Put scalas in your path:

export CONSCRIPT_HOME="$HOME/.conscript"
export CONSCRIPT_OPTS="-Dfile.encoding=UTF-8"
export PATH=$CONSCRIPT_HOME/bin:$PATH

Create a script:
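
A first script can be as small as this (the header syntax comes from the blog I followed; adjust the Scala version to whatever your setup uses):

#!/usr/bin/env scalas
/***
scalaVersion := "2.11.7"
*/
println("hello")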

$ chmod +x script.scala 
$ ./script.scala
hello

Ok, so now you can add the location of your artifacts and dependencies inside the /*** comment like this:

/***
scalaVersion := "2.11.7"
 
resolvers += Resolver.url("typesafe-ivy-repo", url("http://repo.typesafe.com/typesafe/releases"))(Resolver.ivyStylePatterns)

resolvers += "Your Repo" at "http://artifactory.yourplace.com/artifactory/repo"

resolvers += Resolver.mavenLocal

libraryDependencies += "org.scala-sbt" % "io" % "0.13.11"

libraryDependencies += "ch.qos.logback" %  "logback-classic" % "1.1.7"

libraryDependencies += "ch.qos.logback" %  "logback-core" % "1.1.7"

libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.21"

libraryDependencies += "com.trax.platform" % "trax-elasticsearch-loader"  % "1.3.43"

libraryDependencies += "com.trax.platform" % "trax-platform-utils" % "1.3.7"
*/

When you run it, it will automatically download dependencies, compile, and run your script.  The chunk of the internet it downloads lands in $CONSCRIPT_HOME/boot.

You can also bring in a logger and control it programmatically.  It starts to look a lot like the Python script, at least in regards to how simple it is to configure logging in Python, but with access to all your fancy Scala tools.  When was the last time you could do tail recursion in a script and not worry about a stack overflow?  @tailrec to the rescue!

Final Thoughts

Scripting with Scala is not good for everything, or everyone.  The compile time stinks when you have a small task, but if the script is going to run for hours or days, and run considerably faster because you can do it concurrently, the few extra seconds to compile are worth it.

Also, if you have business logic wrapped up in the Java ecosystem already, you may find it an easy way to unlock some stuff quickly without having to put things behind a REST interface or a message queue, or what have you.

It also might be an easy way to explore Scala for some real tasks.  So if you are curious and want to dabble around with it a bit, without having to bet the farm on some new tech that nobody knows or is willing to invest in heavily, give it a go.

LogbackUtils

Here's the body of the little log util I use for controlling Logback without any config files.  It's something I found on Stack Overflow, and it's good enough for command-line tools and simple scripts.  Don't get me started on how logging sucks in the Java ecosystem, many a day wasted attempting to do things that should be easy…
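
It boils down to something like this, a minimal sketch of the Stack Overflow approach:

import ch.qos.logback.classic.{Level, Logger}
import org.slf4j.LoggerFactory

object LogbackUtils {

  // Set a single logger's level by name, e.g. setLogLevel("org.apache.http", "WARN");
  // pass null for the logger name to change the root logger.
  def setLogLevel(loggerName: String, level: String): Unit = {
    val name = Option(loggerName).getOrElse(org.slf4j.Logger.ROOT_LOGGER_NAME)
    val logger = LoggerFactory.getLogger(name).asInstanceOf[Logger]
    logger.setLevel(Level.toLevel(level, Level.INFO)) // falls back to INFO on bad input
  }

  // Convenience for scripts: quiet everything, then turn your own package back up.
  def quietDown(myPackage: String): Unit = {
    setLogLevel(null, "WARN")
    setLogLevel(myPackage, "INFO")
  }
}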

Show me the error or shut up

Options, Try, Futures, oh my!  You can go read all the various articles on how the semantics of these are similar.  Here's one; I highly recommend the cheat sheet as a reference, it comes in handy.  But here is a real-world example to help as well.  I very often find myself needing to handle an error just to print out a log message.  So I started out with the normal try/catch as in Java.  It's a bunch of brackets and kinda verbose.

Then once you switch to using Futures, at some point you'll forget to handle a failure, your root cause will just be gone, you'll waste a bunch of time, and then you'll start putting onComplete, onFailure, or recover everywhere.  At least that's what I did, maybe I'm an idiot and you are smarter.

Well, that's just as verbose as the try/catch, even more so when you don't want to handle one side of the equation.  What you can do, and this is the same for a Try or a Future, is use ".failed.map".  Like this:

Try(someStuffThatMightFail).failed.map(e => println(e.getMessage))

This inverts the Try so that failed returns you a Try[Throwable], which you can then map over to get your error.  getOrElse() and friends don't work here because you want the error, and they won't give it to you.  If there is no error, the map() simply does nothing, and you go on about your business.

So much cleaner, here are some examples:
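
Roughly the patterns in play; the helpers are hypothetical and the "logging" is just println to keep the sketch small:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

def riskyParse(s: String): Int = s.toInt                   // hypothetical
def riskyCall(): Int = throw new RuntimeException("boom")  // hypothetical

// Try: log the error if it happened, otherwise carry on.
Try(riskyParse("not-a-number")).failed.map(e => println(s"parse failed: ${e.getMessage}"))

// Future: same shape; .failed gives you a Future[Throwable] that only
// completes if the original future failed.
Future(riskyCall()).failed.foreach(e => println(s"call failed: ${e.getMessage}"))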

 

Async stormy weather

 

 

We've been using Apache Storm for 6 months or so.  We are at the point where we've proved it out and built a bunch of operational and release management practices around it.  It is starting to metastasize around our organization and become something of a gravity well for heavy processing pipelines.

Then I went on vacation and all hell started breaking loose.

Most of our topologies are Kafka-initiated.  In one major job, I started seeing the queue get "stuck".  It would do some work, and then nothing.  In this particular topology we are basically copying data from one data source into another, and we have a fairly large message timeout, as some of these copies can take quite a while.  What we were seeing was that after we restarted the worker, processing would start up again and we might pop a few messages off the queue, but after a bit we'd be "stuck" again.  No processing, but you could see in the Kafka offsets that there were messages waiting that we weren't picking up.

There was another problem: for one particular piece of data we were extracting, the counts were coming out wrong, we were missing rows when we copied.  How could this happen?  It should have triggered a failure in our topology, but it didn't!

Well, I spent a bunch of time chasing problems I had already solved, like configurations in the spout or various params in the topology.  My spidey sense told me it had something to do with an async callback.  Because we are using Scala as our language of choice in Storm, and Storm sorta predates Futures, something about this made me uncomfortable.  So I kept digging…

Let me explain the callback in a little more detail…

We are essentially writing rows into Cassandra using the async approach.  I have a wrapper, borrowed from https://github.com/eigengo/activator-akka-cassandra/blob/master/src/main/scala/core/cassandra.scala, that turns the Cassandra Java driver's futures into Scala Futures.  If you are using Cassandra from Scala and relying on the Java driver natively, I highly recommend this approach: you simply import that, and your Cassandra Java futures turn into Scala Futures automagically, which makes things much more idiomatically Scala and happy.
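
The guts of that wrapper are roughly this (a paraphrase of the linked code rather than a copy; older Guava versions take addCallback without an executor argument):

import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
import scala.concurrent.{Future, Promise}
import scala.language.implicitConversions

object CassandraScala {
  // Bridge the driver's ListenableFuture into a Scala Future via a Promise,
  // so you can map/flatMap/onComplete it like any other Future.
  implicit def listenableToScala[A](lf: ListenableFuture[A]): Future[A] = {
    val promise = Promise[A]()
    Futures.addCallback(lf, new FutureCallback[A] {
      def onSuccess(result: A): Unit = promise.success(result)
      def onFailure(t: Throwable): Unit = promise.failure(t)
    })
    promise.future
  }
}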

So from that, when all our inserts complete we end up in an onComplete block like this:
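
Simplified, something along these lines, which looks perfectly innocent (names are mine, and the real bolt does more):

import backtype.storm.task.OutputCollector   // org.apache.storm.task in Storm 1.0+
import backtype.storm.tuple.Tuple
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

class CopyBolt(collector: OutputCollector) {
  // Hypothetical async insert returning a Future per row.
  def insertRow(row: String): Future[Unit] = Future(())

  def handle(tuple: Tuple, rows: Seq[String]): Unit =
    Future.sequence(rows.map(insertRow)).onComplete {
      case Success(_) =>
        collector.ack(tuple)   // runs on a callback thread, NOT the thread that
      case Failure(e) =>       // called execute(), and OutputCollector isn't
        collector.fail(tuple)  // thread safe, so acks/fails silently go missing
    }
}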

Here lies the problem.  It turns out that the Storm OutputCollector is not thread safe as of 0.8.  You can read about that in their forums; I found it via this article from Spotify.

This was the cause of both of my problems.  Acks were not always getting back because of this, so the topology never finished, hence getting "stuck".  I could see in the UI that this was the case.  It's also why the failures that were causing me to lose data never materialized.

So what's the solution?  Don't use async?  Nah, that would suck…

I followed Spotify's approach of storing the completions in a LinkedBlockingQueue, and then flushing them in my execute method.  In this way, more data (or ticks) coming into your topology triggers the flushes once you are back on the right thread, and everything works.  I just wrote a quick case class to store the data, and then a flush, like this:
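
A trimmed-down version of the idea; the names are mine, and the real bolt obviously emits downstream and handles errors too:

import java.util.concurrent.LinkedBlockingQueue
import backtype.storm.task.OutputCollector   // org.apache.storm.task in Storm 1.0+
import backtype.storm.tuple.Tuple

// What the async callbacks produce: which tuple finished, and whether it worked.
case class Completion(tuple: Tuple, success: Boolean)

class SafeAckBolt(collector: OutputCollector) {
  private val completions = new LinkedBlockingQueue[Completion]()

  // Called from the Future callback threads: just enqueue, never touch the collector.
  def onAsyncDone(tuple: Tuple, success: Boolean): Unit =
    completions.offer(Completion(tuple, success))

  // Called at the top of execute() (and on tick tuples), i.e. on the executor's own
  // thread, which is the only place it's safe to ack or fail.
  def flush(): Unit = {
    var c = completions.poll()
    while (c != null) {
      if (c.success) collector.ack(c.tuple) else collector.fail(c.tuple)
      c = completions.poll()
    }
  }
}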

Now everything is awesome.  Hope this helps someone else.  I was about ready to throw in the towel on Storm, or update my resume and move on… which is about the same time I typically break through 🙂

 

SQL slow query log

Here's how I made my own slow query log with a Kibana dashboard.  See ya, AppDynamics and SolarWinds.

First of all, if you are using SQL in Scala, take a look at ScalikeJDBC.  I've been around the block with various ORM solutions and I always come back to wanting something closer to SQL: it's easier to tune, and easier to communicate about with DBAs and other parts of your organization.  SQL is the interface to relational databases, IMHO; arbitrary DSLs invented simply for syntactic sugar are usually easy to start and hard to finish.  ScalikeJDBC is a great blend of SQL with efficiency tools that make binding and result set manipulation easy and idiomatic.

So the first part of the puzzle is leveraging the built-in mechanism that lets you turn on SQL logging.  See the docs for more help.
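
Turning it on looks something like this; the thresholds are examples, and the docs list the rest of the knobs:

import scalikejdbc.{GlobalSettings, LoggingSQLAndTimeSettings}

GlobalSettings.loggingSQLAndTime = LoggingSQLAndTimeSettings(
  enabled = true,
  singleLineMode = true,           // keep each statement on one log line
  logLevel = 'debug,
  warningEnabled = true,           // this is the "slow query" part:
  warningThresholdMillis = 1000L,  // anything over a second...
  warningLogLevel = 'warn          // ...gets logged at WARN
)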

 

Now you can create a function to be called when your SQL query completes using GlobalSettings.taggedQueryCompletionListener.  You'll notice I'm using the mapped diagnostic context (MDC), which you can read more about here.  I'll explain how I get things into the tags in a bit, but they're basically other contextual info that I want to log, like which database, driver, etc.
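
The listener itself is just a function you assign.  Here's a sketch; the logger name is whatever you later wire to the Elasticsearch appender, and the MDC keys and threshold are my own choices:

import org.slf4j.{LoggerFactory, MDC}
import scalikejdbc.GlobalSettings

val slowQueryLog = LoggerFactory.getLogger("slowquery")

GlobalSettings.taggedQueryCompletionListener = (sql: String, params: Seq[Any], millis: Long, tags: Seq[String]) => {
  if (millis > 1000) {                 // only ship the slow ones
    tags match {
      case Seq(server, database, _*) =>
        MDC.put("server", server)      // contextual info pulled off the session tags
        MDC.put("database", database)
      case _ =>                        // untagged query, log it without the context
    }
    MDC.put("millis", millis.toString)
    slowQueryLog.warn(s"$sql params=${params.mkString(",")}")
    MDC.clear()
  }
}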

 

Ok, next up.  This is how I shoved the contextual info into the tags so that I could pull it out in the log.  Most of my queries look something like this:
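
Schematically, something like this, with the pool name, table, and fields invented for the example:

import scalikejdbc._

case class Shipment(id: Long, status: String)

def shipmentsFor(customerId: Long): List[Shipment] =
  NamedDB("warehouse/orders") readOnly { implicit session =>
    sql"select id, status from shipments where customer_id = $customerId"
      .map(rs => Shipment(rs.long("id"), rs.string("status")))
      .list
      .apply()
  }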

I have a number of different databases I connect to, and I use NamedDB to look them up and get a session.  I noticed that the DBSession object in ScalikeJDBC has tags.  You can also put tags on the queries themselves, but I didn't want to do that to all my existing code, and I'm pretty sure I'd forget to do it on the exact query that was causing all my pain; I needed a better way.  So I created a new NamedDB-style case class that would do it for me:
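
The shape of it is roughly this.  It's heavily simplified, and whether you can copy tags onto the session exactly this way depends on your ScalikeJDBC version, so treat it as the idea rather than drop-in code:

import scalikejdbc._

// Hypothetical stand-in for NamedDB that stamps server/database tags onto every
// session it hands out, so the completion listener can log them.
case class TaggedDB(dbName: String) {
  private val Array(server, database) = dbName.split("/")

  def readOnly[A](execution: DBSession => A): A =
    NamedDB(dbName) readOnly { session =>
      val tagged = session match {
        case s: ActiveSession =>
          // plus anything else off the connection you want to log (driver name, etc.)
          s.copy(tags = s.tags ++ Seq(server, database))
        case other => other   // e.g. NoSession: nothing to tag
      }
      execution(tagged)
    }
}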

 

 

I still had to update my existing code, but a simple search and replace of NamedDB with TaggedDB, done!

I had to do some hacking since Scala doesn't like you extending case classes, and it's not fully implemented, but you get the idea: you can put tags into the session.  My dbName looks like "server/databaseName", so I split that out and then grab some other stuff from the connection that I'm interested in logging.  You could do more here if you need it.

Ok, so now that's done.  Let's talk about how I got my logs to go into a special index in Elasticsearch.  I decided to bypass Logstash once I found this Logback Elasticsearch appender.  Basically you just wire that into your Logback config, and you are done.  It seems performant, it makes its calls in a future, etc., etc.

The trick to the configuration is the logger at the top of this post; that's the one you want to connect up.  In this way all your existing logs go where they go, but this slow query log stuff gets sent to Elasticsearch.

 

Every property that you put in as a "tag" you need to wire up in this Logback config to get it to output into the ES JSON.  This seems a bit verbose, and maybe now I'm seeing why the JVM has 40k loggers: people just simply get these APIs wrong.  But in any event, there is a way.

That's it.  Now you'll end up with a new index in ES containing your slow query data, which includes the SQL, the parameters, how long the query took, and all the contextual info you need to root-cause your issues.  Wire it up in a Kibana dashboard and make pretty pictures, hook it up to alerts, etc., etc.  And it didn't cost you a million dollars for some root-cause detection tool.

I’d show you some data but then I’d have to kill you, so you are going to just use your imagination.  If you’ve read the blog this far, I know you are a smart person and have a bit of an imagination.

Latest Update 1-14-2016

Update: I found the reliance on Logback problematic when using this in other places that rely on different logging implementations; Apache Storm needs Log4j2, for example.  So rather than get into the business of supporting every single Java logging impl (why are there so many!), I decided to take the writing to ES into my own hands.

The ES Java API is good for some things, but in this case I didn't want to bring in a bunch of dependencies.  So I borrowed this guy's thin HTTP client to Elasticsearch for Scala, https://github.com/gphat/wabisabi/blob/master/src/main/scala/wabisabi/Client.scala, and then queued up the log entries and flushed them in batches (in a Future, of course).  This gets hammered in some pretty intense workflows and works great (millions of log entries per day).  That assumes you've done the work to make your ES cluster perform well, which can be a real pain depending on what you are doing; a story for another time…
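
The batching piece is nothing fancy; roughly this, where the bulk POST is a stand-in for the wabisabi-style client call, and retries and error handling are left out:

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object SlowQueryShipper {
  private val pending = new LinkedBlockingQueue[String]()   // one JSON doc per entry

  // Callers (the query completion listener) just enqueue; nothing blocks.
  def log(jsonDoc: String): Unit = pending.offer(jsonDoc)

  // Called periodically: drain whatever is queued and bulk-index it in a Future.
  def flush(index: String): Unit = {
    val batch = new java.util.ArrayList[String]()
    pending.drainTo(batch)
    if (!batch.isEmpty) Future {
      val body = new StringBuilder
      batch.asScala.foreach { doc =>
        body.append(s"""{"index":{"_index":"$index","_type":"slowquery"}}""").append('\n')
        body.append(doc).append('\n')
      }
      postBulk(body.toString)   // hypothetical thin HTTP POST to /_bulk
    }
  }

  private def postBulk(payload: String): Unit = ()   // stand-in for the real HTTP client call
}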

 

Typing Utopia in Elasticsearch

So, if you are new to Elasticsearch you may naively think it's schemaless.  You'll start jamming stuff in all willy-nilly, and then eventually, either at index time or at query time, you'll start running into some nasties: NumberFormatException, MapperParsingException, or SearchParseException.

You'll trace these to mapping issues, which sometimes you can fix, if you reindex all your gobs of data, yuck!  If you don't know the types of problems I'm talking about, read this; that guy goes into some good examples that are representative of the things we've run into as well.  The skinny is that Elasticsearch isn't schemaless at all, quite the opposite.  It tries to infer your types for you implicitly, which is convenient, but when it gets it wrong, you'll find that the only way to fix it is to be explicit, which doesn't sound like schemaless at all, does it?

A great example of where this can go wrong is with a float or double.  If the first value you send in a doc without a mapping is a zero, Elasticsearch will type that field as an integer.  Then a value of 1.2 comes along, and guess what, the type for that field was already selected as an integer, so you just lost precision as it's rounded down to 1 in this next doc.

We tried something similar to the Orchestrate solution, but what we found was that by moving everything into arrays, you have to compromise on the aggregations you are able to do.  There are certain things that you can't do with arrays in Elasticsearch, and for our use case that was a bit of a show stopper.

So here is what we did.  It's a combination of multifields, dynamic mappings, templates, and ignore_malformed.

You can use multifields to index a single field in several different ways.  Commonly you'll see this used to store the raw not_analyzed data alongside the analyzed data.
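
For example, something like this for a "tweet" field (1.x/2.x-era string mapping, shown here as the JSON body you'd PUT to the mapping API):

// "raw" is the not_analyzed copy you aggregate and sort on.
val tweetMapping = """
{
  "properties": {
    "tweet": {
      "type": "string",
      "fields": {
        "raw": { "type": "string", "index": "not_analyzed" }
      }
    }
  }
}
"""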

So with a mapping like that you can refer to the field as "tweet" when you want to search using the analyzer, or use "tweet.raw" for aggregations, sorting, or other times when you don't want the field analyzed.

Dynamic mappings are ways you can apply mapping rules to fields that haven’t been mapped, based on name or type or other rules.  In a sense it allows you to configure how the implicit type determination happens.

Templates are just a convenient place to store your mapping rules so that they auto-apply to new indices.  If you are creating a lot of indices this is a huge win; it simplifies your client code bases since they no longer have to create or alter mappings themselves.

ignore_malformed is a way to tell Elasticsearch to swallow any errors when a mapping doesn't work, instead of blowing up.  This one was the real key to our solution, because it means you can try to cast everything into every type in a multifield at index time; if a cast doesn't work the sub-field just won't be there, but nothing blows up.

So putting that all together you get something like this:
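
A sketch of the template body (again 1.x/2.x syntax; the template name, index pattern, and which sub-types you include are up to you):

// Every new string field picks up raw/long/double/date sub-fields, with
// ignore_malformed soaking up the casts that don't apply.
val autocastTemplate = """
{
  "template": "mydata-*",
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "autocast_strings": {
            "match_mapping_type": "string",
            "match": "*",
            "mapping": {
              "type": "string",
              "fields": {
                "raw":    { "type": "string", "index": "not_analyzed" },
                "long":   { "type": "long",   "ignore_malformed": true },
                "double": { "type": "double", "ignore_malformed": true },
                "date":   { "type": "date",   "ignore_malformed": true }
              }
            }
          }
        }
      ]
    }
  }
}
"""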

So what this means is that you can essentially autocast any field to its raw value, or to a long, double, or date (add other types if you need them), in any query, filter, or aggregation:

tweet
tweet.long
tweet.double
tweet.date
tweet.raw

Why is this useful?  Well, aside from dealing with all the various mapping errors you will run into on gobs of data, it's really helpful in analysis.

One of our use cases for Elasticsearch is doing early ad hoc data analysis, to figure out what we have before we know what we have.  We don't know what types we might be getting, and in fact we are often looking for patterns of reliability or data quality in the data, so being able to quickly determine when we can and can't cast into specific types is very informative.  Using aggregations on these typed multifields lets us do that.  This one mapping handles arbitrarily nested docs and still allows you to run complicated aggs and filters on your nested data.

Reindexing a field in Elasticsearch with curl and jq

Now that I've been converted over to functional programming thanks to Scala, I've had the profound realization (profound to me, probably obvious to most) that Unix has been doing functional stuff with piping since long before it was cool.

I've been working a lot with Elasticsearch over the past few years, and curl is a fantastic tool for interacting with REST endpoints.  The problem is that most endpoints these days use JSON, so in the past, if I had to manipulate the JSON in any way, I'd run to a real programming language and use a client API.  It turns out there's this great Unix tool called jq that has virtually everything you need to manipulate JSON and make it easy to pipe it forward to other endpoints or whatever you are doing.  It's functional in nature and really concise.  It has map, reduce, a ton of other built-in functions, and a compact syntax for filtering JSON and selecting elements.  It's easy to work with arrays and objects and transform them, just about as easy as it is in JavaScript, maybe even easier in some cases.

The first thing I used it for was adding a timestamp to the JSON returned from one REST call, so I could feed it into Elasticsearch:

What’s going on here?

I wrote an endpoint that returns back some information from Kafka, but I wanted this data in Elasticsearch so I could make some cool graphs in Kibana.  There wasn't a timestamp in my original design, and I really need one in Elasticsearch to do my time-based reports.

You can see that you can feed jq name/value pairs via --arg, so I used the Unix date cmd to format the date I needed.  Then it's just a matter of '.timestamp = $timestamp' to add a new field; the dot is essentially a reference to the root of the JSON object.  Did that just reinvent half of Logstash in one line?  Don't tell them, they might get mad.

The next day, feeling pretty good about jq's coolness, I ran into another problem where I needed to reindex some data in certain documents in Elasticsearch.  Once you have gobs of data in Elasticsearch you need to be smart about updates and reindexing, as things can take a long time.  In this case we needed to modify an existing field and add a new multifield to it in order to improve some searches we were doing.  So first I used curl to close the index, adjusted the settings on the index to add my new analyzer, then reopened the index and added the new mapping.  That's basic stuff, all easily done with curl or whatever client API you like.

But now I want to update the indices that had the problem, and only update this one field.  Our docs are pretty big, and we don't want to waste time reading and writing the whole thing.

Here’s the whole thing before I break it down:

First we have the query to find all the affected docs, by looking for docs that are missing the new field I added and have the unique data issue (a bunch of padded zeros) that we want to index better.  If you don't know what Elasticsearch results look like, this will give you an idea:

Then jq comes in.

jq -c '.hits.hits[] | { "update" : {"_index" : ._index, "_id" : ._id, "_type" : ._type}}, { "doc" : {"FbKey" : ._source.FbKey}}'

That one line builds up the bulk update request, in this case 500 updates at a time.  So let's look at what is going on.  We grab the results from the search request as an array:

.hits.hits[]

Then we pipe that (all within jq) to an expression that builds up the bulk request.  Expressions like "._index" select bits of data out of each hits.hits[] object as we pipe through it.

{ "doc" : {"FbKey" : ._source.FbKey}}

That part is where we update the affected field, so that Elasticsearch will reindex with our new analyzer and mappings configured.

We then feed all this jq business to the next curl cmd to send it into Elasticsearch as a bulk request.  At the end, ".items | length" looks at the result from the bulk call and tells us how many records we modified.

So we just keep running this until we run out of docs to update, meaning we’ve fixed everything.

Pretty sweet way to manage your data in Elasticsearch with just curl and jq!  Now go get all Unix'y with your next REST/JSON administrative task.