From 58ee09911e0dab6432b18e9a2c5a876fdf8ef078 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Storsj=C3=B6?= Date: Fri, 1 Oct 2010 17:50:24 +0000 Subject: rtpdec: Reorder received RTP packets according to the seq number Reordering is enabled only when receiving over UDP. Originally committed as revision 25294 to svn://svn.ffmpeg.org/ffmpeg/trunk --- libavformat/rtpdec.c | 120 ++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 115 insertions(+), 5 deletions(-) (limited to 'libavformat/rtpdec.c') diff --git a/libavformat/rtpdec.c b/libavformat/rtpdec.c index 33497e6..d349ce7 100644 --- a/libavformat/rtpdec.c +++ b/libavformat/rtpdec.c @@ -334,7 +334,7 @@ void rtp_send_punch_packets(URLContext* rtp_handle) * MPEG2TS streams to indicate that they should be demuxed inside the * rtp demux (otherwise CODEC_ID_MPEG2TS packets are returned) */ -RTPDemuxContext *rtp_parse_open(AVFormatContext *s1, AVStream *st, URLContext *rtpc, int payload_type) +RTPDemuxContext *rtp_parse_open(AVFormatContext *s1, AVStream *st, URLContext *rtpc, int payload_type, int queue_size) { RTPDemuxContext *s; @@ -346,6 +346,7 @@ RTPDemuxContext *rtp_parse_open(AVFormatContext *s1, AVStream *st, URLContext *r s->first_rtcp_ntp_time = AV_NOPTS_VALUE; s->ic = s1; s->st = st; + s->queue_size = queue_size; rtp_init_statistics(&s->statistics, 0); // do we know the initial sequence from sdp? if (!strcmp(ff_rtp_enc_name(payload_type), "MP2T")) { s->ts = ff_mpegts_parse_open(s->ic); @@ -504,9 +505,84 @@ static int rtp_parse_packet_internal(RTPDemuxContext *s, AVPacket *pkt, // now perform timestamp things.... finalize_packet(s, pkt, timestamp); + s->prev_ret = rv; return rv; } +void ff_rtp_reset_packet_queue(RTPDemuxContext *s) +{ + while (s->queue) { + RTPPacket *next = s->queue->next; + av_free(s->queue->buf); + av_free(s->queue); + s->queue = next; + } + s->seq = 0; + s->queue_len = 0; + s->prev_ret = 0; +} + +static void enqueue_packet(RTPDemuxContext *s, uint8_t *buf, int len) +{ + uint16_t seq = AV_RB16(buf + 2); + RTPPacket *cur = s->queue, *prev = NULL, *packet; + + /* Find the correct place in the queue to insert the packet */ + while (cur) { + int16_t diff = seq - cur->seq; + if (diff < 0) + break; + prev = cur; + cur = cur->next; + } + + packet = av_mallocz(sizeof(*packet)); + if (!packet) + return; + packet->recvtime = av_gettime(); + packet->seq = seq; + packet->len = len; + packet->buf = buf; + packet->next = cur; + if (prev) + prev->next = packet; + else + s->queue = packet; + s->queue_len++; +} + +static int has_next_packet(RTPDemuxContext *s) +{ + return s->queue && s->queue->seq == s->seq + 1; +} + +int64_t ff_rtp_queued_packet_time(RTPDemuxContext *s) +{ + return s->queue ? s->queue->recvtime : 0; +} + +static int rtp_parse_queued_packet(RTPDemuxContext *s, AVPacket *pkt) +{ + int rv; + RTPPacket *next; + + if (s->queue_len <= 0) + return -1; + + if (!has_next_packet(s)) + av_log(s->st ? s->st->codec : NULL, AV_LOG_WARNING, + "RTP: missed %d packets\n", s->queue->seq - s->seq - 1); + + /* Parse the first packet in the queue, and dequeue it */ + rv = rtp_parse_packet_internal(s, pkt, s->queue->buf, s->queue->len); + next = s->queue->next; + av_free(s->queue->buf); + av_free(s->queue); + s->queue = next; + s->queue_len--; + return rv ? rv : has_next_packet(s); +} + /** * Parse an RTP or RTCP packet directly sent as a buffer. * @param s RTP parse context. @@ -525,6 +601,11 @@ int rtp_parse_packet(RTPDemuxContext *s, AVPacket *pkt, int rv= 0; if (!buf) { + /* If parsing of the previous packet actually returned 0, there's + * nothing more to be parsed from that packet, but we may have + * indicated that we can return the next enqueued packet. */ + if (!s->prev_ret) + return rtp_parse_queued_packet(s, pkt); /* return the next packets, if any */ if(s->st && s->parse_packet) { /* timestamp should be overwritten by parse_packet, if not, @@ -533,7 +614,8 @@ int rtp_parse_packet(RTPDemuxContext *s, AVPacket *pkt, rv= s->parse_packet(s->ic, s->dynamic_protocol_context, s->st, pkt, ×tamp, NULL, 0, flags); finalize_packet(s, pkt, timestamp); - return rv; + s->prev_ret = rv; + return rv ? rv : has_next_packet(s); } else { // TODO: Move to a dynamic packet handler (like above) if (s->read_buf_index >= s->read_buf_size) @@ -545,8 +627,10 @@ int rtp_parse_packet(RTPDemuxContext *s, AVPacket *pkt, s->read_buf_index += ret; if (s->read_buf_index < s->read_buf_size) return 1; - else - return 0; + else { + s->prev_ret = 0; + return has_next_packet(s); + } } } @@ -559,11 +643,37 @@ int rtp_parse_packet(RTPDemuxContext *s, AVPacket *pkt, return rtcp_parse_packet(s, buf, len); } - return rtp_parse_packet_internal(s, pkt, buf, len); + if (s->seq == 0 || s->queue_size <= 1) { + /* First packet, or no reordering */ + return rtp_parse_packet_internal(s, pkt, buf, len); + } else { + uint16_t seq = AV_RB16(buf + 2); + int16_t diff = seq - s->seq; + if (diff < 0) { + /* Packet older than the previously emitted one, drop */ + av_log(s->st ? s->st->codec : NULL, AV_LOG_WARNING, + "RTP: dropping old packet received too late\n"); + return -1; + } else if (diff <= 1) { + /* Correct packet */ + rv = rtp_parse_packet_internal(s, pkt, buf, len); + return rv ? rv : has_next_packet(s); + } else { + /* Still missing some packet, enqueue this one. */ + enqueue_packet(s, buf, len); + *bufptr = NULL; + /* Return the first enqueued packet if the queue is full, + * even if we're missing something */ + if (s->queue_len >= s->queue_size) + return rtp_parse_queued_packet(s, pkt); + return -1; + } + } } void rtp_parse_close(RTPDemuxContext *s) { + ff_rtp_reset_packet_queue(s); if (!strcmp(ff_rtp_enc_name(s->payload_type), "MP2T")) { ff_mpegts_parse_close(s->ts); } -- cgit v1.1