Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/examples-python/simple_context.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# coding=utf-8
import time

from mist.mist_job import MistJob

Expand All @@ -7,5 +8,5 @@ class SimpleContext(MistJob):
def execute(self, numbers, multiplier = 2):
rdd = self.context.parallelize(numbers)
result = rdd.map(lambda s: s * multiplier).collect()

time.sleep(60)
return {"result": result}
15 changes: 8 additions & 7 deletions examples/examples-python/simple_streaming.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
from mist.mist_job import *

class SimpleStreaming(MistJob, WithStreamingContext, WithPublisher):
class SimpleStreaming(MistJob, WithStreamingContext):

def execute(self, parameters):
def execute(self):
import time

def takeAndPublish(time, rdd):
taken = rdd.take(11)
self.publisher.publish("-------------------------------------------")
self.publisher.publish("Time: %s" % time)
self.publisher.publish("-------------------------------------------")
self.publisher.publish(str(taken))
print(taken)

ssc = self.streaming_context
type(ssc)

log4jLogger = ssc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("Hello!")

rddQueue = []
for i in range(500):
rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]
Expand Down
3 changes: 2 additions & 1 deletion mist.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ lazy val root = project.in(file("."))

val mkPyfunctions = Seq(
("simple_context.py", "SimpleContext"),
("session_job.py", "SessionJob")
("session_job.py", "SessionJob"),
("simple_streaming.py", "SimpleStreaming")
).flatMap({case (file, clazz) => {
val name = file.replace(".py", "")
Seq(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.hydrosphere.mist.master.execution

import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Props, Timers}
import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, PoisonPill, Props, SupervisorStrategy, Timers}
import io.hydrosphere.mist.core.CommonData._
import io.hydrosphere.mist.master.Messages.StatusMessages._
import io.hydrosphere.mist.master.execution.status.StatusReporter
Expand All @@ -27,6 +27,8 @@ class JobActor(

import JobActor._

override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy

override def preStart(): Unit = {
report.reportPlain(QueuedEvent(req.id))
req.timeout match {
Expand All @@ -39,10 +41,9 @@ class JobActor(
override def receive: Receive = initial

private def initial: Receive = {
case Event.Cancel =>
cancelFinally("user request", Seq(sender()), None)
case Event.Cancel => cancelNotStarted("user request", Some(sender()))
case Event.Timeout => cancelNotStarted("timeout", None)

case Event.Timeout => cancelFinally("timeout", Seq.empty, None)
case Event.GetStatus => sender() ! ExecStatus.Queued

case Event.Perform(connection) =>
Expand Down Expand Up @@ -99,30 +100,46 @@ class JobActor(
cancelRespond.foreach(_ ! msg)
onConnectionTermination(Some(connection))

case ev @ JobIsCancelled(_, _) =>
cancelFinally(reason, cancelRespond, Some(connection))
case ev @ JobIsCancelled(_, time) =>
report.reportPlain(CanceledEvent(req.id, time))

case JobSuccess(_, data) =>
val msg = akka.actor.Status.Failure(new IllegalStateException(s"Job ${req.id} was completed"))
cancelRespond.foreach(_ ! msg)
completeSuccess(data, connection)

case JobFailure(_, err) =>
val msg = akka.actor.Status.Failure(new IllegalStateException(s"Job ${req.id} was completed"))
cancelRespond.foreach(_ ! msg)
completeFailure(err, Some(connection))
cancelStarted(reason, cancelRespond, Some(connection), err)
}

private def cancelFinally(
private def cancelNotStarted(reason: String, respond: Option[ActorRef]): Unit = {
import scala.concurrent.ExecutionContext.Implicits.global

val time = System.currentTimeMillis()
promise.failure(new RuntimeException(s"Job was cancelled: $reason"))
report.reportWithFlushCallback(CanceledEvent(req.id, time)).onComplete(t => {
val response = t match {
case Success(d) => ContextEvent.JobCancelledResponse(req.id, d)
case Failure(e) => akka.actor.Status.Failure(e)
}
respond.foreach(_ ! response)
})
log.info(s"Job ${req.id} was cancelled: $reason")
callback ! Event.Completed(req.id)
context.stop(self)
}

private def cancelStarted(
reason: String,
respond: Seq[ActorRef],
maybeConn: Option[WorkerConnection]
maybeConn: Option[WorkerConnection],
error: String
): Unit = {
import scala.concurrent.ExecutionContext.Implicits.global

val time = System.currentTimeMillis()
promise.failure(new RuntimeException(s"Job was cancelled: $reason"))
report.reportWithFlushCallback(CanceledEvent(req.id, time)).onComplete(t => {
report.reportWithFlushCallback(FailedEvent(req.id, time, error)).onComplete(t => {
val response = t match {
case Success(d) => ContextEvent.JobCancelledResponse(req.id, d)
case Failure(e) => akka.actor.Status.Failure(e)
Expand All @@ -132,7 +149,7 @@ class JobActor(
log.info(s"Job ${req.id} was cancelled: $reason")
maybeConn.foreach(_.release())
callback ! Event.Completed(req.id)
self ! PoisonPill
context.stop(self)
}

private def onConnectionTermination(maybeConn: Option[WorkerConnection]): Unit =
Expand All @@ -144,7 +161,7 @@ class JobActor(
log.info(s"Job ${req.id} completed successfully")
callback ! Event.Completed(req.id)
connection.release()
self ! PoisonPill
context.stop(self)
}

private def completeFailure(err: String, maybeConn: Option[WorkerConnection]): Unit = {
Expand All @@ -153,7 +170,7 @@ class JobActor(
log.info(s"Job ${req.id} completed with error")
maybeConn.foreach(_.release())
callback ! Event.Completed(req.id)
self ! PoisonPill
context.stop(self)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,13 @@ object JobStatusFlusher {
case InitializedEvent(_, _, _) => d
case QueuedEvent(_) => d.withStatus(Status.Queued)
case StartedEvent(_, time) => d.withStartTime(time).withStatus(Status.Started)
case CanceledEvent(_, time) => d.withEndTime(time).withStatus(Status.Canceled)
case CanceledEvent(_, time) => d.withStatus(Status.Canceled)
case JobFileDownloadingEvent(_, _) => d.withStatus(Status.FileDownloading)
case FinishedEvent(_, time, result) =>
d.withEndTime(time).withJobResult(result).withStatus(Status.Finished)
case FailedEvent(_, time, error) =>
if (d.status == Status.Canceled)
d
else
d.withEndTime(time).withStatus(Status.Failed).withFailure(error)
val status = if (d.status == Status.Canceled) d else d.withStatus(Status.Failed)
status.withEndTime(time).withFailure(error)
case WorkerAssigned(_, workerId) => d.copy(workerId = Some(workerId))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ class JobActorSpec extends ActorSpec("worker-conn") with TestData with TestUtils

connectionRef.expectMsgType[CancelJobRequest]
connectionRef.send(actor, JobIsCancelled("id"))
connectionRef.send(actor, JobFailure("id", "error"))

probe.expectMsgType[ContextEvent.JobCancelledResponse]

Expand All @@ -168,7 +169,8 @@ class JobActorSpec extends ActorSpec("worker-conn") with TestData with TestUtils
classOf[QueuedEvent],
classOf[WorkerAssigned],
classOf[StartedEvent],
classOf[CanceledEvent]
classOf[CanceledEvent],
classOf[FailedEvent]
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class JobStatusFlusherSpec extends ActorSpec("job-status-flusher") with TestData
("event", "details"),
(QueuedEvent("id"), baseDetails.copy(status = Status.Queued)),
(StartedEvent("id", 1), baseDetails.copy(status = Status.Started, startTime = Some(1))),
(CanceledEvent("id", 1), baseDetails.copy(status = Status.Canceled, endTime = Some(1))),
(CanceledEvent("id", 1), baseDetails.copy(status = Status.Canceled)),
(FinishedEvent("id", 1, JsLikeMap("1" -> JsLikeNumber(2))),
baseDetails.copy(
status = Status.Finished,
Expand All @@ -74,10 +74,10 @@ class JobStatusFlusherSpec extends ActorSpec("job-status-flusher") with TestData
}
}

it("should ignore failure if job is canceled") {
it("shouldn't chang cancelled status by failure event") {
val canceled = baseDetails.copy(status = Status.Canceled)
val event = FailedEvent("id", 1, "error")
JobStatusFlusher.applyStatusEvent(canceled, event) shouldBe canceled
JobStatusFlusher.applyStatusEvent(canceled, event).status shouldBe Status.Canceled
}
}
}
3 changes: 0 additions & 3 deletions mist/worker/src/main/resources/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ def to_python_types(any):
if issubclass(class_, WithHiveSupport):
context_wrapper.set_hive_context(_gateway)

if issubclass(class_, WithPublisher):
context_wrapper.init_publisher(_gateway)

if issubclass(class_, WithStreamingContext):
context_wrapper.set_streaming_context(_gateway)

Expand Down
9 changes: 1 addition & 8 deletions mist/worker/src/main/resources/mist/context_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,8 @@ def set_hive_session(self, java_gateway):

def set_streaming_context(self, java_gateway):
from pyspark.streaming import StreamingContext
self._streaming_context = StreamingContext(self._context, java_gateway.entry_point.sparkStreamingWrapper().getDurationSeconds())
java_gateway.entry_point.sparkStreamingWrapper().setStreamingContext(self._streaming_context._jssc)

def init_publisher(self, java_gateway):
spark_context_wrapper = java_gateway.entry_point.sparkContextWrapper()
wrapper = java_gateway.entry_point.globalPublisherWrapper()
conf = spark_context_wrapper.setupConfiguration()
self._publisher = wrapper.create(conf)

self._streaming_context = StreamingContext(self._context, java_gateway.entry_point.sparkStreamingWrapper().getDurationSeconds())

@property
def context(self):
Expand Down
9 changes: 0 additions & 9 deletions mist/worker/src/main/resources/mist/mist_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,6 @@ def setup(self, context_wrapper):
except ImportError:
self.hive_context = context_wrapper.hive_context

class WithPublisher(ContextSupport):
__metaclass__ = ABCMeta

publisher = None

@abstractmethod
def setup(self, context_wrapper):
self.publisher = context_wrapper.publisher

class WithStreamingContext(ContextSupport):
__metaclass__ = ABCMeta

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.hydrosphere.mist.worker

import scala.concurrent.{Future, Promise}

trait CancellableFuture[A] {
def future: Future[A]
def cancel(): Unit
}

object CancellableFuture {

def onDetachedThread[A](f: => A): CancellableFuture[A] = {
val ps = Promise[A]
val runnable = new Runnable {
override def run(): Unit = {
try {
ps.success(f)
} catch {
case e: InterruptedException => ps.failure(new RuntimeException("Execution was cancelled"))
case e: Throwable => ps.failure(e)
}
}
}
val thread = new Thread(runnable)
thread.setDaemon(true)
thread.start()

new CancellableFuture[A] {
override def cancel(): Unit = thread.interrupt()
override def future: Future[A] = ps.future
}
}
}

Loading