From 84c18848e7d6d36ab2e3a25c1756681d87e0f9db Mon Sep 17 00:00:00 2001
From: "Wang, Shaofei"
Date: Fri, 1 Feb 2019 18:09:25 -0500
Subject: [PATCH] Kafka protocol producer with librdkafka library

format: kafka://<broker>[:<port>]/<topic>

Change-Id: Ia342a2419c009d6f9c1a6b13f7c5a7037eddc828
---
 configure                |   5 ++
 libavformat/Makefile     |   1 +
 libavformat/kafkaproto.c | 186 +++++++++++++++++++++++++++++++++++++++++++++++
 libavformat/protocols.c  |   1 +
 4 files changed, 193 insertions(+)
 create mode 100644 libavformat/kafkaproto.c

diff --git a/configure b/configure
index fb87f47..05a481a 100755
--- a/configure
+++ b/configure
@@ -263,6 +263,7 @@ External library support:
   --enable-libsoxr         enable Include libsoxr resampling [no]
   --enable-libspeex        enable Speex de/encoding via libspeex [no]
   --enable-libsrt          enable Haivision SRT protocol via libsrt [no]
+  --enable-librdkafka      enable Kafka protocol via librdkafka [no]
   --enable-libssh          enable SFTP protocol via libssh [no]
   --enable-libtensorflow   enable TensorFlow as a DNN module backend
                            for DNN based filters like sr [no]
@@ -1764,6 +1765,7 @@ EXTERNAL_LIBRARY_LIST="
     openal
     opengl
     vapoursynth
+    librdkafka
 "
 
 HWACCEL_AUTODETECT_LIBRARY_LIST="
@@ -3349,6 +3351,7 @@ libsrt_protocol_deps="libsrt"
 libsrt_protocol_select="network"
 libssh_protocol_deps="libssh"
 libtls_conflict="openssl gnutls mbedtls"
+rdkafka_protocol_deps="librdkafka"
 
 # filters
 afftdn_filter_deps="avcodec"
@@ -6250,6 +6253,8 @@ enabled vapoursynth       && require_pkg_config vapoursynth "vapoursynth-script
 enabled libinference_engine && require_pkg_config libinference_engine dldt "ie_api_wrapper.h" IESizeOfContext
 
+enabled librdkafka        && require_pkg_config librdkafka rdkafka "librdkafka/rdkafka.h" rd_kafka_version
+
 if enabled gcrypt; then
     GCRYPT_CONFIG="${cross_prefix}libgcrypt-config"
     if "${GCRYPT_CONFIG}" --version > /dev/null 2>&1; then
diff --git a/libavformat/Makefile b/libavformat/Makefile
index f7e1698..5b1f3ad 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -626,6 +626,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL)        += librtmp.o
 OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL)     += libsmbclient.o
 OBJS-$(CONFIG_LIBSRT_PROTOCOL)           += libsrt.o
 OBJS-$(CONFIG_LIBSSH_PROTOCOL)           += libssh.o
+OBJS-$(CONFIG_RDKAFKA_PROTOCOL)          += kafkaproto.o
 
 # libavdevice dependencies
 OBJS-$(CONFIG_IEC61883_INDEV)            += dv.o
diff --git a/libavformat/kafkaproto.c b/libavformat/kafkaproto.c
new file mode 100644
index 0000000..526260a
--- /dev/null
+++ b/libavformat/kafkaproto.c
@@ -0,0 +1,186 @@
+/*
+ * Kafka network protocol
+ * Copyright (c) 2019 Shaofei Wang
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * @file
+ * Kafka protocol producer based on librdkafka
+ */
+
+#include <stdio.h>
+#include <string.h>
+
+#include "avformat.h"
+#include "url.h"
+
+#include "librdkafka/rdkafka.h"
+
+typedef struct KafkaContext {
+    const AVClass *class;
+    rd_kafka_t *rk;        /* Producer instance handle */
+    rd_kafka_conf_t *conf; /* Configuration object */
+    rd_kafka_topic_t *rkt; /* Topic object */
+} KafkaContext;
+
+/* Delivery report callback, invoked from rd_kafka_poll()/rd_kafka_flush()
+ * once per produced message. Successful deliveries are not logged to
+ * avoid flooding the output. */
+static void dr_msg_cb(rd_kafka_t *rk,
+                      const rd_kafka_message_t *rkmessage, void *opaque)
+{
+    if (rkmessage->err)
+        av_log(NULL, AV_LOG_ERROR, "Message delivery failed: %s\n",
+               rd_kafka_err2str(rkmessage->err));
+}
+
+static int kafka_open(URLContext *s, const char *uri, int flags)
+{
+    KafkaContext *kc = s->priv_data;
+    char proto[8], hostname[256], path[1024], auth[100], brokers[512], errstr[512], *topic;
+    int port;
+
+    av_url_split(proto, sizeof(proto), auth, sizeof(auth),
+                 hostname, sizeof(hostname), &port,
+                 path, sizeof(path), s->filename);
+
+    /* The topic is the last path component of the URL. */
+    topic = strrchr(s->filename, '/');
+    if (!topic || !*(topic + 1)) {
+        av_log(s, AV_LOG_ERROR, "No topic specified in the URL\n");
+        return AVERROR(EINVAL);
+    }
+    topic++;
+
+    kc->conf = rd_kafka_conf_new();
+    /* av_url_split() reports a missing port as -1; fall back to the
+     * default Kafka port 9092 in that case. */
+    if (port > 0)
+        snprintf(brokers, sizeof(brokers), "%s:%d", hostname, port);
+    else
+        snprintf(brokers, sizeof(brokers), "%s:9092", hostname);
+
+    /* Set bootstrap broker(s) as a comma-separated list of
+     * host or host:port (default port 9092).
+     * librdkafka will use the bootstrap brokers to acquire the full
+     * set of brokers from the cluster. */
+    if (rd_kafka_conf_set(kc->conf, "bootstrap.servers", brokers,
+                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+        av_log(s, AV_LOG_ERROR, "%s\n", errstr);
+        rd_kafka_conf_destroy(kc->conf);
+        return AVERROR_UNKNOWN;
+    }
+
+    rd_kafka_conf_set_dr_msg_cb(kc->conf, dr_msg_cb);
+
+    kc->rk = rd_kafka_new(RD_KAFKA_PRODUCER, kc->conf, errstr, sizeof(errstr));
+    if (!kc->rk) {
+        av_log(s, AV_LOG_ERROR,
+               "Failed to create new producer: %s\n", errstr);
+        rd_kafka_conf_destroy(kc->conf);
+        return AVERROR_UNKNOWN;
+    }
+    /* On success rd_kafka_new() takes ownership of the configuration. */
+    kc->conf = NULL;
+
+    kc->rkt = rd_kafka_topic_new(kc->rk, topic, NULL);
+    if (!kc->rkt) {
+        av_log(s, AV_LOG_ERROR,
+               "Failed to create topic object: %s\n",
+               rd_kafka_err2str(rd_kafka_last_error()));
+        rd_kafka_destroy(kc->rk);
+        return AVERROR_UNKNOWN;
+    }
+
+    return 0;
+}
+
+static int kafka_close(URLContext *h)
+{
+    KafkaContext *kc = h->priv_data;
+
+    /* Wait for any outstanding messages to be delivered, at most 10 seconds. */
+    rd_kafka_flush(kc->rk, 10 * 1000);
+
+    rd_kafka_topic_destroy(kc->rkt);
+    rd_kafka_destroy(kc->rk);
+
+    return 0;
+}
+
+static int kafka_write(URLContext *s, const uint8_t *buf, int size)
+{
+    KafkaContext *kc = s->priv_data;
+    rd_kafka_t *rk = kc->rk;
+    rd_kafka_topic_t *rkt = kc->rkt;
+
+    if (size == 0) {
+        /* Nothing to send: only serve pending delivery reports. */
+        rd_kafka_poll(rk, 0 /* non-blocking */);
+        return 0;
+    }
+
+retry:
+    if (rd_kafka_produce(
+            /* Topic object */
+            rkt,
+            /* Use builtin partitioner to select partition */
+            RD_KAFKA_PARTITION_UA,
+            /* Make a copy of the payload. */
+            RD_KAFKA_MSG_F_COPY,
+            /* Message payload (value) and length */
+            (void *)buf, size,
+            /* Optional key and its length */
+            NULL, 0,
+            /* Message opaque, provided in
+             * delivery report callback as
+             * msg_opaque. */
+            NULL) == -1) {
+
+        /*
+         * Failed to *enqueue* the message for producing.
+         */
+        av_log(s, AV_LOG_ERROR,
+               "Failed to produce to topic %s: %s\n",
+               rd_kafka_topic_name(rkt),
+               rd_kafka_err2str(rd_kafka_last_error()));
+
+        if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
+            /* If the internal queue is full, wait for
+             * messages to be delivered and then retry.
+             * The internal queue represents both
+             * messages to be sent and messages that have
+             * been sent or failed, awaiting their
+             * delivery report callback to be called.
+             *
+             * The internal queue is limited by the
+             * configuration property
+             * queue.buffering.max.messages */
+            rd_kafka_poll(rk, 1000 /* block for max 1000 ms */);
+            goto retry;
+        }
+
+        /* Non-retriable produce error: report the failure to the caller. */
+        return AVERROR(EIO);
+    }
+
+    /* Serve queued delivery report callbacks without blocking. */
+    rd_kafka_poll(rk, 0 /* non-blocking */);
+
+    return size;
+}
+
+#define KAFKA_PROTOCOL(flavor)                          \
+static const AVClass flavor##_class = {                 \
+    .class_name = #flavor,                              \
+    .item_name  = av_default_item_name,                 \
+    .version    = LIBAVUTIL_VERSION_INT,                \
+};                                                      \
+                                                        \
+const URLProtocol ff_##flavor##_protocol = {            \
+    .name            = "kafka",                         \
+    .url_open        = kafka_open,                      \
+    .url_write       = kafka_write,                     \
+    .url_close       = kafka_close,                     \
+    .priv_data_class = &flavor##_class,                 \
+    .priv_data_size  = sizeof(KafkaContext),            \
+    .flags           = URL_PROTOCOL_FLAG_NETWORK,       \
+};
+
+KAFKA_PROTOCOL(rdkafka)
diff --git a/libavformat/protocols.c b/libavformat/protocols.c
index ad95659..d95bfdf 100644
--- a/libavformat/protocols.c
+++ b/libavformat/protocols.c
@@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol;
 extern const URLProtocol ff_libsrt_protocol;
 extern const URLProtocol ff_libssh_protocol;
 extern const URLProtocol ff_libsmbclient_protocol;
+extern const URLProtocol ff_rdkafka_protocol;
 
 #include "libavformat/protocol_list.c"
-- 
2.7.4
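
One possible way to exercise the new protocol once the patch is applied. The
broker address localhost:9092, the topic name mytopic and the input file
input.ts are only placeholders, and a Kafka broker is assumed to be running
and reachable at that address:

    ./configure --enable-librdkafka
    make
    ./ffmpeg -re -i input.ts -c copy -f mpegts kafka://localhost:9092/mytopic

Each write issued by the muxer is produced as a single Kafka message on the
given topic, so a consumer subscribed to that topic receives the MPEG-TS byte
stream as a sequence of message-sized chunks.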