Java library API
Definitions:
Mist Function is a functional framework that defines particular Spark calculation. Mist Function is a deployable unit for Mist proxy.
Job - a Spark job triggered by Mist Function.
Mist Library provides a DSL for Mist Functions that could be deployed and executed in Mist.
JMistFn
is a base interface for function definition.
JavaPiExample.java
:
import static mist.api.jdsl.Jdsl.*;
import mist.api.Handle;
import mist.api.MistFn;
import mist.api.jdsl.JEncoders;
import java.util.ArrayList;
import java.util.List;
public class JavaPiExample extends MistFn {
@Override
public Handle handle() {
return withArgs(intArg("samples")).onSparkContext((n, sc) -> {
List<Integer> l = new ArrayList<>(n);
for (int i = 0; i < n ; i++) {
l.add(i);
}
long count = sc.parallelize(l).filter(i -> {
double x = Math.random();
double y = Math.random();
return x*x + y*y < 1;
}).count();
double pi = (4 * count) / n;
return pi;
}).toHandle(JEncoders.doubleEncoder());
}
}
Build
Maven dependency:
pom.xml
:
<properties>
<spark.version>2.2.0</spark.version>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>io.hydrosphere</groupId>
<artifactId>mist-lib_2.11</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
Overview
Speaking generally - JMistFn
represents an interface that provides
function over one of available spark contexts (SparkContext, SparkSession, StreamingContext).
Arguments
Internally library dsl is based on JArg[A]
- its goal is to describe argument type and how to extract it from a request.
So for example: intArg("n")
means that request’s json object should has key n
with a value that can be converted to integer.
There are following basic methods do define an argument (they have similar names for different types: intArg/stringArg/doubleArg/booleanArg).
For example for integer:
intArg(String name)
- required argument by nameintArg(String name, int defaultValue)
- if argument is missed, function will fallback to default valueoptIntArg(String name)
- function will receiveOptional<Integer>
intListArg(String name)
- function will receiveList<Integer>
Method withArgs
accepts from 1 to 21 argument and returns ArgsN
.
Args1<Integer> one = withArgs(intArg("n"))
Args2<scala.Tuple2<Integer, String>> two = withArgs(intArg("n"), stringArg("str"))
Args3<scala.Tuple3<Iteger, String, Boolean>> three = withArgs(intArg("n"), stringArg"str"), booleanArg("flag"))
Contexts
Next to complete Mist Function definition we should inject Spark Context.
Mist provides managed Spark Contexts, so developer does not care about context’s lifecycle and settings.
There are following methods of ArgsN
to define Spark Context:
onSparkContext
onSparkSession
onSparkSessionWithHive
onStreamingContext
It accepts function with n+1
arguments where n
is count of combined argument plus SparkContext
at end.
RawHandle<?> fromOne = withArgs(intArg("n")).onSparkContext((n, sc) -> { ... })
RawHandle<?> fromTwo = withArgs(intArg("n"), stringArg("str")).onSparkContext((n, s, sc) -> { ... })
RawHandle<?> fromThree = withArgs(intArg("n"), stringArg("str"), booleanArg("flag"))
.onSparkContext((n, s, b, sc) -> { ... })
If your job doesn’t require any arguments, there are similar methods available from JMistFn
import static mist.api.jdsl.Jdsl.*;
import mist.api.Handle;
import mist.api.MistFn;
import mist.api.jdsl.JEncoders;
import java.util.ArrayList;
import java.util.List;
public class NoArgsExample extends MistFn {
@Override
public Handle handle() {
return onSparkContext((sc) -> 42).toHandle(JEncoders.intEncoder());
}
}
Validation
For example for calculating pi using dartboard method n should be at least positive number.
For that purpose JArg<A>
has special methods to validate arguments:
validated(f: A => Boolean)
validated(f: A => Boolean, explanation: String)
import static mist.api.jdsl.Jdsl.*;
import mist.api.Handle;
import mist.api.MistFn;
import mist.api.encoding.JsEncoder;
import mist.api.jdsl.JEncoders;
import java.util.ArrayList;
import java.util.List;
public class JavaPiExample extends MistFn {
@Override
public Handle handle() {
JArg<Integer> samples = intArg("samples").validated(s -> s > 0, "Samples must be positive");
return withArgs(samples).onSparkContext((n, sc) -> {
...
});
}
}
RawHandle / Handle
Mist should be able to return result back to the client (http request, async interfaces) and it requires
that result should be serialized to json.
After we define argument and function body we receive a RawHandle[A]
and then we should call
asHandle
method on it to turn it into Handle
.
So there is not much difference between them:
- RawHandle[A] represents a function
Json + SparkContext => A
- Handle represents a function
Json + SparkContext => Json
Encoding
Json encoding is based on mist.api.JsEncoder[A]
and mist.api.data.JsData
(json ast)
There are implementations for common result types:
import mist.api.jdsl.JEncoders;
It supports:
- Void
- primitives: Short, Int, Long, Float, String, Double, Boolean
- collections: List, Map
Also if you need you could define it by your self:
import static mist.api.jdsl.Jdsl.*;
import mist.api.Handle;
import mist.api.MistFn;
import mist.api.data.*;
import mist.api.encoding.JsEncoder;
import mist.api.jdsl.JEncoders;
class Foo {
public Int a;
public String b;
public Foo (int a, String b) {
this.a = a;
this.b = b;
}
}
class FooEncoder implements JsEncoder[Foo] {
@Override
public JsData apply(Foo foo) {
return JsMap.empty()
.addField("a", JsNumber.of(foo.a))
.addField("b", JsString.of(foo.b))
}
}
class MyFn extends MistFn {
@Override
public Handle handle() {
RawHandle<Foo> raw = onSparkContext((sc) -> new Foo(42, "str"));
raw.toHandle(new FooEncoder());
}
}
Mist extras and logging
Every function invocation on Mist has unique id and associated worker. It could be useful in some cases
to have that extra information in a function body.
Also, to be able to log and see what’s going on on job side from mist-ui you could just use org.slf4j.Logger
or use mist.api.Logging
mixin:
import static mist.api.jdsl.Jdsl.*;
import mist.api.Handle;
import mist.api.MistFn;
import mist.api.data.*;
import mist.api.encoding.JsEncoder;
import mist.api.jdsl.JEncoders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Hello extends MistFn {
Logger logger = LoggerFactory.getLogger(getClass());
@Override
public Handle handle() {
return withArgs(intArg("samples")).withMistExtras().onSparkContext((n, extras, sc) -> {
logger.info("Hello from job:" + extras.jobId());
return (Void) null;
}).toHandle(JEncoders.empty());
}
}
Passing function into spark-submit
MisFn
trait has a default main
implementation, so it’s possible to pass mist functions into spark-submit
directly.
- only ensure that
mist-lib
is will be presented in classpath.
Testing
If you want to unit-test your function there is an example