SQL slow query log

Here’s how I built my own slow query log with a Kibana dashboard. See ya, AppDynamics and SolarWinds.

First of all, if you are using SQL in Scala, take a look at ScalikeJDBC. I’ve been around the block with various ORM solutions, and I always come back to wanting something closer to SQL: it’s easier to tune, and easier to communicate with DBAs and other parts of your organization. SQL is the interface to relational databases; IMHO, arbitrary DSLs invented simply for syntax sugar are usually easy to start and hard to finish. ScalikeJDBC is a great blend of SQL with efficiency tools that make parameter binding and result set manipulation easy and idiomatic.
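
For a taste, here’s a minimal ScalikeJDBC sketch (the connection settings, the members table, and its columns are hypothetical, not from this post):

import scalikejdbc._

// One-time setup: register a connection pool (URL and credentials are placeholders).
ConnectionPool.singleton("jdbc:postgresql://localhost:5432/mydb", "user", "pass")

case class Member(id: Long, name: String)

val minId = 100L
val members: List[Member] = DB readOnly { implicit session =>
  // The sql"..." interpolator binds minId as a JDBC parameter, not by string concatenation.
  sql"select id, name from members where id > $minId"
    .map(rs => Member(rs.long("id"), rs.string("name")))
    .list
    .apply()
}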

So the first part of the puzzle is leveraging the built-in mechanism that lets you turn on SQL logging. See the docs for more help.


import scalikejdbc._

GlobalSettings.loggingSQLAndTime = LoggingSQLAndTimeSettings(
  enabled = true,
  singleLineMode = false,
  printUnprocessedStackTrace = false,
  stackTraceDepth = 15,
  logLevel = 'debug,
  warningEnabled = false,
  // Queries slower than this threshold get logged at the warning level.
  warningThresholdMillis = 3000L,
  warningLogLevel = 'warn
)

Now you can create a function to be called when your SQL query completes using GlobalSettings.taggedQueryCompletionListener. You’ll notice I’m using the mapped diagnostic context (MDC); you can read more about that here. I’ll explain how I get things into the tags in a bit, but it’s basically other contextual info that I want to log, like which database, driver, etc.


import org.slf4j.MDC
import scalikejdbc._

// `logger`, `appName`, and `warningThresholdMillis` are defined elsewhere in the surrounding code.
GlobalSettings.taggedQueryCompletionListener = (sql: String, params: Seq[Any], millis: Long, tags: Seq[String]) => {
  if (millis > warningThresholdMillis) {
    MDC.put("sql", sql)
    MDC.put("params", params.mkString("[", ",", "]"))
    MDC.put("millis", millis.toString)
    MDC.put("error", null)
    setExtraProps(tags)
    logger.warn("completion")
  }
}

def setExtraProps(tags: Seq[String]): Unit = {
  MDC.put("app", appName)
  // Tags are positional: server, database, url, user, driver (see TaggedDB below).
  Seq("server", "database", "url", "user", "driver").zipWithIndex.foreach {
    case (key, i) => MDC.put(key, tags.lift(i).orNull)
  }
}

OK, next up. This is how I shoved the contextual info into the tags so that I could pull it out in the log. Most of my queries look something like this:


NamedDB(dbName) readOnly { implicit session =>
  sql"""select column1 from table1""".map(rs => rs.string(1)).list.apply()
}

I have a number of different databases I connect to, and I use NamedDB to look them up and get a session. I noticed that the DBSession object in ScalikeJDBC has tags. You can also put tags on individual queries, but I didn’t want to have to do that to all my existing code, and I’m pretty sure I’d forget to do it on the exact query that was causing all my pain. I needed a better way, so I created a new NamedDB-style case class that would do it for me:


import java.sql.Connection

import com.typesafe.scalalogging.LazyLogging
import scalikejdbc._

case class TaggedDB(name: Any)(implicit context: ConnectionPoolContext = NoConnectionPoolContext)
  extends DBConnection with LazyLogging {

  private[this] def connectionPool(): ConnectionPool = Option(context match {
    case NoConnectionPoolContext => ConnectionPool(name)
    case _: MultipleConnectionPoolContext => context.get(name)
    case _ => throw new IllegalStateException(ErrorMessage.UNKNOWN_CONNECTION_POOL_CONTEXT)
  }) getOrElse {
    throw new IllegalStateException(ErrorMessage.CONNECTION_POOL_IS_NOT_YET_INITIALIZED)
  }

  override def connectionAttributes: DBConnectionAttributes = {
    connectionPool().connectionAttributes
  }

  private lazy val db: DB = {
    val cp = connectionPool()
    DB(cp.borrow(), connectionAttributes)
  }

  def toDB(): DB = db
  def conn: Connection = db.conn

  // Build the positional tag list: server, database, url, user, driver.
  // `name` is expected to look like "server/databaseName".
  def tags(name: Any, conn: Connection): Seq[String] = {
    if (!name.isInstanceOf[String]) return Seq.empty[String]
    val nameStr = name.asInstanceOf[String]
    val pieces = nameStr.split("/")
    val (server, dbName) = if (pieces.size > 1) (pieces(0), pieces(1)) else (nameStr, "???")
    Seq(
      server,
      dbName,
      conn.getMetaData.getURL,
      conn.getMetaData.getUserName,
      conn.getMetaData.getDriverName)
  }

  override def readOnlySession(): DBSession = {
    val session = DBSession(conn, isReadOnly = true, connectionAttributes = connectionAttributes)
    session.tags(tags(name, session.connection): _*)
  }

  override def autoCommitSession(): DBSession = {
    // Auto-commit sessions need to be writable (the original had isReadOnly = true here,
    // which looks like a copy-paste slip from readOnlySession).
    val session = DBSession(conn, isReadOnly = false, connectionAttributes = connectionAttributes)
    session.tags(tags(name, session.connection): _*)
  }
}

I still had to update my existing code, but that was a simple search and replace of NamedDB with TaggedDB. Done!

I had to do some hacking since Scala doesn’t like you extending case classes, and it’s not fully implemented, but you get the idea: you can put tags into the session. My dbName looks like “server/databaseName”, so I split that out and then grab some other stuff from the connection that I’m interested in logging. You could do more here if you need it.
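
For illustration, usage is identical to NamedDB (the “server1/inventory” pool name and the query are hypothetical):

// Same shape as the NamedDB example above; the session now carries the tags,
// so the completion listener can log server, database, url, user, and driver.
val names: List[String] = TaggedDB("server1/inventory") readOnly { implicit session =>
  sql"""select name from products""".map(rs => rs.string(1)).list.apply()
}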

OK, so now that’s done. Let’s talk about how I got my logs to go into a special index in Elasticsearch. I decided to bypass Logstash once I found this Logback Elasticsearch appender. Basically you can just wire that into your Logback config and you are done. It seems performant; it makes its calls in a future, etc.

The trick to the configuration is the logger at the top of this blog post; that’s the one you want to connect up. This way all your existing logs go where they already go, but the slow query log entries get sent to Elasticsearch.
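
For completeness, here’s a sketch of how that logger might be obtained in the listener code; the com.platform.sql name is assumed to match the logger entries in the config below:

import org.slf4j.LoggerFactory

// The name must match the <logger name="com.platform.sql" ...> entries in logback.xml,
// so that only these slow-query events get routed to the ELASTIC appender.
val logger = LoggerFactory.getLogger("com.platform.sql")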


<configuration>
  <appender name="ELASTIC" class="com.internetitem.logback.elasticsearch.ElasticsearchAppender">
    <url>http://elasticsearch.yoursite.com/_bulk</url>
    <index>platform-logs-%date{yyyy-MM-dd}</index>
    <type>logback</type>
    <loggerName>es-logger</loggerName> <!-- optional -->
    <errorLoggerName>es-error-logger</errorLoggerName> <!-- optional -->
    <connectTimeout>30000</connectTimeout> <!-- optional (in ms, default 30000) -->
    <errorsToStdErr>true</errorsToStdErr> <!-- optional (default false) -->
    <includeCallerData>false</includeCallerData> <!-- optional (default false) -->
    <logsToStdErr>false</logsToStdErr> <!-- optional (default false) -->
    <maxQueueSize>104857600</maxQueueSize> <!-- optional (default 104857600) -->
    <maxRetries>3</maxRetries> <!-- optional (default 3) -->
    <readTimeout>30000</readTimeout> <!-- optional (in ms, default 30000) -->
    <sleepTime>250</sleepTime> <!-- optional (in ms, default 250) -->
    <properties>
      <property>
        <name>driver</name>
        <value>%X{driver}</value>
        <allowEmpty>true</allowEmpty>
      </property>
      <property>
        <name>user</name>
        <value>%X{user}</value>
        <allowEmpty>true</allowEmpty>
      </property>
      <property>
        <name>url</name>
        <value>%X{url}</value>
        <allowEmpty>true</allowEmpty>
      </property>
      <property>
        <name>app</name>
        <value>%X{app}</value>
        <allowEmpty>true</allowEmpty>
      </property>
      <property>
        <name>sql</name>
        <value>%X{sql}</value>
        <allowEmpty>true</allowEmpty>
      </property>
      <property>
        <name>params</name>
        <value>%X{params}</value>
        <allowEmpty>true</allowEmpty>
      </property>
      <property>
        <name>error</name>
        <value>%X{error}</value>
        <allowEmpty>true</allowEmpty>
      </property>
      <property>
        <name>server</name>
        <value>%X{server}</value>
        <allowEmpty>true</allowEmpty>
      </property>
      <property>
        <name>database</name>
        <value>%X{database}</value>
        <allowEmpty>true</allowEmpty>
      </property>
      <property>
        <name>millis</name>
        <value>%X{millis}</value>
        <allowEmpty>true</allowEmpty>
      </property>
      <property>
        <name>host</name>
        <value>${HOSTNAME}</value>
        <allowEmpty>false</allowEmpty>
      </property>
      <property>
        <name>severity</name>
        <value>%level</value>
      </property>
      <property>
        <name>thread</name>
        <value>%thread</value>
      </property>
      <property>
        <name>stacktrace</name>
        <value>%ex</value>
      </property>
      <property>
        <name>logger</name>
        <value>%logger</value>
      </property>
    </properties>
  </appender>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <!-- encoders are assigned the type
         ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
    </encoder>
  </appender>
  <root level="info">
    <appender-ref ref="STDOUT"/>
  </root>
  <logger name="es-logger" level="error"/>
  <logger name="com.platform.sql" level="error">
    <appender-ref ref="STDOUT" />
  </logger>
  <logger name="com.platform.sql" level="warn">
    <appender-ref ref="ELASTIC" />
  </logger>
</configuration>

Every property that you put in as a “tag” needs to be wired up in this Logback config to get it output into the Elasticsearch JSON. This seems a bit verbose, and maybe now I’m seeing why the JVM world has 40k loggers: people just keep getting these APIs wrong. But in any event, there is a way.

That’s it. Now you’ll end up with a new index in Elasticsearch containing your slow query data: the SQL, the parameters, how long the query took, and all the contextual info you need to root-cause your issues. Wire it up in a Kibana dashboard and make pretty pictures, hook it up to alerts, etc. And it didn’t cost you a million dollars for some root cause detection tool.

I’d show you some data, but then I’d have to kill you, so you’re just going to have to use your imagination. If you’ve read the blog this far, I know you’re a smart person with a bit of an imagination.

Latest Update: 1-14-2016

I found the reliance on Logback problematic when using this in other places that rely on different logging implementations; Apache Storm, for example, needs Log4j2. So rather than get into the business of supporting every single Java logging implementation (why are there so many!), I decided to take the writing to Elasticsearch into my own hands.

The Elasticsearch Java API is good for some things, but in this case I didn’t want to bring in a bunch of dependencies. So I borrowed this guy’s thin HTTP client to Elasticsearch for Scala, https://github.com/gphat/wabisabi/blob/master/src/main/scala/wabisabi/Client.scala, and then queued things up and flushed in batches (in a Future, of course). This gets hammered in some pretty intense workflows and works great (millions of log entries per day). That assumes you’ve done the work to make your ES cluster perform well, which can be a real pain depending on what you are doing; story for another time…


import java.util
import java.util.concurrent.LinkedBlockingQueue

import org.joda.time.format.DateTimeFormat
import org.json4s.native.Serialization
import scalikejdbc._
import wabisabi.Client

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

// `appName`, `warningThresholdMillis`, `logger`, `DateTimeUtils`, SLOW_QUERY_DOCTYPE,
// and SLOW_QUERY_BATCH_SIZE are defined elsewhere in the surrounding code.
val slowQueryESclient = new Client("http://localhost:80")
val queue = new LinkedBlockingQueue[Map[String, Any]]()

def getExtraProps(tags: Seq[String]): Map[String, Any] = {
  // Tags are positional: server, database, url, user, driver (see TaggedDB above).
  val keys = Seq("server", "database", "url", "user", "driver")
  Map[String, Any]("app" -> appName) ++ keys.zip(tags)
}

GlobalSettings.taggedQueryCompletionListener = (sql: String, params: Seq[Any], millis: Long, tags: Seq[String]) => {
  if (millis > warningThresholdMillis) {
    val data = Map(
      "stmt" -> sql,
      "message" -> "completion",
      "timestamp" -> DateTimeUtils.currentIsoString,
      "params" -> params.mkString("[", ",", "]"),
      "millis" -> millis)
    slowQueryLog(data ++ getExtraProps(tags))
  }
}

GlobalSettings.taggedQueryFailureListener = (sql: String, params: Seq[Any], e: Throwable, tags: Seq[String]) => {
  val data = Map(
    "stmt" -> sql,
    "message" -> "failure",
    "timestamp" -> DateTimeUtils.currentIsoString,
    "params" -> params.mkString("[", ",", "]"),
    "error" -> e.getMessage)
  slowQueryLog(data ++ getExtraProps(tags))
}

def flushSlowQueryLog() = Future {
  implicit val formats = org.json4s.DefaultFormats
  if (queue.size > 0) {
    // Monthly index name, e.g. "platform-logs-2016-01".
    val dtf = DateTimeFormat.forPattern("yyyy-MM")
    val indexName = "platform-logs-" + dtf.print(DateTimeUtils.nowUtc)
    val reqs = new util.ArrayList[Map[String, Any]]()
    queue.drainTo(reqs)
    // Build an Elasticsearch bulk request: an action line followed by a document line per entry.
    val payload = new StringBuffer()
    reqs.asScala.foreach { data =>
      payload.append(s"""{ "index" : { "_index":"$indexName", "_type":"$SLOW_QUERY_DOCTYPE"}}""" + "\n")
      payload.append(Serialization.write(data) + "\n")
    }
    payload.append("\n")
    logger.debug("flushing slow query log to elasticsearch:\n" + payload.toString)
    slowQueryESclient.bulk(Option(indexName), Option(SLOW_QUERY_DOCTYPE), payload.toString).onComplete {
      case Failure(e) => logger.error(e.getMessage)
      case Success(r) => logger.debug("ES response code:" + r.getStatusCode.toString)
    }
  }
}

def slowQueryLog(data: Map[String, Any]) = {
  queue.put(data)
  if (queue.size() >= SLOW_QUERY_BATCH_SIZE) flushSlowQueryLog()
}
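
One design note: a flush only triggers when the queue reaches SLOW_QUERY_BATCH_SIZE, so on a quiet system entries can linger in the queue. A sketch of a periodic flush to catch stragglers (a hypothetical addition; the 30-second interval is arbitrary):

import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical addition: flush whatever is queued every 30 seconds, so slow-query
// entries below the batch size still make it to Elasticsearch promptly.
val flusher = Executors.newSingleThreadScheduledExecutor()
flusher.scheduleAtFixedRate(new Runnable {
  def run(): Unit = flushSlowQueryLog()
}, 30, 30, TimeUnit.SECONDS)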
