Skip to content

Commit 66e1ea1

Browse files
committed
[Nu-8656] Abstract FlinkEngineRuntimeContext
1 parent 3ec9c2c commit 66e1ea1

File tree

14 files changed

+103
-26
lines changed

14 files changed

+103
-26
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package pl.touk.nussknacker.engine.flink.api
2+
3+
import org.apache.flink.api.common.TaskInfo
4+
import org.apache.flink.api.common.functions.RuntimeContext
5+
import org.apache.flink.api.connector.sink2.WriterInitContext
6+
import org.apache.flink.metrics.groups.OperatorMetricGroup
7+
import pl.touk.nussknacker.engine.flink.api.exception.ExceptionHandler
8+
9+
sealed trait FlinkEngineContext
10+
final case class RuntimeCtx(ctx: RuntimeContext) extends FlinkEngineContext
11+
final case class WriterInitCtx(ctx: WriterInitContext) extends FlinkEngineContext
12+
13+
object FlinkEngineContextOps {
14+
15+
implicit class RichExceptionHandler(val f: FlinkEngineContext => ExceptionHandler) extends AnyVal {
16+
def narrowToRuntimeCtx: RuntimeContext => ExceptionHandler =
17+
rc => f(RuntimeCtx(rc))
18+
}
19+
20+
implicit class RichFlinkEngineContext(val ctx: FlinkEngineContext) extends AnyVal {
21+
22+
def getTaskInfo: TaskInfo = ctx match {
23+
case RuntimeCtx(ctx) => ctx.getTaskInfo
24+
case WriterInitCtx(ctx) => ctx.getTaskInfo
25+
}
26+
27+
def getMetricGroup: OperatorMetricGroup = ctx match {
28+
case RuntimeCtx(ctx) => ctx.getMetricGroup
29+
case WriterInitCtx(ctx) => ctx.metricGroup()
30+
}
31+
32+
}
33+
34+
}

engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/FlinkEngineRuntimeContext.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ import org.apache.flink.api.common.functions.RuntimeContext
44
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext
55

66
trait FlinkEngineRuntimeContext extends EngineRuntimeContext {
7-
def runtimeContext: RuntimeContext
7+
def runtimeContext: FlinkEngineContext
88
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package pl.touk.nussknacker.engine.flink.api.exception
2+
3+
import org.apache.flink.api.connector.sink2.SinkWriter
4+
5+
trait SinkWriterWithExceptionHandler[T] {
6+
self: SinkWriter[T] =>
7+
8+
protected val exceptionHandler: ExceptionHandler
9+
10+
override def close(): Unit = {
11+
self.close()
12+
if (exceptionHandler != null) {
13+
exceptionHandler.close()
14+
}
15+
}
16+
17+
}

engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkCustomNodeContext.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import pl.touk.nussknacker.engine.api.context.ValidationContext
77
import pl.touk.nussknacker.engine.api.process.ComponentUseContext
88
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext
99
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult, Unknown}
10-
import pl.touk.nussknacker.engine.flink.api.NkGlobalParameters
10+
import pl.touk.nussknacker.engine.flink.api.{FlinkEngineContext, NkGlobalParameters}
1111
import pl.touk.nussknacker.engine.flink.api.exception.ExceptionHandler
1212
import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection
1313

@@ -21,7 +21,7 @@ case class FlinkCustomNodeContext(
2121
timeout: FiniteDuration,
2222
convertToEngineRuntimeContext: RuntimeContext => EngineRuntimeContext,
2323
lazyParameterHelper: FlinkLazyParameterFunctionHelper,
24-
exceptionHandlerPreparer: RuntimeContext => ExceptionHandler,
24+
exceptionHandlerPreparer: FlinkEngineContext => ExceptionHandler,
2525
globalParameters: Option[NkGlobalParameters],
2626
validationContext: Either[ValidationContext, Map[String, ValidationContext]],
2727
componentUseContext: ComponentUseContext,

engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSink.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import org.apache.flink.util.Collector
1212
import pl.touk.nussknacker.engine.api.{Context, LazyParameter, NodeId, ValueWithContext}
1313
import pl.touk.nussknacker.engine.api.component.{ComponentType, NodeComponentInfo}
1414
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
15+
import pl.touk.nussknacker.engine.flink.api.FlinkEngineContextOps._
1516
import pl.touk.nussknacker.engine.flink.api.exception.{ExceptionHandler, WithExceptionHandler}
1617
import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, FlinkSink}
1718
import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection
@@ -105,7 +106,7 @@ object EncodeAsTableTypeFunction {
105106
val alignedType = ToTableTypeSchemaBasedEncoder.alignTypingResult(valueReturnType, sinkRowType)
106107
val producedType = TypeInformationDetection.instance.forType[Row](alignedType)
107108
new EncodeAsTableTypeFunction(
108-
flinkNodeContext.exceptionHandlerPreparer,
109+
flinkNodeContext.exceptionHandlerPreparer.narrowToRuntimeCtx,
109110
flinkNodeContext.nodeId,
110111
sinkRowType,
111112
producedType

engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ProcessPartFunction.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pl.touk.nussknacker.engine.process
22

33
import org.apache.flink.api.common.functions.{OpenContext, RichFunction}
4+
import pl.touk.nussknacker.engine.flink.api.RuntimeCtx
45
import pl.touk.nussknacker.engine.graph.node.NodeData
56
import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData
67
import pl.touk.nussknacker.engine.process.exception.FlinkExceptionHandler
@@ -45,7 +46,7 @@ trait ExceptionHandlerFunction extends RichFunction {
4546

4647
override def open(openContext: OpenContext): Unit = {
4748
compilerData = compilerDataForClassloader(getRuntimeContext.getUserCodeClassLoader)
48-
exceptionHandler = compilerData.prepareExceptionHandler(getRuntimeContext)
49+
exceptionHandler = compilerData.prepareExceptionHandler(RuntimeCtx(getRuntimeContext))
4950
}
5051

5152
}

engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkEngineRuntimeContextImpl.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ import org.apache.flink.api.common.functions.RuntimeContext
44
import pl.touk.nussknacker.engine.RuntimeMode
55
import pl.touk.nussknacker.engine.api.{JobData, NodeId}
66
import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, IncContextIdGenerator}
7-
import pl.touk.nussknacker.engine.flink.api.FlinkEngineRuntimeContext
7+
import pl.touk.nussknacker.engine.flink.api.{FlinkEngineContext, FlinkEngineRuntimeContext, RuntimeCtx}
8+
import pl.touk.nussknacker.engine.flink.api.FlinkEngineContextOps.RichFlinkEngineContext
89
import pl.touk.nussknacker.engine.process.compiler.MetricsProviderForFlink.createMetricsProvider
910
import pl.touk.nussknacker.engine.util.metrics.MetricsProviderForScenario
1011

1112
case class FlinkEngineRuntimeContextImpl(
1213
jobData: JobData,
13-
runtimeContext: RuntimeContext,
14+
runtimeContext: FlinkEngineContext,
1415
metricsProvider: MetricsProviderForScenario
1516
) extends FlinkEngineRuntimeContext {
1617

@@ -28,7 +29,7 @@ object FlinkEngineRuntimeContextImpl {
2829
// This creates FlinkEngineRuntimeContextImpl with correct metricsProviderForScenario based on ComponentUseContextProvider
2930
def apply(
3031
jobData: JobData,
31-
runtimeContext: RuntimeContext,
32+
runtimeContext: FlinkEngineContext,
3233
runtimeMode: RuntimeMode
3334
): FlinkEngineRuntimeContextImpl = {
3435
val properMetricsProvider = createMetricsProvider(runtimeMode, runtimeContext)

engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkMetricsProviderForScenario.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@ import cats.data.NonEmptyList
44
import com.codahale.metrics
55
import com.codahale.metrics.SlidingTimeWindowReservoir
66
import org.apache.flink
7-
import org.apache.flink.api.common.functions.RuntimeContext
87
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
98
import org.apache.flink.metrics.MetricGroup
9+
import pl.touk.nussknacker.engine.flink.api.FlinkEngineContext
10+
import pl.touk.nussknacker.engine.flink.api.FlinkEngineContextOps.RichFlinkEngineContext
1011
import pl.touk.nussknacker.engine.util.metrics._
1112

1213
import java.util.concurrent.TimeUnit
1314

14-
class FlinkMetricsProviderForScenario(runtimeContext: RuntimeContext) extends BaseMetricsProviderForScenario {
15+
class FlinkMetricsProviderForScenario(exceptionHandlerContext: FlinkEngineContext)
16+
extends BaseMetricsProviderForScenario {
1517

1618
override def registerGauge[T](identifier: MetricIdentifier, value: Gauge[T]): Unit =
1719
gauge[T, flink.metrics.Gauge[T]](identifier.name, identifier.tags, () => value.getValue)
@@ -58,8 +60,9 @@ class FlinkMetricsProviderForScenario(runtimeContext: RuntimeContext) extends Ba
5860
private def tagMode(nameParts: NonEmptyList[String], tags: Map[String, String]): (MetricGroup, String) = {
5961
val lastName = nameParts.last
6062
// all but last
61-
val metricNameParts = nameParts.init
62-
val groupWithNameParts = metricNameParts.foldLeft[MetricGroup](runtimeContext.getMetricGroup)(_.addGroup(_))
63+
val metricNameParts = nameParts.init
64+
val groupWithNameParts =
65+
metricNameParts.foldLeft[MetricGroup](exceptionHandlerContext.getMetricGroup)(_.addGroup(_))
6366

6467
val finalGroup = tags.toList.sortBy(_._1).foldLeft[MetricGroup](groupWithNameParts) {
6568
case (group, (tag, tagValue)) => group.addGroup(tag, tagValue)

engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerData.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import cats.data._
44
import cats.data.Validated.{Invalid, Valid}
55
import org.apache.flink.api.common.functions.RuntimeContext
66
import org.apache.flink.api.common.restartstrategy.RestartStrategies
7+
import org.apache.flink.api.connector.sink2.WriterInitContext
78
import pl.touk.nussknacker.engine.{Interpreter, RuntimeMode, ScenarioCompilationDependencies}
89
import pl.touk.nussknacker.engine.api.JobData
910
import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ScenarioCompilationErrors, ValidationContext}
@@ -18,6 +19,7 @@ import pl.touk.nussknacker.engine.compile.nodecompilation.{
1819
}
1920
import pl.touk.nussknacker.engine.compiledgraph.CompiledProcessParts
2021
import pl.touk.nussknacker.engine.compiledgraph.node.Node
22+
import pl.touk.nussknacker.engine.flink.api.{FlinkEngineContext, RuntimeCtx}
2123
import pl.touk.nussknacker.engine.graph.node.NodeData
2224
import pl.touk.nussknacker.engine.process.exception.FlinkExceptionHandler
2325
import pl.touk.nussknacker.engine.splittedgraph.splittednode.SplittedNode
@@ -43,7 +45,7 @@ class FlinkProcessCompilerData(
4345
def open(runtimeContext: RuntimeContext, nodesToUse: List[_ <: NodeData]): Unit = {
4446
val lifecycle = compilerData.lifecycle(nodesToUse)
4547
lifecycle.foreach {
46-
_.open(FlinkEngineRuntimeContextImpl(jobData, runtimeContext, runtimeMode))
48+
_.open(FlinkEngineRuntimeContextImpl(jobData, RuntimeCtx(runtimeContext), runtimeMode))
4749
}
4850
}
4951

@@ -94,9 +96,9 @@ class FlinkProcessCompilerData(
9496

9597
def restartStrategy: RestartStrategies.RestartStrategyConfiguration = exceptionHandler.restartStrategy
9698

97-
def prepareExceptionHandler(runtimeContext: RuntimeContext): FlinkExceptionHandler = {
99+
def prepareExceptionHandler(flinkEngineContext: FlinkEngineContext): FlinkExceptionHandler = {
98100
exceptionHandler.open(
99-
FlinkEngineRuntimeContextImpl(jobData, runtimeContext, runtimeMode)
101+
FlinkEngineRuntimeContextImpl(jobData, flinkEngineContext, runtimeMode)
100102
)
101103
exceptionHandler
102104
}

engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/MetricsProviderForFlink.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,18 @@ package pl.touk.nussknacker.engine.process.compiler
22

33
import org.apache.flink.api.common.functions.RuntimeContext
44
import pl.touk.nussknacker.engine.RuntimeMode
5+
import pl.touk.nussknacker.engine.flink.api.FlinkEngineContext
56
import pl.touk.nussknacker.engine.util.metrics.{MetricsProviderForScenario, NoOpMetricsProviderForScenario}
67

78
object MetricsProviderForFlink {
89

910
def createMetricsProvider(
1011
runtimeMode: RuntimeMode,
11-
runtimeContext: RuntimeContext
12+
flinkEngineContext: FlinkEngineContext
1213
): MetricsProviderForScenario = {
1314
runtimeMode match {
1415
case RuntimeMode.Test => NoOpMetricsProviderForScenario
15-
case _ => new FlinkMetricsProviderForScenario(runtimeContext)
16+
case _ => new FlinkMetricsProviderForScenario(flinkEngineContext)
1617
}
1718
}
1819

0 commit comments

Comments
 (0)