== Websocket binding used for Kafka-REST Assume the version of the Websocket binding and the use of optional features has been negotiated during the handshake phase after the socket is open. This Websocket binding is originated from SwaggerSocket[1]. It can be considered as a simplified yet more efficient and flexible version of the original SwaggerSocket. The protocol handler used for this binding is included in Atmosphere 2.4.4 and newer [2] and it can be used in any jaxrs applications. It is characterized by the following rules. - Each request contains the method, path, and optional type and data. - Each request may be sent with a unique-ID for the client to correlate its response to the request. - Arbitrary Http headers in the request are not supported. - The content-type header can be optionally supplied if the content is present. - The accept header can be optionally supplied if the response is expected. - The content entity can be optionally supplied either within the JSON envelope or after the envelope. - A large message can be transported as a series of messages that can be reassembled at the receiving end. - The server can periodically send a heartbeat message to the connected clients to keep the connections alive. REST over Websocket uses the following message format. CAUTION: The format described in this document correspond to the behavior implemented for kafka-rest-atmosphere version 0.1.0-SNAPSHOT [3]. [caption="Format: "] .General Syntax ==== {"id": "*_identifier_*", "method": "*_method_*", "path": "*_path_*", "type": "*_type_value_*", "accept": "*_accept_value_*", "headers": *_headers_map_*, "continue": *_continue_} *_content_* ==== where - *_identifier_* represents an identifier for the request message which is used in any response message to refer to the original request message, - *_method_* represents the request method, - *_path_* represents the request path, - *_type_* and *_accept_* represent the optional content-type and accept header values. - *_headers_map_* represents the optional map to include additional headers. - *_content_* represents the optional detached content entity. - *_continue_* represents the optional boolean value which indicates the message continues, in other words, followed by another message. ===== Examples .A GET request to path /foo ==== {"id": "123", "method": "GET", "path": "/foo"} ==== .A POST request to path /foo with text content "Hello World!" ==== {"id": "123", "method": "POST", "path": "/foo", "type": "text/plain"}Hello World! ==== .A response with content "Hello World!". ==== {"id": "123"}Hello World! ==== .A response with large content "From a ...... to z" split in two responses. ==== {"id": "123", "continue": true}From a ... ==== ==== {"id": "123"}... to z ==== === Messages used in the websocket binding In the following sections, some of the supported operations are shown in the websocket binding. All the operations supported by kafka-rest can be invoked over Websocket using this websocket binding. In addition to those operations supported by kafka-rest, there are subscribe operations that are specifically supported for the websocket binding. For details about all the operations supported by kafka-rest, please refer to kafka-rest's documentation [4]. ==== Get the list of topics [caption="Format: "] .list ==== {"id": "*_identifier_*", "method": "GET", "path": "/topics"} ==== .Sample Request (listing topics) ---- {"id": "123", "method": "GET", "path": "/topics"} ---- .Sample Response ---- {"id": "123"}["test", "jsontest"] ---- ==== Get the information about a specific topic [caption="Format:"] .getTopic ==== {"id": "*_identifier_*", "method": "GET", "path": "/topics/*_topic_*"} ==== where *_topic_* is a topic name. .Sample Request (getting information about topic "test") ---- {"id": "124", "method": "GET", "path": "/topics/test"} ---- Sample Response ---- {"id": "124"}{"name":"test","configs":{},"partitions":[{"partition":0,"leader":0,"replicas":[{"broker":0,"leader":true,"in_sync":true}]}]} ---- ==== Produce and consume messages in Binary Mode ===== Producing messages (Binary Mode) [caption="Format: "] .produceBinary ==== {"id": "*_identifier_*", "method": "POST", "path": "/topics/*_topic_*", "type": "application/vnd.kafka.binary.v1+json"}{"records": [{"value": "*_base64_message_*"}]} ==== where *_topic_* is a topic name and *_base64_message_* is a base64 encoded string representing the content entity. .Sample Request (publishing binary message "Kafka" to topic "test") ---- {"id": "125", "method": "POST", "path": "/topics/test", "type": "application/vnd.kafka.binary.v1+json"}{"records": [{"value": "S2Fma2E="}]} ---- .Sample Response ---- {"id": "125"}{"offsets":[{"partition":0,"offset":1,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null} ---- ===== Consuming Messages (Binary-Mode) - 1. Create a consumer (Binary-Mode) [caption="Format: "] .createGroup ==== {"id": "*_identifier_*", "method": "POST", "path": "/consumers/*_consumer_group_*", "type": "application/vnd.kafka.binary.v1+json"}{"id": "*_instance_name_*", "format": "binary", "auto.offset.reset": "smallest"} ==== where *_consumer_group_* is a consumer group name and *_instance_name_* is a consumer instance name. .Sample Request (creating binary consumer "my_instance" at consumer group "my_consumer_group") ---- {"id": "126", "method": "POST", "path": "/consumers/my_consumer_group", "type": "application/vnd.kafka.binary.v1+json"}{"id": "my_instance", "format": "binary", "auto.offset.reset": "smallest"} ---- .Sample Response ---- {"id": "126"}{"instance_id":"my_instance","base_uri":"http://localhost:8082/consumers/my_consumer_group/instances/my_instance"} ---- - 2. Subscribe to the topic (Binary-Mode) [caption="Format: "] .subscribeTopicBinary ==== {"id": "*_identifier_*", "method": "GET", "path": "/ws/consumers/*_consumer_group_*/instances/*_instance_name_*/topics/*_topic_*", "accept": "application/vnd.kafka.binary.v1+json"} ==== where *_consumer_group_* is a consumer group name, *_instance_name_* is a consumer instance name, and *_topic_* is a topic name. .Sample Request (subscribing to binary topic "test" by consumer "my_instance" at consumer group "my_consumer_group") ---- {"id": "127", "method": "GET", "path": "/ws/consumers/my_consumer_group/instances/my_instance/topics/test", "accept": "application/vnd.kafka.binary.v1+json"} ---- .Sample Response ---- {"id": "127"}[{"key":null,"value":"SG9sYSBLYWZrYQ==","partition":0,"offset":0},{"key":null,"value":"S2Fma2E=","partition":0,"offset":1},{"key":null,"value":"S2Fma2E=","partition":0,"offset":2},{"key":null,"value":"S2Fma2E=","partition":0,"offset":3}] {"id": "127"}{"key":null,"value":"S2Fma2E=","partition":0,"offset":4} ---- - 3. Unsubscribe from the topic [caption="Format: "] .unsubscribeTopic ==== {"id": "*_identifier_*", "method": "DELETE", "path": "/ws/consumers/*_consumer_group_*/instances/*_instance_name_*/topics/*_topic_*"} ==== where *_consumer_group_* is a consumer group name, *_instance_name_* is a consumer instance name, and *_topic_* is a topic name. .Sample Request (unsubscribing from topic "test" by consumer "my_instance" at consumer group "my_consumer_group") ---- {"id": "128", "method": "DELETE", "path": "/ws/consumers/my_consumer_group/instances/my_instance/topics/test"} ---- - 4. Destroy the consumer [caption="Format: "] .deleteGroup ==== {"id": "*_identifier_*", "method": "DELETE", "path": "/consumers/*_consumer_group_*/instances/*_instance_name_*"} ==== where *_consumer_group_* is a consumer group name and *_instance_name_* is a consumer instance name .Sample Request (deleting consumer "my_instance" at consumer group "my_consumer_group") ---- {"id": "129", "method": "DELETE", "path": "/consumers/my_consumer_group/instances/my_instance"} ---- ==== Produce and consume messages in JSON Mode ===== Producing Messages (JSON-Mode) [caption="Format: "] .produceJson ==== {"id": "*_identifier_*", "method": "POST", "path": "/topics/*_topic_*", "type": "application/vnd.kafka.json.v1+json"}{"records": [{"value": *_json_message_*}]} ==== where *_topic_* is a topic name and *_json_message_* is a json encoded string representing the content entity. .Sample Request (publishing json message {"greeting": "hey"} to topic "jsontest") ---- {"id": "125", "method": "POST", "path": "/topics/jsontest", "type": "application/vnd.kafka.json.v1+json"}{"records": [{"value": {"greeting": "hey"}}]} ---- .Sample Response ---- {"id": "125"}{"offsets":[{"partition":0,"offset":11,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null} ---- .Sample Request (publishing json messages "foo" and {"greeting": "bye"} to topic "jsontest") ---- {"id": "126", "method": "POST", "path": "/topics/jsontest", "type": "application/vnd.kafka.json.v1+json"}{"records": [{"value": "foo"}, {"value": {"greeting": "bye"}}]} ---- .Sample Response ---- {"id": "126"}{"offsets":[{"partition":0,"offset":12,"error_code":null,"error":null},{"partition":0,"offset":13,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null} ---- ===== Consuming Messages (JSON-Mode) - 1. Create a consumer (JSON-Mode) [caption="Format: "] .createGroup ==== {"id": "*_identifier_*", "method": "POST", "path": "/consumers/*_consumer_group_*", "type": "application/vnd.kafka.json.v1+json"}{"id": "*_instance_name_*", "format": "json", "auto.offset.reset": "smallest"} ==== where *_consumer_group_* is a consumer group name and *_instance_name_* is a consumer instance name. .Sample Request (creating json consumer "my_json_instance" at consumer group "my_json_consumer_group") ---- {"id": "126", "method": "POST", "path": "/consumers/my_json_consumer_group", "type": "application/vnd.kafka.json.v1+json"}{"id": "my_json_instance", "format": "json", "auto.offset.reset": "smallest"} ---- .Sample Response ---- {"id": "126"}{"instance_id":"my_json_instance","base_uri":"http://localhost:8082/consumers/my_json_consumer_group/instances/my_json_instance"} ---- - 2. Subscribe to the topic (JSON-Mode) [caption="Format: "] .subscribeTopicJson ==== {"id": "*_identifier_*", "method": "GET", "path": "/ws/consumers/*_consumer_group_*/instances/*_instance_name_*/topics/*_topic_*", "accept": "application/vnd.kafka.json.v1+json"} ==== where *_consumer_group_* is a consumer group name, *_instance_name_* is a consumer instance name, and *_topic_* is a topic name. .Sample Request (subscribing json topic "jsontest" by consumer "my_json_instance" at consumer group "my_json_consumer_group") ---- {"id": "127", "method": "GET", "path": "/ws/consumers/my_json_consumer_group/instances/my_json_instance/topics/jsontest", "accept": "application/vnd.kafka.json.v1+json"} ---- .sample Response ---- {"id": "127"}[{"key":null,"value":{"greeting":"hey"},"partition":0,"offset":1},{"key":null,"value":{"greeting":"hey"},"partition":0,"offset":2},{"key":null,"value":{"greeting":"bye"},"partition":0,"offset":3}] {"id": "127"}{"key":null,"value":{"greeting":"bye"},"partition":0,"offset":4} ---- - 3. Unsubscribe from the topic [caption="Format: "] .unsubscribeTopic ==== {"id": "*_identifier_*", "method": "DELETE", "path": "/ws/consumers/*_consumer_group_*/instances/*_instance_name_*/topics/*_topic_*"} ==== where *_consumer_group_* is a consumer group name, *_instance_name_* is a consumer instance name, and *_topic_* is a topic name. .Sample Request (unsubscribing from topic "jsontest" by consumer "my_json_instance" at consumer group "my_json_consumer_group") ---- {"id": "128", "method": "DELETE", "path": "/ws/consumers/my_json_consumer_group/instances/my_json_instance/topics/jsontest"} ---- - 4. Destroy the consumer [caption="Format: "] .deleteGroup ==== {"id": "*_identifier_*", "method": "DELETE", "path": "/consumers/*_consumer_group_*/instances/*_instance_name_*"} ==== where *_consumer_group_* is a consumer group name and *_instance_name_* is a consumer instance name. .Sample Request (deleting consumer "my_json_instance" at consumer group "my_json_consumer_group") ---- {"id": "129", "method": "DELETE", "path": "/consumers/my_json_consumer_group/instances/my_json_instance"} ---- ==== Get information about partitions [caption="Format: "] .list ==== {"id": "*_identifier_*", "method": "GET", "path": "/topics/*_topic_*/partitions"} ==== .Sample Request (listing partitions for topic "test") ---- {"id": "127", "method": "GET", "path": "/topics/test/partitions"} ---- .sample Response ---- {"id": "127"}[{"partition":0,"leader":0,"replicas":[{"broker":0,"leader":true,"in_sync":true}]}] ---- [caption="Format: "] .getPartition ==== {"id": "*_identifier_*", "method": "GET", "path": "/topics/*_topic_*/partitions/*_partition_*"} ==== .Sample Request (getting information about partion 0 or topic "test) ---- {"id": "128", "method": "GET", "path": "/topics/test/partitions/0"} ---- .sample Response ---- {"id": "128"}{"partition":0,"leader":0,"replicas":[{"broker":0,"leader":true,"in_sync":true}]} ---- ==== Consume messages from a specific partition (Binary Mode) [caption="Format: "] .consumeBinary ==== {"id": "*_identifier_*", "method": "GET", "path": "/topics/*_topic_*/partitions/*_partition_*/messages?offset=*_offset_*[&count=*_count_*]", "accept": "application/vnd.kafka.binary.v1+json"} ==== .Sample Request (consume message with offset 45 from partition 0 of topic "test") ---- {"id": "130", "method": "GET", "path": "/topics/test/partitions/0/messages?offset=45", "accept" : "application/vnd.kafka.binary.v1+json"} ---- .Sample Response ---- {"id": "130"}[{"key":null,"value":"S2Fma2E=","partition":0,"offset":45}] ---- ==== Consume messages from a specific partition (JSON Mode) [caption="Format: "] .consumeJSON ==== {"id": "*_identifier_*", "method": "GET", "path": "/topics/*_topic_*/partitions/*_partition_*/messages?offset=*_offset_*[&count=*_count_*]", "accept": "application/vnd.kafka.json.v1+json"} ==== .Sample Request (consume message with offset 45 from partition 0 of topic "jsontest") ---- {"id": "130", "method": "GET", "path": "/topics/jsontest/partitions/0/messages?offset=45", "accept" : "application/vnd.kafka.json.v1+json"} ---- .Sample Response ---- {"id": "130"}[{"key":null,"value":{"greeting":"hey"},"partition":0,"offset":45}] ---- ==== Committing offsets [caption="Format: "] .commitOffsets ==== {"id": "*_identifier_*", "method": "POST", "path": "/consumers/*_consumer_group_*/instances/*_instance_name_*/offsets"} ==== .Sample Request (committing the offsets for consumer "my_instance" at consumer group "my_consumer_group") ---- {"id": "129", "method": "POST", "path": "/consumers/my_consumer_group/instances/my_instance/offsets", "accept" : "application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json"} ---- .Sample Response ---- {"id": "129"}[{"topic":"test","partition":0,"consumed":45,"committed":45}]} ---- ==== References - [1] https://github.com/swagger-api/swagger-socket[] - [2] https://github.com/Atmosphere/atmosphere[] - [3] https://github.com/elakito/kafka-rest-atmosphere[] - [4] http://docs.confluent.io/2.0.0/kafka-rest/docs/index.html[]