summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndriy Gelman <andriy.gelman@gmail.com>2019-07-30 14:39:32 -0400
committerMarton Balint <cus@passwd.hu>2019-09-02 23:08:43 +0200
commitef43a4d6b38de941dd2ede0711d4fd5d811127ed (patch)
treecd9bd40e04c73631c873560139862f7d070186d0
parentb022d9ba288ad3de321e0835b0aedfd91c2c3064 (diff)
downloadffmpeg-streaming-ef43a4d6b38de941dd2ede0711d4fd5d811127ed.zip
ffmpeg-streaming-ef43a4d6b38de941dd2ede0711d4fd5d811127ed.tar.gz
avformat: Add ZeroMQ as a protocol
When ffmpeg was streaming, multiple clients were only supported by using a multicast destination address. An alternative was to stream to a server which re-distributes the content. This commit adds ZeroMQ as a protocol, which allows multiple clients to connect to a single ffmpeg instance. Signed-off-by: Marton Balint <cus@passwd.hu>
-rw-r--r--Changelog1
-rwxr-xr-xconfigure4
-rw-r--r--doc/general.texi1
-rw-r--r--doc/protocols.texi47
-rw-r--r--libavformat/Makefile1
-rw-r--r--libavformat/libzmq.c199
-rw-r--r--libavformat/protocols.c1
-rw-r--r--libavformat/version.h2
8 files changed, 254 insertions, 2 deletions
diff --git a/Changelog b/Changelog
index 20e4296..4b29e01 100644
--- a/Changelog
+++ b/Changelog
@@ -8,6 +8,7 @@ version <next>:
- support for TrueHD in mp4
- Supoort AMD AMF encoder on Linux (via Vulkan)
- IMM5 video decoder
+- ZeroMQ protocol
version 4.2:
diff --git a/configure b/configure
index 9d70375..4c77e1c 100755
--- a/configure
+++ b/configure
@@ -3415,6 +3415,8 @@ libsrt_protocol_deps="libsrt"
libsrt_protocol_select="network"
libssh_protocol_deps="libssh"
libtls_conflict="openssl gnutls mbedtls"
+libzmq_protocol_deps="libzmq"
+libzmq_protocol_select="network"
# filters
afftdn_filter_deps="avcodec"
@@ -6325,7 +6327,7 @@ enabled libxavs && require libxavs "stdint.h xavs.h" xavs_encoder_enco
enabled libxavs2 && require_pkg_config libxavs2 "xavs2 >= 1.3.0" "stdint.h xavs2.h" xavs2_api_get
enabled libxvid && require libxvid xvid.h xvid_global -lxvidcore
enabled libzimg && require_pkg_config libzimg "zimg >= 2.7.0" zimg.h zimg_get_api_version
-enabled libzmq && require_pkg_config libzmq libzmq zmq.h zmq_ctx_new
+enabled libzmq && require_pkg_config libzmq "libzmq >= 4.2.1" zmq.h zmq_ctx_new
enabled libzvbi && require_pkg_config libzvbi zvbi-0.2 libzvbi.h vbi_decoder_new &&
{ test_cpp_condition libzvbi.h "VBI_VERSION_MAJOR > 0 || VBI_VERSION_MINOR > 2 || VBI_VERSION_MINOR == 2 && VBI_VERSION_MICRO >= 28" ||
enabled gpl || die "ERROR: libzvbi requires version 0.2.28 or --enable-gpl."; }
diff --git a/doc/general.texi b/doc/general.texi
index d0c3525..2744c23 100644
--- a/doc/general.texi
+++ b/doc/general.texi
@@ -1339,6 +1339,7 @@ performance on systems without hardware floating point support).
@item TCP @tab X
@item TLS @tab X
@item UDP @tab X
+@item ZMQ @tab E
@end multitable
@code{X} means that the protocol is supported.
diff --git a/doc/protocols.texi b/doc/protocols.texi
index 3e4e7af..b03432e 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -1728,4 +1728,51 @@ Timeout in ms.
Create the Unix socket in listening mode.
@end table
+@section zmq
+
+ZeroMQ asynchronous messaging using the libzmq library.
+
+This library supports unicast streaming to multiple clients without relying on
+an external server.
+
+The required syntax for streaming or connecting to a stream is:
+@example
+zmq:tcp://ip-address:port
+@end example
+
+Example:
+Create a localhost stream on port 5555:
+@example
+ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
+@end example
+
+Multiple clients may connect to the stream using:
+@example
+ffplay zmq:tcp://127.0.0.1:5555
+@end example
+
+Streaming to multiple clients is implemented using a ZeroMQ Pub-Sub pattern.
+The server side binds to a port and publishes data. Clients connect to the
+server (via IP address/port) and subscribe to the stream. The order in which
+the server and client start generally does not matter.
+
+ffmpeg must be compiled with the --enable-libzmq option to support
+this protocol.
+
+Options can be set on the @command{ffmpeg}/@command{ffplay} command
+line. The following options are supported:
+
+@table @option
+
+@item pkt_size
+Forces the maximum packet size for sending/receiving data. The default value is
+32,768 bytes. On the server side, this sets the maximum size of sent packets
+via ZeroMQ. On the clients, it sets an internal buffer size for receiving
+packets. Note that pkt_size on the clients should be equal to or greater than
+pkt_size on the server. Otherwise the received message may be truncated causing
+decoding errors.
+
+@end table
+
+
@c man end PROTOCOLS
diff --git a/libavformat/Makefile b/libavformat/Makefile
index a434b00..efa3a11 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL) += librtmp.o
OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL) += libsmbclient.o
OBJS-$(CONFIG_LIBSRT_PROTOCOL) += libsrt.o
OBJS-$(CONFIG_LIBSSH_PROTOCOL) += libssh.o
+OBJS-$(CONFIG_LIBZMQ_PROTOCOL) += libzmq.o
# libavdevice dependencies
OBJS-$(CONFIG_IEC61883_INDEV) += dv.o
diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c
new file mode 100644
index 0000000..d864882
--- /dev/null
+++ b/libavformat/libzmq.c
@@ -0,0 +1,199 @@
+/*
+ * ZeroMQ Protocol
+ * Copyright (c) 2019 Andriy Gelman
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <zmq.h>
+#include "url.h"
+#include "network.h"
+#include "libavutil/avstring.h"
+#include "libavutil/opt.h"
+#include "libavutil/time.h"
+
+#define ZMQ_STRERROR zmq_strerror(zmq_errno())
+
+typedef struct ZMQContext {
+ const AVClass *class;
+ void *context;
+ void *socket;
+ int pkt_size;
+ int pkt_size_overflow; /*keep track of the largest packet during overflow*/
+} ZMQContext;
+
+#define OFFSET(x) offsetof(ZMQContext, x)
+#define D AV_OPT_FLAG_DECODING_PARAM
+#define E AV_OPT_FLAG_ENCODING_PARAM
+static const AVOption options[] = {
+ { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 32768 }, -1, INT_MAX, .flags = D | E },
+ { NULL }
+};
+
+static int zmq_proto_wait(URLContext *h, void *socket, int write)
+{
+ int ret;
+ int ev = write ? ZMQ_POLLOUT : ZMQ_POLLIN;
+ zmq_pollitem_t items = { .socket = socket, .fd = 0, .events = ev, .revents = 0 };
+ ret = zmq_poll(&items, 1, POLLING_TIME);
+ if (ret == -1) {
+ av_log(h, AV_LOG_ERROR, "Error occured during zmq_poll(): %s\n", ZMQ_STRERROR);
+ return AVERROR_EXTERNAL;
+ }
+ return items.revents & ev ? 0 : AVERROR(EAGAIN);
+}
+
+static int zmq_proto_wait_timeout(URLContext *h, void *socket, int write, int64_t timeout, AVIOInterruptCB *int_cb)
+{
+ int ret;
+ int64_t wait_start = 0;
+
+ while (1) {
+ if (ff_check_interrupt(int_cb))
+ return AVERROR_EXIT;
+ ret = zmq_proto_wait(h, socket, write);
+ if (ret != AVERROR(EAGAIN))
+ return ret;
+ if (timeout > 0) {
+ if (!wait_start)
+ wait_start = av_gettime_relative();
+ else if (av_gettime_relative() - wait_start > timeout)
+ return AVERROR(ETIMEDOUT);
+ }
+ }
+}
+
+static int zmq_proto_open(URLContext *h, const char *uri, int flags)
+{
+ int ret;
+ ZMQContext *s = h->priv_data;
+ s->pkt_size_overflow = 0;
+ h->is_streamed = 1;
+
+ if (s->pkt_size > 0)
+ h->max_packet_size = s->pkt_size;
+
+ s->context = zmq_ctx_new();
+ if (!s->context) {
+ /*errno not set on failure during zmq_ctx_new()*/
+ av_log(h, AV_LOG_ERROR, "Error occured during zmq_ctx_new()\n");
+ return AVERROR_EXTERNAL;
+ }
+
+ av_strstart(uri, "zmq:", &uri);
+
+ /*publish during write*/
+ if (h->flags & AVIO_FLAG_WRITE) {
+ s->socket = zmq_socket(s->context, ZMQ_PUB);
+ if (!s->socket) {
+ av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR);
+ zmq_ctx_term(s->context);
+ return AVERROR_EXTERNAL;
+ }
+
+ ret = zmq_bind(s->socket, uri);
+ if (ret == -1) {
+ av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", ZMQ_STRERROR);
+ zmq_close(s->socket);
+ zmq_ctx_term(s->context);
+ return AVERROR_EXTERNAL;
+ }
+ }
+
+ /*subscribe for read*/
+ if (h->flags & AVIO_FLAG_READ) {
+ s->socket = zmq_socket(s->context, ZMQ_SUB);
+ if (!s->socket) {
+ av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR);
+ zmq_ctx_term(s->context);
+ return AVERROR_EXTERNAL;
+ }
+
+ zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
+ ret = zmq_connect(s->socket, uri);
+ if (ret == -1) {
+ av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", ZMQ_STRERROR);
+ zmq_close(s->socket);
+ zmq_ctx_term(s->context);
+ return AVERROR_EXTERNAL;
+ }
+ }
+ return 0;
+}
+
+static int zmq_proto_write(URLContext *h, const unsigned char *buf, int size)
+{
+ int ret;
+ ZMQContext *s = h->priv_data;
+
+ ret = zmq_proto_wait_timeout(h, s->socket, 1, h->rw_timeout, &h->interrupt_callback);
+ if (ret)
+ return ret;
+ ret = zmq_send(s->socket, buf, size, 0);
+ if (ret == -1) {
+ av_log(h, AV_LOG_ERROR, "Error occured during zmq_send(): %s\n", ZMQ_STRERROR);
+ return AVERROR_EXTERNAL;
+ }
+ return ret; /*number of bytes sent*/
+}
+
+static int zmq_proto_read(URLContext *h, unsigned char *buf, int size)
+{
+ int ret;
+ ZMQContext *s = h->priv_data;
+
+ ret = zmq_proto_wait_timeout(h, s->socket, 0, h->rw_timeout, &h->interrupt_callback);
+ if (ret)
+ return ret;
+ ret = zmq_recv(s->socket, buf, size, 0);
+ if (ret == -1) {
+ av_log(h, AV_LOG_ERROR, "Error occured during zmq_recv(): %s\n", ZMQ_STRERROR);
+ return AVERROR_EXTERNAL;
+ }
+ if (ret > size) {
+ s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, ret);
+ av_log(h, AV_LOG_WARNING, "Message exceeds available space in the buffer. Message will be truncated. Setting -pkt_size %d may resolve the issue.\n", s->pkt_size_overflow);
+ ret = size;
+ }
+ return ret; /*number of bytes read*/
+}
+
+static int zmq_proto_close(URLContext *h)
+{
+ ZMQContext *s = h->priv_data;
+ zmq_close(s->socket);
+ zmq_ctx_term(s->context);
+ return 0;
+}
+
+static const AVClass zmq_context_class = {
+ .class_name = "zmq",
+ .item_name = av_default_item_name,
+ .option = options,
+ .version = LIBAVUTIL_VERSION_INT,
+};
+
+const URLProtocol ff_libzmq_protocol = {
+ .name = "zmq",
+ .url_close = zmq_proto_close,
+ .url_open = zmq_proto_open,
+ .url_read = zmq_proto_read,
+ .url_write = zmq_proto_write,
+ .priv_data_size = sizeof(ZMQContext),
+ .priv_data_class = &zmq_context_class,
+ .flags = URL_PROTOCOL_FLAG_NETWORK,
+};
diff --git a/libavformat/protocols.c b/libavformat/protocols.c
index ad95659..face5b2 100644
--- a/libavformat/protocols.c
+++ b/libavformat/protocols.c
@@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol;
extern const URLProtocol ff_libsrt_protocol;
extern const URLProtocol ff_libssh_protocol;
extern const URLProtocol ff_libsmbclient_protocol;
+extern const URLProtocol ff_libzmq_protocol;
#include "libavformat/protocol_list.c"
diff --git a/libavformat/version.h b/libavformat/version.h
index af0db1e..edfa73f 100644
--- a/libavformat/version.h
+++ b/libavformat/version.h
@@ -32,7 +32,7 @@
// Major bumping may affect Ticket5467, 5421, 5451(compatibility with Chromium)
// Also please add any ticket numbers that you believe might be affected here
#define LIBAVFORMAT_VERSION_MAJOR 58
-#define LIBAVFORMAT_VERSION_MINOR 31
+#define LIBAVFORMAT_VERSION_MINOR 32
#define LIBAVFORMAT_VERSION_MICRO 104
#define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \
OpenPOWER on IntegriCloud