= Getting Started Decaton
:base_version: 9.0.0
:modules: common,client,processor,protobuf

Let's start with the most basic usage of the Decaton client and processor.
Throughout this guide we use Gradle as the build system and link:https://developers.google.com/protocol-buffers[Protocol Buffers] for defining tasks, but you can choose whatever alternatives you prefer.

== Defining Task Schema

In this guide we'll use Protocol Buffers since it's our usual choice.
While Decaton provides support for Protocol Buffers, you can use any serialization format you want. All you need to do is implement link:../common/src/main/java/com/linecorp/decaton/common/Serializer.java[Serializer] and link:../common/src/main/java/com/linecorp/decaton/common/Deserializer.java[Deserializer] and supply them wherever the Decaton API requires them.

To make use of Protocol Buffers, you need to configure your build to compile Java classes from protobuf IDLs.
We also apply the link:https://github.com/johnrengelman/shadow[Gradle plugin for making a shadow jar] for ease of execution, but it's completely optional in real projects.

[source,groovy]
.build.gradle
----
plugins {
    id 'idea'
    id 'com.google.protobuf' version '0.8.18'
    id 'com.github.johnrengelman.shadow' version '7.1.1'
}

dependencies {
    implementation "com.google.protobuf:protobuf-java:$PROTOBUF_VERSION"
    implementation "org.apache.kafka:kafka-clients:$KAFKA_VERSION"
}

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:$PROTOBUF_VERSION"
    }
}

idea {
    module {
        generatedSourceDirs += file('build/generated/source/proto/main/java')
    }
}
----

Now your build is ready to compile `.proto` files under `src/main/proto`.
Next, add a `.proto` file containing the task definition - here just one very simple task - `PrintMessageTask`.


[source,protobuf]
.mytasks.proto
----
syntax = "proto3";
package com.linecorp.decaton.example.protocol;

message PrintMessageTask {
    string name = 1;
    int32 age = 2;
}
----

== Implement the task producer

Add the following required dependencies. `decaton-protobuf` is optional since it's just a support module providing a few convenient classes for serializing and deserializing Protocol Buffers.

[source,groovy]
.build.gradle
----
implementation "com.linecorp.decaton:decaton-common:$DECATON_VERSION"
implementation "com.linecorp.decaton:decaton-client:$DECATON_VERSION"
implementation "com.linecorp.decaton:decaton-protobuf:$DECATON_VERSION"
----

You can use link:../client/src/main/java/com/linecorp/decaton/client/DecatonClient.java[DecatonClient] to produce tasks into a Kafka topic. It's just a wrapper around Kafka's `Producer` interface, providing convenient methods to construct Kafka records in the protocol expected by Decaton.
`DecatonClient` should be created just once for the whole application lifecycle, and you must call its `#close` to make sure that all buffered tasks are flushed.

[source,java]
.ProducerMain.java
----
public final class ProducerMain {
    public static void main(String[] args) throws Exception {
        try (DecatonClient<PrintMessageTask> client = newClient()) {
            String name = args[0];
            int age = Integer.parseInt(args[1]);
            PrintMessageTask task = PrintMessageTask.newBuilder().setName(name).setAge(age).build();
            CompletableFuture<PutTaskResult> result = client.put(name, task); // <1>

            // Synchronously wait for the result
            result.join();

            // Asynchronously observe the result
            result.whenComplete((r, e) -> {
                // Report only actual failures; e is null when the put succeeded
                if (e != null) {
                    System.err.println("Producing task failed... " + e);
                }
            });
        }
    }

    private static DecatonClient<PrintMessageTask> newClient() {
        Properties producerConfig = new Properties();
        producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "my-decaton-client");
        producerConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                   System.getProperty("bootstrap.servers"));

        return DecatonClient.producing("my-decaton-topic",
                                       new ProtocolBuffersSerializer<PrintMessageTask>())
                            .applicationId("MyApp") // <2>
                            // By default it sets local hostname but here we go explicit
                            .instanceId("localhost")
                            .producerConfig(producerConfig)
                            .build();
    }
}
----
<1> Use the name as the key as well. The key decides which partition the task (record) is routed to.
<2> `applicationId` is used to distinguish which application the task comes from. Just select an arbitrary unique identifier.

== Implement the processor

Add the following dependency as well.

[source,groovy]
.build.gradle
----
implementation "com.linecorp.decaton:decaton-processor:$DECATON_VERSION"
----

Implementing a Decaton processor involves two steps.
First, you need to implement link:../processor/src/main/java/com/linecorp/decaton/processor/DecatonProcessor.java[DecatonProcessor], whose generic type `T` is the class of your task.

[source,java]
.PrintMessageTaskProcessor.java
----
public class PrintMessageTaskProcessor implements DecatonProcessor<PrintMessageTask> {
    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task)
            throws InterruptedException {
        System.out.printf("Noticed %s is %d years old\n", task.getName(), task.getAge());
    }
}
----

Second, you need to instantiate link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java[ProcessorSubscription], which represents a running instance of a Decaton processor.
In the example below we run it for just 10 seconds and then close the subscription for demonstration. In a real application, the subscription is terminated along with the application.


[source,java]
.ProcessorMain.java
----
public final class ProcessorMain {
    public static void main(String[] args) throws Exception {
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-decaton-processor");
        consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                   System.getProperty("bootstrap.servers"));
        consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-decaton-processor");

        ProcessorSubscription subscription =
                SubscriptionBuilder.newBuilder("my-decaton-processor") // <1>
                                   .processorsBuilder(
                                           ProcessorsBuilder.consuming(
                                                   "my-decaton-topic",
                                                   new ProtocolBuffersDeserializer<>(PrintMessageTask.parser()))
                                                            .thenProcess(new PrintMessageTaskProcessor())
                                   )
                                   .consumerConfig(consumerConfig)
                                   .buildAndStart();

        // Keep the processor running for 10 seconds, then shut it down
        Thread.sleep(10000);
        subscription.close();
    }
}
----
<1> As of the current version, `subscriptionId` is an ID used only internally by Decaton for metric labeling, logging and so on.

== Test it

=== Preparing topic

Before we run the processor and producer, we have to prepare the Kafka topic `my-decaton-topic`. Here we created it with 3 partitions and 3 replicas, which is enough to demonstrate our example.

=== Running processor and producer

Now everything is ready for testing. Let's try running the processor, putting one task and seeing what happens.

[source,sh]
----
$ ./gradlew shadowJar
$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS example.ProcessorMain &
$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS example.ProducerMain motoko 25
Put task succeeded: my-decaton-topic-1-5
Noticed motoko is 25 years old
----

Worked :)
The running Decaton processor got a task from the producer through the Kafka topic and processed it by printing a message.

Now we have a working example of a Decaton application from producer to processor. We'll continue a bit more, though, to demonstrate one of Decaton's most important capabilities - concurrent processing of tasks.

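Before moving on, recall that `ProducerMain` uses the task's name as the record key, and that the key decides which partition a task lands in. The sketch below illustrates that idea using only the JDK. It is a simplified model under stated assumptions: `KeyRoutingSketch` and `partitionFor` are hypothetical names, and `Arrays.hashCode` is a stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key, so the partition numbers it prints will not match what a real cluster assigns.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public final class KeyRoutingSketch {
    /**
     * Hypothetical helper: maps a key to one of numPartitions partitions.
     * Arrays.hashCode is only a stand-in for the hash a real partitioner uses.
     */
    public static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result in [0, numPartitions) even for negative hashes
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3; // same partition count as my-decaton-topic in this guide
        for (String key : new String[] { "motoko", "name:0", "name:1", "motoko" }) {
            System.out.printf("key=%s -> partition %d%n", key, partitionFor(key, numPartitions));
        }
    }
}
```

What carries over to the real system is the property rather than the numbers: the same key always maps to the same partition, while different keys may or may not share one.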

== Higher processing concurrency for higher throughput

Decaton supports concurrent processing of tasks in one partition. The reason we need this is described in link:../README.md[README], so here I just show how effective it is.

=== Simulating high-throughput, high-latency IO processing

To simulate processing that involves IO with an external system, we prepare another processor implementation, `PrintMessageTaskProcessor2`.
It simulates the blocking behavior of IO by sleeping for a few tens of milliseconds while processing a task.

[source,java]
.PrintMessageTaskProcessor2.java
----
public class PrintMessageTaskProcessor2 implements DecatonProcessor<PrintMessageTask> {
    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task)
            throws InterruptedException {
        // Time elapsed between producing the task and the start of its processing
        long deliveryLatencyMs = System.currentTimeMillis() - context.metadata().timestampMillis();
        simulateSlowIO();
        System.out.printf("Task for %s delivered in %d ms\n", task.getName(), deliveryLatencyMs);
    }

    private static void simulateSlowIO() throws InterruptedException {
        Thread.sleep(30);
    }
}
----

We also change `ProcessorMain` a bit to tell Decaton how many threads to use for processing one partition.

[source,java]
.ProcessorMain2.java
----
int partitionConcurrency = Integer.parseInt(System.getProperty("concurrency"));
ProcessorSubscription subscription =
        SubscriptionBuilder.newBuilder("my-decaton-processor")
                           .processorsBuilder(
                                   ProcessorsBuilder.consuming(
                                           "my-decaton-topic",
                                           new ProtocolBuffersDeserializer<>(PrintMessageTask.parser()))
                                                    .thenProcess(new PrintMessageTaskProcessor2())
                           )
                           .consumerConfig(consumerConfig)
                           .properties(
                                   StaticPropertySupplier.of(
                                           Property.ofStatic(
                                                   ProcessorProperties.CONFIG_PARTITION_CONCURRENCY, // <1>
                                                   partitionConcurrency),
                                           Property.ofStatic(
                                                   ProcessorProperties.CONFIG_MAX_PENDING_RECORDS, // <2>
                                                   100)))
                           .buildAndStart();
----
<1> Use `CONFIG_PARTITION_CONCURRENCY` to set the number of threads used for processing *one partition*.
If it's set to 10 and an instance gets 3 partitions assigned, 30 threads will be created.
<2> It's a good idea to set `CONFIG_MAX_PENDING_RECORDS`, which configures how many offsets you want to allow processing to move ahead without waiting for the current lowest offset to complete. More pending records potentially consume more memory and increase the number of records re-processed when a fail-over occurs, but they reduce the chance of your processor getting stuck behind tasks with outlying processing latency.

In the above example we set just a few properties. You can visit link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java[ProcessorProperties] to view the list of customizable properties.

We also change `ProducerMain` to produce many generated tasks to mimic a realistic workload.

[source,java]
.BatchProducerMain.java
----
public final class BatchProducerMain {
    public static void main(String[] args) throws Exception {
        try (DecatonClient<PrintMessageTask> client = newClient()) {
            for (int i = 0; i < 100; i++) {
                String name = "name:" + i;
                PrintMessageTask task = PrintMessageTask.newBuilder().setName(name).setAge(i).build();
                client.put(name, task)
                      .whenComplete((r, e) -> {
                          if (e != null) {
                              System.err.println("Producing task failed... " + e);
                          }
                      });
            }
        }
    }
    ...
----

First we run it with partition concurrency set to `1`, which is the default.

[source,sh]
----
$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS -Dconcurrency=1 example.ProcessorMain2
$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS example.BatchProducerMain
Task for name:1 delivered in 37 ms
Task for name:0 delivered in 199 ms
Task for name:3 delivered in 41 ms
...
Task for name:95 delivered in 1287 ms
Task for name:96 delivered in 1322 ms
----

As you can see, milliseconds of processing latency accumulate, eventually causing tasks processed later to see over 1000 ms of latency before they are delivered to the processing logic.
This is natural because a Kafka topic-partition is a queue and by default records in a partition are processed sequentially. Hence the processing latency of preceding tasks adds to the delivery latency of the tasks that follow.
This also impacts processing throughput negatively because it gets capped by the high latency of processing, leaving machine resources idle while awaiting IO responses from external systems.

The point here is that those tasks actually have different keys. If all we care about is preserving processing order and sequentiality per key, we should be able to process them in parallel.

=== Increasing processing concurrency

So next we increase concurrency to 20, which means it uses 20 threads to process tasks coming from one partition.

[source,sh]
----
$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS -Dconcurrency=20 example.ProcessorMain2
$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS example.BatchProducerMain
Task for name:10 delivered in 41 ms
Task for name:36 delivered in 37 ms
...
Task for name:84 delivered in 160 ms
Task for name:89 delivered in 183 ms
----

As you can see, the gap in delivery latency between the first two and the last two tasks is much smaller than when we used just 1 thread for processing them. Of course this contributes to higher throughput as well.

[NOTE]
====
Even though Decaton processes tasks from one partition concurrently, it preserves the ordering guarantee *based on their keys*.
As long as two tasks share the same key, they're guaranteed to be processed in order and sequentially.
====

CAUTION: Do not read anything into the absolute latencies shown in the above output. Most of it comes from network latency between my laptop and the Kafka servers I used. In reality it could be much shorter or longer depending on various parameters.

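To make the key-based ordering guarantee from the note above concrete, here is a minimal, JDK-only sketch of one way such a guarantee can be obtained: tasks are routed by key hash to a fixed set of single-threaded "lanes", so tasks with the same key never overlap or reorder while different keys can run in parallel. This is an illustrative model, not Decaton's actual implementation. `KeyOrderedExecutorSketch`, its `submit` and `run` methods, and the 5 ms sleep are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class KeyOrderedExecutorSketch implements AutoCloseable {
    private final List<ExecutorService> lanes = new ArrayList<>();

    public KeyOrderedExecutorSketch(int concurrency) {
        for (int i = 0; i < concurrency; i++) {
            // One thread per lane, so tasks within a lane run one at a time, in order
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Routes the task to the lane chosen by the key's hash. */
    public void submit(String key, Runnable task) {
        lanes.get(Math.floorMod(key.hashCode(), lanes.size())).execute(task);
    }

    /** Stops accepting tasks and waits for the queued ones to finish. */
    @Override
    public void close() throws InterruptedException {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        for (ExecutorService lane : lanes) {
            lane.awaitTermination(1, TimeUnit.MINUTES);
        }
    }

    /** Submits `rounds` numbered tasks per key and returns the numbers in the order each key saw them. */
    public static Map<String, List<Integer>> run(int concurrency, int keys, int rounds)
            throws InterruptedException {
        Map<String, List<Integer>> seen = new ConcurrentHashMap<>();
        try (KeyOrderedExecutorSketch executor = new KeyOrderedExecutorSketch(concurrency)) {
            for (int seq = 0; seq < rounds; seq++) {
                for (int k = 0; k < keys; k++) {
                    String key = "name:" + k;
                    int n = seq;
                    executor.submit(key, () -> {
                        sleepQuietly(5); // simulated blocking IO
                        seen.computeIfAbsent(key, x -> new CopyOnWriteArrayList<>()).add(n);
                    });
                }
            }
        } // close() has waited for every queued task at this point
        return seen;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Each key's list comes out in ascending order; the order of keys may vary
        run(4, 8, 5).forEach((key, seqs) -> System.out.println(key + " -> " + seqs));
    }
}
```

In this model, raising `concurrency` shortens the total run time as long as keys spread across lanes, yet every key still observes its own tasks strictly in submission order, which is the same trade the note describes.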

== Asynchronous processing completion

The Decaton processor API supports asynchronous completion of tasks too. Task "completion" is a Decaton-specific concept expressing that "the task is ready for its record offset to be committed".
By default Decaton considers a task "completed" when the `DecatonProcessor#process` method returns for the given task. However, you can defer this by explicitly declaring that you want to defer the completion of the task.

This is particularly useful when you're using a middleware's async client which has its own queue and callback support for reporting the result of a request. One good example is Kafka's `Producer` client. It supports an async callback for supplying the result of production.

Below is an example of a processor that leverages Decaton's "deferred completion" to complete a task asynchronously.

[source,java]
----
public class PrintMessageTaskProcessorAsync implements DecatonProcessor<PrintMessageTask> {
    Producer<String, String> producer;

    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task)
            throws InterruptedException {
        Completion completion = context.deferCompletion(); // <1>
        // Complete the task from the producer callback instead of when process() returns
        producer.send(new ProducerRecord<>("next-topic", "Hello" + task.getName()),
                      (metadata, exception) -> completion.complete());
    }
}
----
<1> By calling `ProcessingContext#deferCompletion`, your code takes full responsibility for making sure `#complete` is called on the returned `Completion`. If your code misses it for any reason (a so-called completion leak), your processor soon gets stuck and stops consuming further records.

By leveraging Decaton's deferred completion together with a middleware async client that multiplexes IO with servers, processing works efficiently and achieves higher throughput, and you will likely need a lower `CONFIG_PARTITION_CONCURRENCY`.

== Where to go from here

Now you know the basics and are ready to start implementing Decaton apps!
For those thinking of running Decaton in production, link:./monitoring.adoc[Monitoring] might help you make sure your Decaton processors are doing well.
If you're using link:https://spring.io/[Spring] for running your applications, you might want to take a look at link:./spring-integration.adoc[Spring Integration].

Besides its main functionality, Decaton offers a lot of features born out of actual requirements for building services. Go back to link:./index.adoc[Index] to find the list.