Scala library API
Definitions:
Mist Function - a function that defines a particular Spark calculation; it is the deployable unit for the Mist proxy.
Job - a Spark job triggered by a Mist Function.
Mist Library - a DSL for writing Mist Functions that can be deployed and executed on Mist.
MistFn is the base interface for function definitions.
PiExample.scala:
import mist.api._
import mist.api.dsl._
import mist.api.encoding.defaults._
import org.apache.spark.SparkContext
object PiExample extends MistFn {

  override def handle: Handle = {
    withArgs(arg[Int]("samples")).onSparkContext((n: Int, sc: SparkContext) => {
      val count = sc.parallelize(1 to n).filter(_ => {
        val x = math.random
        val y = math.random
        x * x + y * y < 1
      }).count()
      val pi = (4.0 * count) / n
      pi
    }).asHandle
  }
}
Build
Add Mist as a dependency in your build.sbt:
Note: the following Spark modules are required at compile time: spark-core, spark-sql, spark-hive, spark-streaming.
libraryDependencies ++= Seq(
  "io.hydrosphere" %% "mist-lib" % "1.1.3",
  "org.apache.spark" %% "spark-core" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-hive" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.1.0" % "provided"
)
Maven dependency (pom.xml):
<properties>
  <spark.version>2.2.0</spark.version>
  <java.version>1.8</java.version>
</properties>

<dependencies>
  <dependency>
    <groupId>io.hydrosphere</groupId>
    <artifactId>mist-lib_2.11</artifactId>
    <version>1.1.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
Overview
Generally speaking, MistFn represents an interface that provides a function over one of the available Spark contexts (SparkContext, SQLContext, …, SparkSession).
The function's result type A is automatically converted to JSON by Mist (see Encoding below).
Arguments
Internally the library DSL is based on ArgDef[A], which describes an argument's type and how to extract it from a request.
For example, arg[Int]("n") means that the request's JSON object should have a key n with a value that can be converted to an integer.
There are the following basic methods to define an argument (see the sketch after this list):
- arg[A](name: String): ArgDef[A] - a required argument, looked up by name
- arg[A](name: String, default: A): ArgDef[A] - if the argument is missing, the function falls back to the default value
- allArgs: ArgDef[Map[String, Any]] - takes all arguments present in the request as a Map[String, Any]
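A minimal sketch of the three forms (the argument names and the default value are illustrative):

import mist.api._
import mist.api.dsl._
import mist.api.encoding.defaults._

// required: the request must contain a key "n"
val n: ArgDef[Int] = arg[Int]("n")
// optional: falls back to 10 when "limit" is missing from the request
val limit: ArgDef[Int] = arg[Int]("limit", 10)
// the whole request as a raw map
val everything: ArgDef[Map[String, Any]] = allArgs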
By default the library supports the following argument types (see the example below):
- Boolean
- Int
- Double
- String
- Seq[A] (where A is one of the supported types)
- Option[A] (where A is one of the supported types)
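For instance, collection and optional arguments look like this (argument names are illustrative):

// expects json like {"xs": [1, 2, 3]}
val xs = arg[Seq[Int]]("xs")
// "name" may be null (and, presumably, absent) in the request
val name = arg[Option[String]]("name")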
The withArgs method accepts from 1 to 21 arguments and returns an ArgDef.
val one = withArgs(arg[Int]("n"))
val two = withArgs(arg[Int]("n"), arg[String]("str"))
val three = withArgs(arg[Int]("n"), arg[String]("str"), arg[Boolean]("flag"))
Or you can use & / combine to get the same result:
val one = arg[Int]("n")
val two = arg[Int]("n") combine arg[String]("str")
val three = arg[Int]("n") & arg[String]("str") & arg[Boolean]("flag")
It's also possible to use case classes as arguments:
import mist.api._
import mist.api.dsl._
import mist.api.encoding
import mist.api.encoding._
import mist.api.encoding.defaults._
import org.apache.spark.SparkContext
case class Foo(a: Int, b: String)
object ComplexRootArgFn extends MistFn {

  implicit val fooExt: RootExtractor[Foo] = encoding.generic.extractor[Foo]

  // expected input is json:
  // {
  //   "a": 42,
  //   "b": "some_str"
  // }
  override def handle: Handle = {
    withArgs(arg[Foo]).onSparkContext((foo: Foo, sc: SparkContext) => 42).asHandle
  }
}
More examples:
import mist.api._
// for primitives and collections
import mist.api.encoding
import mist.api.encoding._
import mist.api.encoding.defaults._
case class Bar(a: Int, b: String)
case class Foo(x: Int, bars: Seq[Bar])
// to derive an extractor for `Foo` we need to also have it for `Bar`
implicit val barExt: JsExtractor[Bar] = encoding.generic.extractor[Bar]
implicit val fooExt: RootExtractor[Foo] = encoding.generic.extractor[Foo]
// define a root arg that expects the following input:
// {
//   "x": 42,
//   "bars": [
//     {"a": 1, "b": "str"},
//     {"a": 2, "b": "str2"}
//   ]
// }
val rootArg = arg[Foo]

// define a complex arg under a named key:
// {
//   "foo": {
//     "x": 42,
//     "bars": [
//       {"a": 1, "b": "str"},
//       {"a": 2, "b": "str2"}
//     ]
//   }
// }
val fieldArg = arg[Foo]("foo")
If you want to allow fields with default values to be omitted:
import mist.api._
import mist.api.encoding
import mist.api.encoding._
import mist.api.encoding.defaults._
case class Bar(a: Int, b: String = "default")
implicit val barExt: JsExtractor[Bar] = encoding.generic.extractorWithDefaults[Bar]
case class Foo(x: Int, bar: Bar = Bar(1, "str"))
// JsEncoder for bar is required
implicit val barEnc: JsEncoder[Bar] = encoding.generic.encoder[Bar]
implicit val fooExt: RootExtractor[Foo] = encoding.generic.extractorWithDefaults[Foo]
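With the extractors above in scope, requests may omit fields that have defaults. A sketch of inputs accepted by a root arg[Foo]:

val rootArg = arg[Foo]
// all of these inputs are accepted:
// {"x": 1, "bar": {"a": 2, "b": "str"}}
// {"x": 1}                  // bar falls back to Bar(1, "str")
// {"x": 1, "bar": {"a": 2}} // b falls back to "default"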
Contexts
Next, to complete the Mist Function definition, we need to inject a Spark context.
Mist provides managed Spark contexts, so the developer does not have to care about the context's lifecycle and settings.
ArgDef has the following methods to bind a Spark context:
- onSparkContext
- onSparkSession
- onStreamingContext
- onSqlContext
- onHiveContext
Each accepts a function with n+1 arguments, where n is the number of combined arguments and the context comes last.
val fromOne = withArgs(arg[Int]("n")).onSparkContext((n: Int, sc: SparkContext) => { ... })
val fromTwo = withArgs(arg[Int]("n"), arg[String]("str")).onSparkContext((n: Int, s: String, sc: SparkContext) => { ... })
val fromThree = withArgs(arg[Int]("n"), arg[String]("str"), arg[Boolean]("flag"))
.onSparkContext((n: Int, s: String, b: Boolean, sc: SparkContext) => { ... })
If your function doesn't require any arguments, similar methods are available directly on MistFn:
import mist.api._
import mist.api.dsl._
import mist.api.encoding.defaults._
import org.apache.spark.SparkContext
object NoArgsFn extends MistFn {

  override def handle: Handle = {
    onSparkContext((sc: SparkContext) => 42).asHandle
  }
}
RawHandle / Handle
Mist has to return the result back to the client (HTTP request, async interfaces), which requires that the result be serialized to JSON.
After we define the arguments and the function body we receive a RawHandle[A], and then we call the asHandle method on it to turn it into a Handle.
So there is not much difference between them:
- RawHandle[A] represents a function Json + SparkContext => A
- Handle represents a function Json + SparkContext => Json
import mist.api._
import mist.api.dsl._
import mist.api.encoding.defaults._
import org.apache.spark.SparkContext
object MyFn extends MistFn {

  override def handle: Handle = {
    val rawHandle: RawHandle[Int] = onSparkContext((sc: SparkContext) => 42)
    rawHandle.asHandle
  }
}
Encoding
JSON encoding is based on mist.api.JsEncoder[A] and mist.api.data.JsData (a JSON AST).
Implementations for common result types come from:
import mist.api.encoding.defaults._
It supports:
- Unit
- primitives: Short, Int, Long, Float, String, Double, Boolean
- collections: Array, Seq, Map
Experimental encoders for DataFrame and Dataset are available via:
import mist.api.encoding.spark._
Warning: return them only if you are sure they are small; otherwise encoding them will lead to an OutOfMemory error.
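A sketch of a function that returns a small DataFrame, assuming the experimental encoders from mist.api.encoding.spark._ behave as described (the object name and data are illustrative):

import mist.api._
import mist.api.dsl._
import mist.api.encoding.defaults._
import mist.api.encoding.spark._ // experimental DataFrame/Dataset encoders
import org.apache.spark.sql.SparkSession

object SmallDfFn extends MistFn {

  override def handle: Handle = {
    onSparkSession((spark: SparkSession) => {
      // keep the result small: the whole DataFrame is encoded into the json response
      spark.range(0, 10).toDF("id")
    }).asHandle
  }
}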
You can also define an encoder yourself:
import mist.api._
import mist.api.data._
import mist.api.dsl._
import org.apache.spark.SparkContext

class Foo(val a: Int, val b: String)

object MyFn extends MistFn {

  implicit val fooEnc: JsEncoder[Foo] =
    JsEncoder(foo => JsMap("a" -> JsNumber(foo.a), "b" -> JsString(foo.b)))

  override def handle: Handle = {
    val rawHandle: RawHandle[Foo] = onSparkContext((sc: SparkContext) => new Foo(42, "str"))
    rawHandle.asHandle
  }
}
For more convenient work with JsData there is JsSyntax:
import mist.api.data._
import mist.api.encoding.JsSyntax._
import mist.api.encoding.JsEncoder
class Foo(val a: Int, val b: String)
val fooEnc: JsEncoder[Foo] = JsEncoder(foo => JsMap("a" -> foo.a.js, "b" -> foo.b.js))
It's also possible to automatically derive an encoder for case classes:
import mist.api._
import mist.api.encoding
import mist.api.encoding._
import mist.api.encoding.defaults._
case class Bar(a: Int, b: String)
case class Foo(x: Int, foos: Seq[Bar])
implicit val barEnc: JsEncoder[Bar] = encoding.generic.encoder[Bar]
implicit val fooEnc: JsEncoder[Foo] = encoding.generic.encoder[Foo]
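With these encoders in implicit scope, a function can return Foo directly; a minimal sketch (FooFn is illustrative):

import mist.api.dsl._
import org.apache.spark.SparkContext

object FooFn extends MistFn {
  override def handle: Handle =
    onSparkContext((sc: SparkContext) => Foo(1, Seq(Bar(2, "str")))).asHandle
}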
Validation
For example, to calculate Pi using the dartboard method, n must be a positive number.
For that purpose ArgDef[A] has special methods to validate arguments:
- validated(f: A => Boolean)
- validated(f: A => Boolean, explanation: String)
import mist.api._
import mist.api.dsl._
import mist.api.encoding.defaults._
import org.apache.spark.SparkContext
object PiExample extends MistFn {

  override def handle: Handle = {
    withArgs(
      arg[Int]("samples").validated(n => n > 0, "Samples value should be positive")
    ).onSparkContext((n: Int, sc: SparkContext) => {
      val count = sc.parallelize(1 to n).filter(_ => {
        val x = math.random
        val y = math.random
        x * x + y * y < 1
      }).count()
      val pi = (4.0 * count) / n
      pi
    }).asHandle
  }
}
Mist extras and logging
Every function invocation on Mist has a unique id and an associated worker. In some cases it can be useful to have that extra information in the function body.
Also, to log and see what's going on on the job side from the Mist UI, you can use org.slf4j.Logger directly or the mist.api.Logging mixin:
import mist.api._
import mist.api.dsl._
import mist.api.encoding.defaults._
import org.apache.spark.SparkContext
object HelloWorld extends MistFn with Logging {

  override def handle: Handle = {
    withArgs(arg[Int]("samples"))
      .withMistExtras
      .onSparkContext((n: Int, extras: MistExtras, sc: SparkContext) => {
        import extras._
        logger.info(s"Hello from $jobId")
      }).asHandle
  }
}
Passing a function into spark-submit
The MistFn trait has a default main implementation, so it's possible to pass Mist functions into spark-submit directly - just ensure that mist-lib is present on the classpath.
Testing
If you want to unit-test your function, there is an example.