diff options
author | Clément Bœsch <clement@stupeflix.com> | 2015-11-30 18:18:59 +0100 |
---|---|---|
committer | Clément Bœsch <clement@stupeflix.com> | 2015-12-07 11:39:28 +0100 |
commit | a26e4215b91a54a168459a8fa45976c9ae072fbc (patch) | |
tree | 8108987574fe7b44f7d34ca4f0e70c1905816252 /tests/api | |
parent | f98abe0ee778880408863aff8371f4749f1f9b49 (diff) | |
download | ffmpeg-streaming-a26e4215b91a54a168459a8fa45976c9ae072fbc.zip ffmpeg-streaming-a26e4215b91a54a168459a8fa45976c9ae072fbc.tar.gz |
fate/api: test threadmessage
Diffstat (limited to 'tests/api')
-rw-r--r-- | tests/api/Makefile | 1 | ||||
-rw-r--r-- | tests/api/api-threadmessage-test.c | 261 |
2 files changed, 262 insertions, 0 deletions
diff --git a/tests/api/Makefile b/tests/api/Makefile index a6d8e42..d83ac24 100644 --- a/tests/api/Makefile +++ b/tests/api/Makefile @@ -3,6 +3,7 @@ APITESTPROGS-$(call DEMDEC, H264, H264) += api-h264 APITESTPROGS-yes += api-seek APITESTPROGS-yes += api-codec-param APITESTPROGS-$(call DEMDEC, H263, H263) += api-band +APITESTPROGS-$(HAVE_PTHREADS) += api-threadmessage APITESTPROGS += $(APITESTPROGS-yes) APITESTOBJS := $(APITESTOBJS:%=$(APITESTSDIR)%) $(APITESTPROGS:%=$(APITESTSDIR)/%-test.o) diff --git a/tests/api/api-threadmessage-test.c b/tests/api/api-threadmessage-test.c new file mode 100644 index 0000000..c78af2a --- /dev/null +++ b/tests/api/api-threadmessage-test.c @@ -0,0 +1,261 @@ +/* + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +/** + * Thread message API test + */ + +#include <pthread.h> + +#include "libavutil/avassert.h" +#include "libavutil/avstring.h" +#include "libavutil/frame.h" +#include "libavutil/threadmessage.h" + +struct sender_data { + int id; + pthread_t tid; + int workload; + AVThreadMessageQueue *queue; +}; + +/* same as sender_data but shuffled for testing purpose */ +struct receiver_data { + pthread_t tid; + int workload; + int id; + AVThreadMessageQueue *queue; +}; + +struct message { + AVFrame *frame; + // we add some junk in the message to make sure the message size is > + // sizeof(void*) + int magic; +}; + +#define MAGIC 0xdeadc0de + +static void free_frame(void *arg) +{ + struct message *msg = arg; + av_assert0(msg->magic == MAGIC); + av_frame_free(&msg->frame); +} + +static void *sender_thread(void *arg) +{ + int i, ret = 0; + struct sender_data *wd = arg; + + av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload); + for (i = 0; i < wd->workload; i++) { + if (rand() % wd->workload < wd->workload / 10) { + av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id); + av_thread_message_flush(wd->queue); + } else { + char *val; + AVDictionary *meta = NULL; + struct message msg = { + .magic = MAGIC, + .frame = av_frame_alloc(), + }; + + if (!msg.frame) { + ret = AVERROR(ENOMEM); + break; + } + + /* we add some metadata to identify the frames */ + val = av_asprintf("frame %d/%d from sender %d", + i + 1, wd->workload, wd->id); + if (!val) { + av_frame_free(&msg.frame); + ret = AVERROR(ENOMEM); + break; + } + ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL); + if (ret < 0) { + av_frame_free(&msg.frame); + break; + } + av_frame_set_metadata(msg.frame, meta); + + /* allocate a real frame in order to simulate "real" work */ + msg.frame->format = AV_PIX_FMT_RGBA; + msg.frame->width = 320; + msg.frame->height = 240; + ret = av_frame_get_buffer(msg.frame, 32); + if (ret < 0) { + av_frame_free(&msg.frame); + break; + } + + /* push the frame in the common queue */ + av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n", + wd->id, i + 1, wd->workload, msg.frame); + ret = av_thread_message_queue_send(wd->queue, &msg, 0); + if (ret < 0) { + av_frame_free(&msg.frame); + break; + } + } + } + av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n", + wd->id, av_err2str(ret)); + av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF); + return NULL; +} + +static void *receiver_thread(void *arg) +{ + int i, ret = 0; + struct receiver_data *rd = arg; + + for (i = 0; i < rd->workload; i++) { + if (rand() % rd->workload < rd->workload / 10) { + av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue\n", rd->id); + av_thread_message_flush(rd->queue); + } else { + struct message msg; + AVDictionary *meta; + AVDictionaryEntry *e; + + ret = av_thread_message_queue_recv(rd->queue, &msg, 0); + if (ret < 0) + break; + av_assert0(msg.magic == MAGIC); + meta = av_frame_get_metadata(msg.frame); + e = av_dict_get(meta, "sig", NULL, 0); + av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame); + av_frame_free(&msg.frame); + } + } + + av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i); + av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF); + + return NULL; +} + +static int get_workload(int minv, int maxv) +{ + return maxv == minv ? maxv : rand() % (maxv - minv) + minv; +} + +int main(int ac, char **av) +{ + int i, ret = 0; + int max_queue_size; + int nb_senders, sender_min_load, sender_max_load; + int nb_receivers, receiver_min_load, receiver_max_load; + struct sender_data *senders; + struct receiver_data *receivers; + AVThreadMessageQueue *queue = NULL; + + if (ac != 8) { + av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> " + "<nb_senders> <sender_min_send> <sender_max_send> " + "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]); + return 1; + } + + max_queue_size = atoi(av[1]); + nb_senders = atoi(av[2]); + sender_min_load = atoi(av[3]); + sender_max_load = atoi(av[4]); + nb_receivers = atoi(av[5]); + receiver_min_load = atoi(av[6]); + receiver_max_load = atoi(av[7]); + + if (max_queue_size <= 0 || + nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 || + nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) { + av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n"); + return 1; + } + + av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / " + "%d receivers receiving [%d-%d]\n", max_queue_size, + nb_senders, sender_min_load, sender_max_load, + nb_receivers, receiver_min_load, receiver_max_load); + + senders = av_mallocz_array(nb_senders, sizeof(*senders)); + receivers = av_mallocz_array(nb_receivers, sizeof(*receivers)); + if (!senders || !receivers) { + ret = AVERROR(ENOMEM); + goto end; + } + + ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message)); + if (ret < 0) + goto end; + + av_thread_message_queue_set_free_func(queue, free_frame); + +#define SPAWN_THREADS(type) do { \ + for (i = 0; i < nb_##type##s; i++) { \ + struct type##_data *td = &type##s[i]; \ + \ + td->id = i; \ + td->queue = queue; \ + td->workload = get_workload(type##_min_load, type##_max_load); \ + \ + ret = pthread_create(&td->tid, NULL, type##_thread, td); \ + if (ret) { \ + const int err = AVERROR(ret); \ + av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \ + " thread: %s\n", av_err2str(err)); \ + goto end; \ + } \ + } \ +} while (0) + +#define WAIT_THREADS(type) do { \ + for (i = 0; i < nb_##type##s; i++) { \ + struct type##_data *td = &type##s[i]; \ + \ + ret = pthread_join(td->tid, NULL); \ + if (ret) { \ + const int err = AVERROR(ret); \ + av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \ + " thread: %s\n", av_err2str(err)); \ + goto end; \ + } \ + } \ +} while (0) + + SPAWN_THREADS(receiver); + SPAWN_THREADS(sender); + + WAIT_THREADS(sender); + WAIT_THREADS(receiver); + +end: + av_thread_message_queue_free(&queue); + av_freep(&senders); + av_freep(&receivers); + + if (ret < 0 && ret != AVERROR_EOF) { + av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret)); + return 1; + } + return 0; +} |