Skip to content

Commit 3041d0c

Browse files
authored
fix(dataflow): wait for kafka topic creation (#5375)
* fix(dataflow): wait for kafka topic creation

Kafka topic creation happens asynchronously. This means that even when the return value from `createTopics(...)` indicates that a topic has been created successfully, the topic cannot be subscribed to immediately.

Instead of verifying the status of each topic from the `createTopics` return value, we now repeatedly call `describeTopics` until all of the topics for the pipeline can be described successfully. This indicates that each topic has been fully created on _at least_ one broker, and can now be subscribed to.

**Fixed issues**:
- Fixes the dataflow component for #INFRA-663 (internal): Pipeline creation goes into ERROR state
1 parent 9b99743 commit 3041d0c

File tree

8 files changed

+93
-29
lines changed

8 files changed

+93
-29
lines changed

scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ object Cli {
4747
val brokerSecret = Key("kafka.tls.broker.secret", stringType)
4848
val endpointIdentificationAlgorithm = Key("kafka.tls.endpoint.identification.algorithm", stringType)
4949

50+
// Kafka waiting for topic creation
51+
val topicCreateTimeoutMillis = Key("topic.create.timeout.millis", intType)
52+
val topicDescribeTimeoutMillis = Key("topic.describe.timeout.millis", longType)
53+
val topicDescribeRetries = Key("topic.describe.retry.attempts", intType)
54+
val topicDescribeRetryDelayMillis = Key("topic.describe.retry.delay.millis", longType)
55+
5056
// Kafka SASL
5157
val saslUsername = Key("kafka.sasl.username", stringType)
5258
val saslSecret = Key("kafka.sasl.secret", stringType)
@@ -75,6 +81,10 @@ object Cli {
7581
clientSecret,
7682
brokerSecret,
7783
endpointIdentificationAlgorithm,
84+
topicCreateTimeoutMillis,
85+
topicDescribeTimeoutMillis,
86+
topicDescribeRetries,
87+
topicDescribeRetryDelayMillis,
7888
saslUsername,
7989
saslSecret,
8090
saslPasswordPath,

scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,19 @@ object Main {
8181
useCleanState = config[Cli.kafkaUseCleanState],
8282
joinWindowMillis = config[Cli.kafkaJoinWindowMillis],
8383
)
84+
val topicWaitRetryParams = TopicWaitRetryParams(
85+
createTimeoutMillis = config[Cli.topicCreateTimeoutMillis],
86+
describeTimeoutMillis = config[Cli.topicDescribeTimeoutMillis],
87+
describeRetries = config[Cli.topicDescribeRetries],
88+
describeRetryDelayMillis = config[Cli.topicDescribeRetryDelayMillis]
89+
)
8490
val subscriber = PipelineSubscriber(
8591
"seldon-dataflow-engine",
8692
kafkaProperties,
8793
kafkaAdminProperties,
8894
kafkaStreamsParams,
8995
kafkaDomainParams,
96+
topicWaitRetryParams,
9097
config[Cli.upstreamHost],
9198
config[Cli.upstreamPort],
9299
GrpcServiceConfigProvider.config,

scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/PipelineSubscriber.kt

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,14 @@ class PipelineSubscriber(
3434
kafkaAdminProperties: KafkaAdminProperties,
3535
kafkaStreamsParams: KafkaStreamsParams,
3636
private val kafkaDomainParams: KafkaDomainParams,
37+
private val topicWaitRetryParams: TopicWaitRetryParams,
3738
private val upstreamHost: String,
3839
private val upstreamPort: Int,
3940
grpcServiceConfig: Map<String, Any>,
4041
private val kafkaConsumerGroupIdPrefix: String,
4142
private val namespace: String,
4243
) {
43-
private val kafkaAdmin = KafkaAdmin(kafkaAdminProperties, kafkaStreamsParams)
44+
private val kafkaAdmin = KafkaAdmin(kafkaAdminProperties, kafkaStreamsParams, topicWaitRetryParams)
4445
private val channel = ManagedChannelBuilder
4546
.forAddress(upstreamHost, upstreamPort)
4647
.defaultServiceConfig(grpcServiceConfig)
@@ -105,7 +106,6 @@ class PipelineSubscriber(
105106
}
106107
}
107108
.collect()
108-
// TODO - error handling?
109109
// TODO - use supervisor job(s) for spawning coroutines?
110110
}
111111

@@ -121,8 +121,20 @@ class PipelineSubscriber(
121121
kafkaConsumerGroupIdPrefix: String,
122122
namespace: String,
123123
) {
124-
logger.info("Create pipeline {pipelineName} version: {pipelineVersion} id: {pipelineId}", metadata.name, metadata.version, metadata.id)
125-
val (pipeline, err) = Pipeline.forSteps(metadata, steps, kafkaProperties, kafkaDomainParams, kafkaConsumerGroupIdPrefix, namespace)
124+
logger.info(
125+
"Create pipeline {pipelineName} version: {pipelineVersion} id: {pipelineId}",
126+
metadata.name,
127+
metadata.version,
128+
metadata.id
129+
)
130+
val (pipeline, err) = Pipeline.forSteps(
131+
metadata,
132+
steps,
133+
kafkaProperties,
134+
kafkaDomainParams,
135+
kafkaConsumerGroupIdPrefix,
136+
namespace
137+
)
126138
if (err != null) {
127139
err.log(logger, Level.ERROR)
128140
client.pipelineUpdateEvent(
@@ -133,7 +145,6 @@ class PipelineSubscriber(
133145
reason = err.getDescription() ?: "failed to initialize dataflow engine"
134146
)
135147
)
136-
137148
return
138149
}
139150

scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/Configuration.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ data class KafkaDomainParams(
4949
val joinWindowMillis: Long,
5050
)
5151

52+
data class TopicWaitRetryParams(
53+
val createTimeoutMillis: Int, // int required by the underlying kafka-streams library
54+
val describeTimeoutMillis: Long,
55+
val describeRetries: Int,
56+
val describeRetryDelayMillis: Long
57+
)
58+
5259
val kafkaTopicConfig = { maxMessageSizeBytes: Int ->
5360
mapOf(
5461
TopicConfig.MAX_MESSAGE_BYTES_CONFIG to maxMessageSizeBytes.toString(),

scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/KafkaAdmin.kt

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,45 @@ the Change License after the Change Date as each is defined in accordance with t
99

1010
package io.seldon.dataflow.kafka
1111

12+
import com.github.michaelbull.retry.ContinueRetrying
13+
import com.github.michaelbull.retry.policy.RetryPolicy
14+
import com.github.michaelbull.retry.policy.constantDelay
15+
import com.github.michaelbull.retry.policy.limitAttempts
16+
import com.github.michaelbull.retry.policy.plus
17+
import com.github.michaelbull.retry.retry
1218
import io.seldon.mlops.chainer.ChainerOuterClass.PipelineStepUpdate
1319
import org.apache.kafka.clients.admin.Admin
20+
import org.apache.kafka.clients.admin.CreateTopicsOptions
1421
import org.apache.kafka.clients.admin.NewTopic
1522
import org.apache.kafka.common.KafkaFuture
23+
import org.apache.kafka.common.errors.TimeoutException
1624
import org.apache.kafka.common.errors.TopicExistsException
25+
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
1726
import java.util.concurrent.ExecutionException
27+
import java.util.concurrent.TimeUnit
1828
import io.klogging.logger as coLogger
1929

2030
class KafkaAdmin(
2131
adminConfig: KafkaAdminProperties,
2232
private val streamsConfig: KafkaStreamsParams,
33+
private val topicWaitRetryParams: TopicWaitRetryParams,
2334
) {
2435
private val adminClient = Admin.create(adminConfig)
2536

2637
suspend fun ensureTopicsExist(
2738
steps: List<PipelineStepUpdate>,
2839
) : Exception? {
40+
val missingTopicRetryPolicy: RetryPolicy<Throwable> = {
41+
when (reason.cause) {
42+
is TimeoutException,
43+
is UnknownTopicOrPartitionException -> ContinueRetrying
44+
else -> {
45+
logger.warn("ignoring exception while waiting for topic creation: ${reason.message}")
46+
ContinueRetrying
47+
}
48+
}
49+
}
50+
2951
try {
3052
steps
3153
.flatMap { step -> step.sourcesList + step.sink + step.triggersList }
@@ -46,12 +68,27 @@ class KafkaAdmin(
4668
)
4769
}
4870
.run {
49-
adminClient.createTopics(this)
71+
adminClient.createTopics(
72+
this,
73+
CreateTopicsOptions().timeoutMs(topicWaitRetryParams.createTimeoutMillis)
74+
)
5075
}
5176
.values()
5277
.also { topicCreations ->
53-
topicCreations.entries.forEach { creationResult ->
54-
awaitKafkaResult(creationResult)
78+
logger.info("Waiting for kafka topic creation")
79+
// We repeatedly attempt to describe all topics as a way of blocking until they exist at least on
80+
// one broker. This is because the call to createTopics above returns before topics can actually
81+
// be subscribed to.
82+
retry(
83+
missingTopicRetryPolicy + limitAttempts(topicWaitRetryParams.describeRetries) + constantDelay(
84+
topicWaitRetryParams.describeRetryDelayMillis
85+
)
86+
) {
87+
logger.debug("Still waiting for all topics to be created...")
88+
// the KafkaFuture retrieved via .allTopicNames() only succeeds if all the topic
89+
// descriptions succeed, so there is no need to check topic descriptions individually
90+
adminClient.describeTopics(topicCreations.keys).allTopicNames()
91+
.get(topicWaitRetryParams.describeTimeoutMillis, TimeUnit.MILLISECONDS)
5592
}
5693
}
5794
} catch (e: Exception) {
@@ -62,22 +99,10 @@ class KafkaAdmin(
6299
return e
63100
}
64101

102+
logger.info("All topics created")
65103
return null
66104
}
67105

68-
private suspend fun awaitKafkaResult(result: Map.Entry<String, KafkaFuture<Void>>) {
69-
try {
70-
result.value.get()
71-
logger.info("Topic created ${result.key}")
72-
} catch (e: ExecutionException) {
73-
if (e.cause is TopicExistsException) {
74-
logger.info("Topic already exists ${result.key}")
75-
} else {
76-
throw e
77-
}
78-
}
79-
}
80-
81106
companion object {
82107
private val logger = coLogger(KafkaAdmin::class)
83108
}

scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/Pipeline.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ class Pipeline(
140140
): Pair<Pipeline?, PipelineStatus.Error?> {
141141
val (topology, numSteps) = buildTopology(metadata, steps, kafkaDomainParams)
142142
val pipelineProperties = localiseKafkaProperties(kafkaProperties, metadata, numSteps, kafkaConsumerGroupIdPrefix, namespace)
143-
var streamsApp : KafkaStreams? = null
144-
var pipelineError: PipelineStatus.Error? = null
143+
var streamsApp : KafkaStreams?
144+
var pipelineError: PipelineStatus.Error?
145145
try {
146146
streamsApp = KafkaStreams(topology, pipelineProperties)
147147
} catch (e: StreamsException) {

scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/PipelineStatus.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ open class PipelineStatus(val state: KafkaStreams.State?, var isError: Boolean)
3737
}
3838

3939
// log status when logger is in a coroutine
40-
override fun log(logger: Klogger, level: Level) {
40+
override fun log(logger: Klogger, levelIfNoException: Level) {
4141
var exceptionMsg = this.exception?.message
4242
var exceptionCause = this.exception?.cause ?: Exception("")
4343
var statusMsg = this.message
@@ -47,17 +47,17 @@ open class PipelineStatus(val state: KafkaStreams.State?, var isError: Boolean)
4747
}
4848
if (exceptionMsg != null) {
4949
runBlocking {
50-
logger.log(level, exceptionCause, "$statusMsg, Exception: {exception}", exceptionMsg)
50+
logger.log(levelIfNoException, exceptionCause, "$statusMsg, Exception: {exception}", exceptionMsg)
5151
}
5252
} else {
5353
runBlocking {
54-
logger.log(level, "$statusMsg")
54+
logger.log(levelIfNoException, "$statusMsg")
5555
}
5656
}
5757
}
5858

5959
// log status when logger is outside coroutines
60-
override fun log(logger: NoCoLogger, level: Level) {
60+
override fun log(logger: NoCoLogger, levelIfNoException: Level) {
6161
val exceptionMsg = this.exception?.message
6262
val exceptionCause = this.exception?.cause ?: Exception("")
6363
var statusMsg = this.message
@@ -66,9 +66,9 @@ open class PipelineStatus(val state: KafkaStreams.State?, var isError: Boolean)
6666
statusMsg += ", stop cause: $prevStateDescription"
6767
}
6868
if (exceptionMsg != null) {
69-
logger.log(level, exceptionCause, "$statusMsg, Exception: {exception}", exceptionMsg)
69+
logger.log(levelIfNoException, exceptionCause, "$statusMsg, Exception: {exception}", exceptionMsg)
7070
} else {
71-
logger.log(level, "$statusMsg")
71+
logger.log(levelIfNoException, "$statusMsg")
7272
}
7373
}
7474
}

scheduler/data-flow/src/main/resources/local.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,7 @@ kafka.sasl.username=seldon
2121
kafka.sasl.secret=
2222
kafka.sasl.password.path=
2323
kafka.sasl.mechanism=PLAIN
24+
topic.create.timeout.millis=60000
25+
topic.describe.timeout.millis=1000
26+
topic.describe.retry.attempts=60
27+
topic.describe.retry.delay.millis=1000

0 commit comments

Comments
 (0)