Scala scripting?!

So I have a lot of little data-fixup scripts to write to maintain our large and growing Elasticsearch database.  Typically I use the shell stack (bash/jq/curl) for such tasks, and when things get a little bigger I switch to Python.  I’m a big fan of jq/curl, but for anything that isn’t small it gets nasty.  I’m a big fan of Python too, but I’m still a baby using it, so when things get bigger or more complicated I’m not very efficient.

Anyway, a lot of these tasks end up doing a scroll/scan or search in ES and then feeding data back in, or out to message queues, or to various endpoints that do the work.  They are often long-lived: code them up, throw them in Rundeck, and let them run for hours or days.

One frustration is that these scripting options simply don’t queue the work fast enough: my Storm jobs, which do the actual processing, run way faster than the input feeding them.  I know I could learn how to do async in Python, or shard things up in the shell stack with ‘parallel’, or find some other solution.  But since I already have a lot of business logic in my Scala runtime code, it would be nice to just use that, without the headache of builds and deployments: something faster and lighter weight that I can still just dump into Rundeck.  I know how to control async, threads, and concurrency in this environment, and I know I’m not limited by the toolset.
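
To give a flavor of what I mean by controlling concurrency, here’s a minimal sketch of my own (not from any real job; enqueue is a hypothetical stand-in for an HTTP post or a queue publish).  You cap the in-flight work by sizing the thread pool yourself instead of taking the default:

import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// hypothetical stand-in for an HTTP post or a queue publish
def enqueue(msg: String): Unit = println(s"queued $msg")

// cap in-flight work by sizing the pool explicitly
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))

val inFlight = (1 to 1000).map(i => Future(enqueue(s"msg-$i")))
Await.result(Future.sequence(inFlight), 10.minutes)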

I looked into this once before but gave up at the dependency part of it.  Then I discovered this blog.

Basically, using a hacked-up version of sbt, you can write a single-file script in Scala that will download dependencies, compile, and launch, with no fuss.  I’ll show you how I got it hooked up; I mostly followed that blog, plus a few other things I found helpful.  Here are the steps I followed to get it set up on my Rundeck Linux box:

Install conscript:

curl https://raw.githubusercontent.com/foundweekends/conscript/master/setup.sh | sh

Install the sbt runner:

cs sbt/sbt --branch 0.13

Put scalas in your path:

export CONSCRIPT_HOME="$HOME/.conscript"
export CONSCRIPT_OPTS="-Dfile.encoding=UTF-8"
export PATH=$CONSCRIPT_HOME/bin:$PATH

Create a script:


#!/usr/bin/env scalas
/***
scalaVersion := "2.11.7"
*/
println("hello")

$ chmod +x script.scala 
$ ./script.scala
hello

Ok, so now you can add in the location of your artifacts and dependencies inside the /*** comment, like this:

/***
scalaVersion := "2.11.7"

resolvers += Resolver.url("typesafe-ivy-repo", url("http://repo.typesafe.com/typesafe/releases"))(Resolver.ivyStylePatterns)

resolvers += "Your Repo" at "http://artifactory.yourplace.com/artifactory/repo"

resolvers += Resolver.mavenLocal

libraryDependencies += "org.scala-sbt" % "io" % "0.13.11"

libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.7"

libraryDependencies += "ch.qos.logback" % "logback-core" % "1.1.7"

libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.21"

libraryDependencies += "com.trax.platform" % "trax-elasticsearch-loader" % "1.3.43"

libraryDependencies += "com.trax.platform" % "trax-platform-utils" % "1.3.7"
*/

When you run it, it will automatically download dependencies, compile, and run your script.  The internet downloads to $CONSCRIPT_HOME/boot.
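
Just to prove the wiring works, here’s a throwaway sketch of my own that uses only the sbt io artifact from the list above (IO.write/IO.read are its file helpers).  The first run resolves the artifact and compiles; later runs reuse the cache:

#!/usr/bin/env scalas
/***
scalaVersion := "2.11.7"

libraryDependencies += "org.scala-sbt" % "io" % "0.13.11"
*/
import sbt._
import java.io.File

// IO comes from the io artifact above; if resolution had failed, this would not compile
val f = new File("/tmp/scalas-demo.txt")
IO.write(f, "written with sbt IO")
println(IO.read(f))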

You can also bring in a logger and control it programmatically.  It starts to look a lot like the Python script, at least in regard to how simple it is to configure logging, but with access to all your fancy Scala tools.  When was the last time you could do tail recursion in a script and not worry about a stack overflow?  @tailrec to the rescue!


#!/usr/bin/env scalas
/***
scalaVersion := "2.11.7"
resolvers += Resolver.url("typesafe-ivy-repo", url("http://repo.typesafe.com/typesafe/releases"))(Resolver.ivyStylePatterns)
resolvers += "Your Artifactory" at "http://yourstuff.com/artifactory/repo"
resolvers += "mandubian maven bintray" at "http://dl.bintray.com/mandubian/maven"
resolvers += Resolver.mavenLocal
libraryDependencies += "org.scala-sbt" % "io" % "0.13.11"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.7"
libraryDependencies += "ch.qos.logback" % "logback-core" % "1.1.7"
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.21"
libraryDependencies += "com.trax.platform" % "trax-elasticsearch-loader" % "1.3.43"
libraryDependencies += "com.trax.platform" % "trax-platform-utils" % "1.3.7"
*/
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.{ElasticClient, SearchType}
import com.trax.common.log.LogbackUtils
import com.trax.elasticsearch.EsClient
import com.trax.platform.util.scala.RestUtils
import com.typesafe.scalalogging.{Logger, LazyLogging}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.slf4j.LoggerFactory
import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
LogbackUtils.setLogLevel(null, "INFO")
val logger = Logger(LoggerFactory.getLogger("log"))
@tailrec
def next(client: ElasticClient, scrollId: String, total: Int): Int = {
  val resp = client.execute { search scroll scrollId keepAlive "1m" }.await
  if (resp.getHits.getHits.isEmpty) total
  else {
    // re-queue each hit in this scroll page as a message for the downstream workers
    val futures = resp.getHits.getHits.map { hit =>
      val msg = compact(render(("env_id" -> hit.field("Environment.EnvId").getValues.head.toString) ~ ("action" -> "index") ~ ("type" -> "fb") ~ ("id" -> hit.id)))
      logger.debug(msg)
      Future {
        RestUtils.postRestContent("http://someserver/sendmessage/invoice_search", msg, "application/json")
      }
    }.toSeq
    // wait for the whole batch before pulling the next scroll page
    Await.result(Future.sequence(futures), 1.hour)
    next(client, resp.getScrollId, total + futures.size)
  }
}

val client = EsClient.getInstance("somecluster", 9300, "cluster_name").client
val resp = client.execute {
  search in "fps-fbs*" / "fb" size(1000) fields("FbNorm.Businessflow","Environment.EnvId") query wildcardQuery("FbNorm.Businessflow","*unknown") searchType SearchType.Scan scroll "1m"
}.await
logger.info(s"${resp.getHits.getTotalHits} hits")
val docs = next(client, resp.getScrollId, 0)
logger.info(s"fixed ${docs} docs")

Final Thoughts

Scripting with Scala is not good for everything, or for everyone.  The compile time stinks when you have a small task, but if the job is going to run for hours or days, and run considerably faster because you can do it concurrently, the few extra seconds to compile are worth it.

Also, if you already have business logic wrapped up in the Java ecosystem, you may find this an easy way to unlock some of it quickly without having to put things behind a REST interface or a message queue, or what have you.

It might also be an easy way to explore Scala on some real tasks.  So if you are curious and want to dabble with it a bit, without having to bet the farm on new tech that nobody knows or is willing to invest in heavily, give it a go.

LogbackUtils

Here’s the body of the little log util I use for controlling Logback without any config files.  It’s something I found on Stack Overflow, and it’s good enough for command-line use and simple scripts.  Don’t get me started on how logging sucks in the Java ecosystem; many a day wasted attempting to do things that should be easy…


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.lang.reflect.Method;

/**
 * Contains methods to access and manipulate the logback framework dynamically at run-time. Here 'dynamically' means
 * without referencing the logback JAR, but using it if found in the classpath.
 *
 * @author Muhammed Demirbas
 * @since 19 Mar 2013
 */
public final class LogbackUtils
{
    public static final String LOGBACK_CLASSIC = "ch.qos.logback.classic";
    public static final String LOGBACK_CLASSIC_LOGGER = "ch.qos.logback.classic.Logger";
    public static final String LOGBACK_CLASSIC_LEVEL = "ch.qos.logback.classic.Level";

    private static final Logger logger = LoggerFactory.getLogger(LogbackUtils.class);

    private LogbackUtils()
    {
        // Prevent instance creation
    }

    /**
     * Dynamically sets the logback log level for the given logger to the specified level.
     *
     * @param loggerName Name of the logger whose log level to set. If blank, the root logger will be used.
     * @param logLevel   One of the supported log levels: TRACE, DEBUG, INFO, WARN, ERROR, FATAL, OFF.
     *                   A {@code null} value is considered as 'OFF'.
     */
    public static boolean setLogLevel(String loggerName, String logLevel)
    {
        String logLevelUpper = (logLevel == null) ? "OFF" : logLevel.toUpperCase();
        try
        {
            Package logbackPackage = Package.getPackage(LOGBACK_CLASSIC);
            if (logbackPackage == null)
            {
                logger.info("Logback is not in the classpath!");
                return false;
            }
            // Use the ROOT logger if the given logger name is blank.
            if ((loggerName == null) || loggerName.trim().isEmpty())
            {
                loggerName = (String) getFieldValue(LOGBACK_CLASSIC_LOGGER, "ROOT_LOGGER_NAME");
            }
            // Obtain the logger by name.
            Logger loggerObtained = LoggerFactory.getLogger(loggerName);
            if (loggerObtained == null)
            {
                // I don't know if this case occurs
                logger.warn("No logger for the name: {}", loggerName);
                return false;
            }
            Object logLevelObj = getFieldValue(LOGBACK_CLASSIC_LEVEL, logLevelUpper);
            if (logLevelObj == null)
            {
                logger.warn("No such log level: {}", logLevelUpper);
                return false;
            }
            // Invoke Logger.setLevel(Level) reflectively so logback stays a soft dependency.
            Class<?>[] paramTypes = { logLevelObj.getClass() };
            Object[] params = { logLevelObj };
            Class<?> clz = Class.forName(LOGBACK_CLASSIC_LOGGER);
            Method method = clz.getMethod("setLevel", paramTypes);
            method.invoke(loggerObtained, params);
            logger.debug("Log level set to {} for the logger '{}'", logLevelUpper, loggerName);
            return true;
        }
        catch (Exception e)
        {
            logger.warn("Couldn't set log level to {} for the logger '{}'", logLevelUpper, loggerName, e);
            return false;
        }
    }

    private static Object getFieldValue(String fullClassName, String fieldName)
    {
        try
        {
            Class<?> clazz = Class.forName(fullClassName);
            Field field = clazz.getField(fieldName);
            return field.get(null);
        }
        catch (ClassNotFoundException | IllegalAccessException | IllegalArgumentException | NoSuchFieldException |
               SecurityException ignored)
        {
            return null;
        }
    }
}
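
From a script it’s one call per logger; the library logger name below is just an example:

// set the root logger (null name) to INFO, then quiet a chatty library by name
LogbackUtils.setLogLevel(null, "INFO")
LogbackUtils.setLogLevel("org.apache.http", "WARN") // hypothetical logger name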
