Kueue

tl;dr

Kueue is a persistent job queue in Kotlin to offload tasks to be processed in the background, with at-least-once delivery.
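
At-least-once delivery means a message may reach a handler more than once, for example when a consumer crashes after processing but before the message is marked as done. Handlers should therefore be idempotent. The following is a minimal sketch of one way to do that, not part of Kueue itself: `IdempotentHandler` and the in-memory `seen` set are hypothetical names used for illustration, and a real application would more likely track processed ids in durable storage.

```kotlin
// Hypothetical message type for this sketch; not the Kueue Message interface.
data class RecordCreated(val id: Int, val title: String)

// Wraps a handler and skips messages whose id was already handled successfully.
// Assumption: the message id is a stable deduplication key.
class IdempotentHandler(private val handle: (RecordCreated) -> Unit) {
    private val seen = mutableSetOf<Int>()

    // Returns true if the message was processed, false if it was a duplicate.
    fun on(event: RecordCreated): Boolean {
        if (event.id in seen) return false
        handle(event)
        // Record the id only after success, so a failed attempt can be retried.
        seen += event.id
        return true
    }
}

fun main() {
    val processed = mutableListOf<Int>()
    val handler = IdempotentHandler { processed += it.id }

    handler.on(RecordCreated(1, "first"))
    handler.on(RecordCreated(1, "first")) // simulated redelivery, skipped
    handler.on(RecordCreated(2, "second"))

    println(processed) // prints [1, 2]
}
```

Recording the id after the handler returns keeps the at-least-once guarantee intact: if the handler throws, the message is not marked as seen and a later redelivery is processed normally.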

Installation

Gradle

build.gradle.kts

// Add jitpack
repositories {
    maven { url = uri("https://jitpack.io") }
}

// Include packages
dependencies {
    implementation("com.github.cdekok:kueue-core:0.1.0")
    implementation("com.github.cdekok:kueue-pg-vertx:0.1.0")
    implementation("com.github.cdekok:kueue-retry:0.1.0")
    implementation("com.github.cdekok:kueue-serializer-kotlinx:0.1.0")
}

Quick start

The full source of this example is in the example module (kueue-example-pg).

Producer

Create a producer and send messages to be processed.

// postgres client
val db = PgPool.pool(
    PgConnectOptions.fromEnv(),
    PoolOptions().setMaxSize(10),
)

// kotlinx json serializer
val serializer = KotlinxMessageSerializer(
    Json {
        classDiscriminator = "type"
        serializersModule = SerializersModule {
            polymorphic(Message::class) {
                subclass(RecordCreated::class)
            }
        }
    }
)

// postgres producer
val producer: Producer = PgProducer(
    client = db,
    serializer = serializer,
)

// message to send
@Serializable
data class RecordCreated(
    val id: Int,
    val title: String,
) : Message

val message = RecordCreated(
    id = 123,
    title = "test title",
)

// send the message
producer.send(
    topic = DEFAULT_TOPIC,
    message = message,
)
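
To make the effect of `classDiscriminator = "type"` in the serializer configuration above concrete, here is a small standalone sketch using only kotlinx.serialization. It assumes the Kotlin serialization compiler plugin is applied. The local `Message` interface is a stand-in for the Kueue one, and the `@SerialName` annotation and the name `exampleJson` are additions for illustration only.

```kotlin
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import kotlinx.serialization.modules.SerializersModule
import kotlinx.serialization.modules.polymorphic
import kotlinx.serialization.modules.subclass

// Stand-in for the Kueue Message interface (assumption for this sketch).
interface Message

@Serializable
@SerialName("RecordCreated") // fixes the discriminator value independent of the package name
data class RecordCreated(val id: Int, val title: String) : Message

// Same shape of configuration as in the producer example.
val exampleJson = Json {
    classDiscriminator = "type"
    serializersModule = SerializersModule {
        polymorphic(Message::class) {
            subclass(RecordCreated::class)
        }
    }
}

fun main() {
    val message: Message = RecordCreated(id = 123, title = "test title")

    // Encoding through the Message type adds a "type" field naming the subclass.
    val encoded = exampleJson.encodeToString(message)
    println(encoded)

    // The "type" field lets the decoder pick the right subclass again.
    val decoded = exampleJson.decodeFromString<Message>(encoded)
    println(decoded == message)
}
```

Every message class sent through the queue needs a matching `subclass(...)` registration on both the producer and the consumer side, otherwise decoding fails because the discriminator value is unknown.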

Consumer

Set up a listener to process the events.

/**
 * Process messages one by one
 */
class TestListener : EventListener {
    @EventHandler
    fun on(event: RecordCreated) {
        logger.info { "handle record created $event" }
    }
}

/**
 * Batch process messages
 */
class BatchListener : EventListener {
    @EventHandler
    fun on(event: List<RecordCreated>) {
        logger.info { "handle record created ${event.size}" }
    }
}

val listeners = listOf(
    TestListener(),
    BatchListener(),
)

val consumer = PgConsumer(
    client = pgPool(),
    serializer = kotlinXSerializer(),
)

runBlocking {
    consumer.subscribe(
        topic = "records",
        amount = 10,
        listeners = listeners,
    )
}

Run the example module

Set the correct Java version with SDKMAN

sdk env

Start Postgres

docker-compose -f ./kueue-example-pg/docker-compose.yml up

Send some messages to be processed

./kueue-example-pg/run producer --amount=10000

Consume messages in a different process

./kueue-example-pg/run consumer

Serializers

To run the example with a different serializer, pass the --serializer option to both the producer and the consumer.

./kueue-example-pg/run producer --amount=100 --serializer=XSTREAM
./kueue-example-pg/run consumer --serializer=XSTREAM

Development

Set the correct Java version with SDKMAN

sdk env

Run code formatter with detekt

./gradlew format

Check code formatting

./gradlew detekt

Run all tests

./gradlew test

Run all tests & formatting rules

./gradlew check

Check for dependency updates

./gradlew dependencyUpdates

Update dependencies

./gradlew versionCatalogUpdate
