Google Pixel、Pixel XL 内核代码(文章基于 Kernel-3.18): Kernel source for Pixel and Pixel XL - GitHub
AOSP 源码(文章基于 Android 7.1.2): Android 系统全套源代码分享 (更新到 8.1.0_r1)
(一)、基于NuPlayer的HLS流媒体协议 1.1、HLS 概述 HTTP Live Streaming(HLS)是苹果公司实现的基于HTTP的流媒体直播和点播协议,主要应用在iOS系统。相对于普通的流媒体,例如RTMP协议、RTSP协议、MMS协议等,HLS最大的优点是可以根据网络状况自动切换到不同码率的视频,如果网络状况较好,则会切换到高码率的视频,若发现网络状况不佳,则会逐渐过渡到低码率的视频,这个我们下面将会结合代码对其进行说明。
1.2、HLS框架介绍 我们接下来看下HLS系统的整体结构图:
M3U8 知识请参考:HLS之m3u8、ts流格式详解
1.3、HLS播放流程 1、获取不同带宽下对应的网络资源URI及音视频编解码,视频分辨率等信息的文件
1 2 #EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=899152,RESOLUTION=480x270,CODECS="avc1.4d4015,mp4a.40.5" http:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 #EXTM3U #EXT-X-VERSION:3 #EXT-X-TARGETDURATION:10 #EXT-X-MEDIA-SEQUENCE:6532 #EXT-X-KEY:METHOD=AES-128,URI="18319965201.key" #EXTINF:10, 20125484T125708-01-6533.ts #EXT-X-KEY:METHOD=AES-128,URI="14319965205.key" #EXTINF:10, 20125484T125708-01-6534.ts .... #EXTINF:8, 20140804T125708-01-6593.ts
5、请求下载某一个分片 6、根据当前的带宽决定是否切换视频资源 7、将下载的分片资源解密后送到解码器进行解码
关于NuPlayerDriver的创建以及setDataSource的流程和Stagefright Player大体一致,区别在于setDataSource的时候会根据url的不同创建三种不同的DataSource:HTTPLiveSource、RTSPSource以及GenericSource。这里就不做大篇幅的介绍了,就直接上图吧
(二)、基于NuPlayer的HLS流媒体播放源码分析 首先看看总体时序图:
2.1、HTTPLiveSource::prepareAsync() 我们直接从prepare结合HLS原理开始分析,直接看HTTPLiveSource的prepareAsync。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 [->\frameworks\av\media\libmediaplayerservice\nuplayer\HTTPLiveSource.cpp]
// Kicks off asynchronous preparation of an HLS source: sets up the looper,
// creates the LiveSession and asks it to connect to mURL.
void NuPlayer::HTTPLiveSource::prepareAsync() {
    // The looper is created and started only on the first call; this source
    // is registered on it as a handler.
    if (mLiveLooper == NULL ) {
        mLiveLooper = new ALooper;
        mLiveLooper->setName("http live" );
        mLiveLooper->start();
        mLiveLooper->registerHandler(this );
    }
    // Notification message (kWhatSessionNotify) targeted at this source and
    // handed to the session.
    sp<AMessage> notify = new AMessage(kWhatSessionNotify, this );
    mLiveSession = new LiveSession( notify,
            (mFlags & kFlagIncognito) ? LiveSession::kFlagIncognito : 0 ,
            mHTTPService);
    // The session is registered on the same looper.
    mLiveLooper->registerHandler(mLiveSession);
    // Extra headers are passed only when there are any.
    mLiveSession->connectAsync(mURL.c_str(),
            mExtraHeaders.isEmpty() ? NULL : &mExtraHeaders);
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 [->\frameworks\av\media\libstagefright\httplive\LiveSession.cpp]
// Posts a kWhatConnect message carrying the URL and, if given, a heap copy of
// the extra headers. The copy is released in onConnect().
void LiveSession::connectAsync (const char *url,
        const KeyedVector<String8, String8> *headers) {
    sp<AMessage> msg = new AMessage(kWhatConnect, this );
    msg->setString("url" , url);
    if (headers != NULL ) {
        // The message stores a raw pointer, so the headers are copied with new.
        msg->setPointer("headers" ,new KeyedVector<String8, String8>(*headers));
    }
    msg->post();
}
// Excerpt of the message handler; only the kWhatConnect case is shown.
void LiveSession::onMessageReceived (const sp<AMessage> &msg) {
    case kWhatConnect: {
        onConnect(msg);
        break ;
    }
}
// Handles kWhatConnect: stores the master URL and headers, makes sure the
// fetcher looper is running, then starts fetching the master playlist.
void LiveSession::onConnect (const sp<AMessage> &msg) {
    CHECK(msg->findString("url" , &mMasterURL));
    KeyedVector<String8, String8> *headers = NULL ;
    if (!msg->findPointer("headers" , (void **)&headers)) {
        mExtraHeaders.clear();
    } else {
        // Take over the copy made in connectAsync() and free it.
        mExtraHeaders = *headers;
        delete headers;
        headers = NULL ;
    }
    // The fetcher looper is created once and started with (false, false).
    if (mFetcherLooper == NULL ) {
        mFetcherLooper = new ALooper();
        mFetcherLooper->setName("Fetcher" );
        mFetcherLooper->start(false , false );
    }
    // The result of addFetcher() is used without a NULL check.
    addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 [->\frameworks\av\media\libstagefright\httplive\LiveSession.cpp] sp<PlaylistFetcher> LiveSession::addFetcher (const char *uri) { ssize_t index = mFetcherInfos.indexOfKey(uri); sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this ); notify->setString("uri" , uri); notify->setInt32("switchGeneration" , mSwitchGeneration); FetcherInfo info; info.mFetcher = new PlaylistFetcher(notify, this , uri, mCurBandwidthIndex, mSubtitleGeneration); info.mDurationUs = -1l l; info.mToBeRemoved = false ; info.mToBeResumed = false ; mFetcherLooper->registerHandler(info.mFetcher); mFetcherInfos.add(uri, info); return info.mFetcher; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 [->\frameworks\av\media\libstagefright\httplive\PlaylistFetcher.cpp]
// Requests a playlist download by posting kWhatFetchPlaylist to this fetcher.
void PlaylistFetcher::fetchPlaylistAsync () {
    (new AMessage(kWhatFetchPlaylist, this ))->post();
}
// Excerpt of the message handler; only the kWhatFetchPlaylist case is shown.
void PlaylistFetcher::onMessageReceived (const sp<AMessage> &msg) {
    case kWhatFetchPlaylist: {
        bool unchanged;
        // Download and parse the playlist at mURI; NULL is passed for the
        // current-playlist hash argument.
        sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(mURI.c_str(), NULL , &unchanged);
        // Report the result through a copy of mNotify. The playlist is
        // attached without a NULL check in this excerpt.
        sp<AMessage> notify = mNotify->dup();
        notify->setInt32("what" , kWhatPlaylistFetched);
        notify->setObject("playlist" , playlist);
        notify->post();
        break ;
    }
}
我们接下来看下fetchFile过程:首先会通过fetchFile从服务器端获取到m3u8 playlist内容存放到buffer缓存区,然后将获取到的缓存数据包装成M3UParser
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 [->\frameworks\av\media\libstagefright\httplive\HTTPDownloader.cpp] sp<M3UParser> HTTPDownloader::fetchPlaylist ( const char *url, uint8_t *curPlaylistHash, bool *unchanged) { *unchanged = false ; sp<ABuffer> buffer; String8 actualUrl; ssize_t err = fetchFile(url, &buffer, &actualUrl); mHTTPDataSource->disconnect(); sp<M3UParser> playlist = new M3UParser(actualUrl.string (), buffer->data(), buffer->size()); return playlist; } ssize_t HTTPDownloader::fetchFile ( const char *url, sp<ABuffer> *out, String8 *actualUrl) { ssize_t err = fetchBlock(url, out, 0 , -1 , 0 , actualUrl, true ); mHTTPDataSource->disconnect(); return err; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 [->\frameworks\av\media\libstagefright\httplive\M3UParser.cpp] M3UParser::M3UParser( const char *baseURI, const void *data, size_t size) : mInitCheck(NO_INIT), mBaseURI(baseURI), mIsExtM3U(false ), mIsVariantPlaylist(false ), mIsComplete(false ), mIsEvent(false ), mFirstSeqNumber(-1 ), mLastSeqNumber(-1 ), mTargetDurationUs(-1l l), mDiscontinuitySeq(0 ), mDiscontinuityCount(0 ), mSelectedIndex(-1 ) { mInitCheck = parse(data, size); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 [->\frameworks\av\media\libstagefright\httplive\M3UParser.cpp] status_t M3UParser::parse (const void *_data, size_t size) { int32_t lineNo = 0 ; sp<AMessage> itemMeta; const char *data = (const char *)_data; size_t offset = 0 ; uint64_t segmentRangeOffset = 0 ; while (offset < size) { size_t offsetLF = offset; while (offsetLF < size && data[offsetLF] != '\n' ) { ++offsetLF; } AString line; if (offsetLF > offset && data[offsetLF - 1 ] == '\r' ) { line.setTo(&data[offset], offsetLF - offset - 1 ); } else { line.setTo(&data[offset], offsetLF - offset); } if (line.empty()) { offset = offsetLF + 1 ; continue ; } if (lineNo == 0 && line == "#EXTM3U" ) { mIsExtM3U = true ; } if (mIsExtM3U) { status_t err = OK; if (line.startsWith("#EXT-X-TARGETDURATION" )) { if (mIsVariantPlaylist) { return ERROR_MALFORMED; } err = parseMetaData(line, &mMeta, "target-duration" ); } else if (line.startsWith("#EXT-X-MEDIA-SEQUENCE" )) { if (mIsVariantPlaylist) { return ERROR_MALFORMED; } err = parseMetaData(line, &mMeta, "media-sequence" ); } else if (line.startsWith("#EXT-X-KEY" )) { if (mIsVariantPlaylist) { return ERROR_MALFORMED; } err = parseCipherInfo(line, &itemMeta, mBaseURI); } else if (line.startsWith("#EXT-X-ENDLIST" )) { mIsComplete = true ; } else if (line.startsWith("#EXT-X-PLAYLIST-TYPE:EVENT" )) { mIsEvent = true ; } else if (line.startsWith("#EXTINF" )) { if (mIsVariantPlaylist) { return ERROR_MALFORMED; } err = parseMetaDataDuration(line, &itemMeta, "durationUs" ); } else if (line.startsWith("#EXT-X-DISCONTINUITY" )) { if (mIsVariantPlaylist) { return 
ERROR_MALFORMED; } if (itemMeta == NULL ) { itemMeta = new AMessage; } itemMeta->setInt32("discontinuity" , true ); ++mDiscontinuityCount; } else if (line.startsWith("#EXT-X-STREAM-INF" )) { if (mMeta != NULL ) { return ERROR_MALFORMED; } mIsVariantPlaylist = true ; err = parseStreamInf(line, &itemMeta); } else if (line.startsWith("#EXT-X-BYTERANGE" )) { if (mIsVariantPlaylist) { return ERROR_MALFORMED; } uint64_t length, offset; err = parseByteRange(line, segmentRangeOffset, &length, &offset); if (err == OK) { if (itemMeta == NULL ) { itemMeta = new AMessage; } itemMeta->setInt64("range-offset" , offset); itemMeta->setInt64("range-length" , length); segmentRangeOffset = offset + length; } } else if (line.startsWith("#EXT-X-MEDIA" )) { err = parseMedia(line); } else if (line.startsWith("#EXT-X-DISCONTINUITY-SEQUENCE" )) { if (mIsVariantPlaylist) { return ERROR_MALFORMED; } size_t seq; err = parseDiscontinuitySequence(line, &seq); if (err == OK) { mDiscontinuitySeq = seq; } } if (err != OK) { return err; } } if (!line.startsWith("#" )) { if (!mIsVariantPlaylist) { int64_t durationUs; if (itemMeta == NULL || !itemMeta->findInt64("durationUs" , &durationUs)) { return ERROR_MALFORMED; } itemMeta->setInt32("discontinuity-sequence" , mDiscontinuitySeq + mDiscontinuityCount); } mItems.push(); Item *item = &mItems.editItemAt(mItems.size() - 1 ); CHECK(MakeURL(mBaseURI.c_str(), line.c_str(), &item->mURI)); item->mMeta = itemMeta; itemMeta.clear(); } offset = offsetLF + 1 ; ++lineNo; } if (!mIsVariantPlaylist) { int32_t targetDurationSecs; if (mMeta == NULL || !mMeta->findInt32( "target-duration" , &targetDurationSecs)) { ALOGE("Media playlist missing #EXT-X-TARGETDURATION" ); return ERROR_MALFORMED; } mTargetDurationUs = targetDurationSecs * 1000000l l; mFirstSeqNumber = 0 ; if (mMeta != NULL ) { mMeta->findInt32("media-sequence" , &mFirstSeqNumber); } mLastSeqNumber = mFirstSeqNumber + mItems.size() - 1 ; } return OK; }
1 2 3 4 5 6 [->\frameworks\av\media\libstagefright\httplive\LiveSession.cpp]
// Message-handler case (excerpt): a fetched-playlist notification is handed to
// onMasterPlaylistFetched().
case PlaylistFetcher::kWhatPlaylistFetched:{
    onMasterPlaylistFetched(msg);
    break ;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 [->\frameworks\av\media\libstagefright\httplive\LiveSession.cpp] void LiveSession::onMasterPlaylistFetched (const sp<AMessage> &msg) { AString uri; CHECK(msg->findString("uri" , &uri)); ssize_t index = mFetcherInfos.indexOfKey(uri); mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id()); mFetcherInfos.removeItemsAt(index); CHECK(msg->findObject("playlist" , (sp<RefBase> *)&mPlaylist)); size_t initialBandwidth = 0 ; size_t initialBandwidthIndex = 0 ; int32_t maxWidth = 0 ; int32_t maxHeight = 0 ; if (mPlaylist->isVariantPlaylist()) { Vector<BandwidthItem> itemsWithVideo; for (size_t i = 0 ; i < mPlaylist->size(); ++i) { BandwidthItem item; item.mPlaylistIndex = i; item.mLastFailureUs = -1l l; sp<AMessage> meta; AString uri; mPlaylist->itemAt(i, &uri, &meta); CHECK(meta->findInt32("bandwidth" , (int32_t *)&item.mBandwidth)); int32_t width, height; if (meta->findInt32("width" , &width)) { maxWidth = max(maxWidth, width); } if (meta->findInt32("height" , &height)) { maxHeight = max(maxHeight, height); } mBandwidthItems.push(item); if (mPlaylist->hasType(i, "video" )) { itemsWithVideo.push(item); } } if (!itemsWithVideo.empty()&& itemsWithVideo.size() < mBandwidthItems.size()) { mBandwidthItems.clear(); for (size_t i = 0 ; i < itemsWithVideo.size(); ++i) { mBandwidthItems.push(itemsWithVideo[i]); } } CHECK_GT(mBandwidthItems.size(), 0u ); initialBandwidth = mBandwidthItems[0 ].mBandwidth; mBandwidthItems.sort(SortByBandwidth); for (size_t i = 0 ; i < mBandwidthItems.size(); ++i) { if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { initialBandwidthIndex = i; break ; } } } else { } mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth; mMaxHeight = maxHeight > 0 ? 
maxHeight : mMaxHeight; mPlaylist->pickRandomMediaItems(); changeConfiguration(0l l , initialBandwidthIndex, false ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 [->\frameworks\av\media\libstagefright\httplive\LiveSession.cpp] void LiveSession::changeConfiguration (int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) { cancelBandwidthSwitch(); mReconfigurationInProgress = true ; if (bandwidthIndex >= 0 ) { mOrigBandwidthIndex = mCurBandwidthIndex; mCurBandwidthIndex = bandwidthIndex; if (mOrigBandwidthIndex != mCurBandwidthIndex) { ALOGI("#### Starting Bandwidth Switch: %zd => %zd" ,mOrigBandwidthIndex, mCurBandwidthIndex); } } CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size()); const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex); uint32_t streamMask = 0 ; uint32_t resumeMask = 0 ; AString URIs[kMaxStreams]; for (size_t i = 0 ; i < kMaxStreams; ++i) { if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { streamMask |= indexToType(i); } } for (size_t i = 0 ; i < mFetcherInfos.size(); ++i) { } sp<AMessage> msg; if (timeUs < 0l l) { msg = new AMessage(kWhatChangeConfiguration3, this ); } else { msg = new AMessage(kWhatChangeConfiguration2, this ); } msg->setInt32("streamMask" , streamMask); msg->setInt32("resumeMask" , resumeMask); msg->setInt32("pickTrack" , pickTrack); msg->setInt64("timeUs" , timeUs); for (size_t i = 0 ; i < kMaxStreams; ++i) { if ((streamMask | resumeMask) & indexToType(i)) { msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); } } mContinuationCounter = mFetcherInfos.size(); mContinuation = msg; if (mContinuationCounter == 0 ) { msg->post(); } } void LiveSession::onChangeConfiguration2 (const 
sp<AMessage> &msg) { int64_t timeUs; CHECK(msg->findInt64("timeUs" , &timeUs)); if (timeUs >= 0 ) { mLastSeekTimeUs = timeUs; mLastDequeuedTimeUs = timeUs; for (size_t i = 0 ; i < mPacketSources.size(); i++) { sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i); sp<MetaData> format = packetSource->getFormat(); packetSource->clear(); packetSource->setFormat(format); } for (size_t i = 0 ; i < kMaxStreams; ++i) { mStreams[i].reset(); } mDiscontinuityOffsetTimesUs.clear(); mDiscontinuityAbsStartTimesUs.clear(); if (mSeekReplyID != NULL ) { CHECK(mSeekReply != NULL ); mSeekReply->setInt32("err" , OK); mSeekReply->postReply(mSeekReplyID); mSeekReplyID.clear(); mSeekReply.clear(); } restartPollBuffering(); } uint32_t streamMask, resumeMask; CHECK(msg->findInt32("streamMask" , (int32_t *)&streamMask)); CHECK(msg->findInt32("resumeMask" , (int32_t *)&resumeMask)); streamMask |= resumeMask; AString URIs[kMaxStreams]; for (size_t i = 0 ; i < kMaxStreams; ++i) { if (streamMask & indexToType(i)) { const AString &uriKey = mStreams[i].uriKey(); CHECK(msg->findString(uriKey.c_str(), &URIs[i])); ALOGV("%s = '%s'" , uriKey.c_str(), URIs[i].c_str()); } } uint32_t changedMask = 0 ; for (size_t i = 0 ; i < kMaxStreams && i != kSubtitleIndex; ++i) { if ((mStreamMask & streamMask & indexToType(i)) && !mStreams[i].mUri.empty() && !(URIs[i] == mStreams[i].mUri)) { ALOGV("stream %zu changed: oldURI %s, newURI %s" , i, mStreams[i].mUri.c_str(), URIs[i].c_str()); sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i)); if (source->getLatestDequeuedMeta() != NULL ) { source->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMATCHANGE, NULL , true ); } } if ((mStreamMask & ~streamMask & indexToType(i))) { changedMask |= indexToType(i); } } sp<AMessage> notify = mNotify->dup(); notify->setInt32("what" , kWhatStreamsChanged); notify->setInt32("changedMask" , changedMask); msg->setWhat(kWhatChangeConfiguration3); msg->setTarget(this ); notify->setMessage("reply" , 
msg); notify->post(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 [->\frameworks\av\media\libstagefright\httplive\LiveSession.cpp]
// Third stage of a configuration change (heavily elided excerpt): reads the
// masks and options from |msg| and starts a fetcher inside the per-stream
// loop. The "......" markers stand for code omitted by the article, which
// includes the declarations of fetcher, sources, startTime and seekMode.
void LiveSession::onChangeConfiguration3 (const sp<AMessage> &msg) {
    // The stored continuation is released first.
    mContinuation.clear();
    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask" , (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask" , (int32_t *)&resumeMask));
    mNewStreamMask = streamMask | resumeMask;
    int64_t timeUs;
    int32_t pickTrack;
    bool switching = false ;
    CHECK(msg->findInt64("timeUs" , &timeUs));
    CHECK(msg->findInt32("pickTrack" , &pickTrack));
    ......
    for (size_t i = 0 ; i < kMaxStreams; i++) {
        ......
        // A negative start time falls back to the last seek position.
        fetcher->startAsync( sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex],
                getMetadataSource(sources, mNewStreamMask, switching),
                startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
                startTime.getSegmentTimeUs(), startTime.mSeq, seekMode);
    }
    ......
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 [->\frameworks\av\media\libstagefright\httplive\PlaylistFetcher.cpp] void PlaylistFetcher::startAsync ( const sp<AnotherPacketSource> &audioSource, const sp<AnotherPacketSource> &videoSource, const sp<AnotherPacketSource> &subtitleSource, const sp<AnotherPacketSource> &metadataSource, int64_t startTimeUs, int64_t segmentStartTimeUs, int32_t startDiscontinuitySeq, LiveSession::SeekMode seekMode) { sp<AMessage> msg = new AMessage(kWhatStart, this ); ...... msg->setInt32("streamTypeMask" , streamTypeMask); msg->setInt64("startTimeUs" , startTimeUs); msg->setInt64("segmentStartTimeUs" , segmentStartTimeUs); msg->setInt32("startDiscontinuitySeq" , startDiscontinuitySeq); msg->setInt32("seekMode" , seekMode); msg->post(); } status_t PlaylistFetcher::onStart (const sp<AMessage> &msg) {...... 
postMonitorQueue(); return OK; } void PlaylistFetcher::postMonitorQueue (int64_t delayUs, int64_t minDelayUs) { int64_t maxDelayUs = delayUsToRefreshPlaylist(); sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this ); msg->setInt32("generation" , mMonitorQueueGeneration); msg->post(delayUs); } case kWhatDownloadNext: { int32_t generation; CHECK(msg->findInt32("generation" , &generation)); if (generation != mMonitorQueueGeneration) { break ; } if (msg->what() == kWhatMonitorQueue) { onMonitorQueue(); } else { onDownloadNext(); } break ; } void PlaylistFetcher::onDownloadNext () { AString uri; sp<AMessage> itemMeta; sp<ABuffer> buffer; sp<ABuffer> tsBuffer; int32_t firstSeqNumberInPlaylist = 0 ; int32_t lastSeqNumberInPlaylist = 0 ; bool connectHTTP = true ; bool shouldPause = false ; ssize_t bytesRead; do { int64_t startUs = ALooper::GetNowUs(); bytesRead = mHTTPDownloader->fetchBlock( uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, NULL , connectHTTP); int64_t delayUs = ALooper::GetNowUs() - startUs; ...... } ...... if (bufferStartsWithTsSyncByte(buffer)) { if (tsBuffer == NULL ) { tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); tsBuffer->setRange(0 , 0 ); } else if (tsBuffer->capacity() != buffer->capacity()) { size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size(); tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); tsBuffer->setRange(tsOff, tsSize); } tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead); err = extractAndQueueAccessUnitsFromTs(tsBuffer); } ...... } while (bytesRead != 0 ); ...... bool startUp = mStartup; if (tsBuffer == NULL ) { status_t err = extractAndQueueAccessUnits(buffer, itemMeta); if (err == -EAGAIN) { postMonitorQueue(); return ; } else if (err == ERROR_OUT_OF_RANGE) { notifyStopReached(); return ; } else if (err != OK) { notifyError(err); return ; } } ...... 
} status_t PlaylistFetcher::extractAndQueueAccessUnits ( const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) { if (bufferStartsWithWebVTTMagicSequence(buffer)) { if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { ALOGE("This stream only contains subtitles." ); return ERROR_MALFORMED; } const sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); int64_t durationUs; CHECK(itemMeta->findInt64("durationUs" , &durationUs)); buffer->meta()->setInt64("timeUs" , getSegmentStartTimeUs(mSeqNumber)); buffer->meta()->setInt64("durationUs" , durationUs); buffer->meta()->setInt64("segmentStartTimeUs" , getSegmentStartTimeUs(mSeqNumber)); buffer->meta()->setInt32("discontinuitySeq" , mDiscontinuitySeq); buffer->meta()->setInt32("subtitleGeneration" , mSubtitleGeneration); packetSource->queueAccessUnit(buffer); return OK; } ...... size_t offset = 0 ; while (offset < buffer->size()) { const uint8_t *adtsHeader = buffer->data() + offset; CHECK_LT(offset + 5 , buffer->size()); ...... sp<ABuffer> unit = new ABuffer(aac_frame_length); memcpy (unit->data(), adtsHeader, aac_frame_length); unit->meta()->setInt64("timeUs" , unitTimeUs); setAccessUnitProperties(unit, packetSource); packetSource->queueAccessUnit(unit); } return OK; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 [->\frameworks\av\media\libstagefright\mpeg2ts\AnotherPacketSource.cpp] void AnotherPacketSource::queueAccessUnit (const sp<ABuffer> &buffer) { int32_t damaged; ...... Mutex::Autolock autoLock (mLock) ; mBuffers.push_back(buffer); mCondition.signal(); int32_t discontinuity; if (buffer->meta()->findInt32("discontinuity" , &discontinuity)){ ALOGV("queueing a discontinuity with queueAccessUnit" ); mLastQueuedTimeUs = 0l l; mEOSResult = OK; mLatestEnqueuedMeta = NULL ; mDiscontinuitySegments.push_back(DiscontinuitySegment()); return ; } ...... }
2.2、获取数据进行解码 关于解码初始化NuPlayer::instantiateDecoder过程请参考:Android Video System(2):音视频分离MediaExtractor、解码Decoder、渲染Renderer源码分析 我们直接看如何获取数据进行解码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 [->\frameworks\av\media\libstagefright\ACodec.cpp]
// Asks the codec to resume by posting kWhatResume to itself.
void ACodec::signalResume () {
    (new AMessage(kWhatResume, this ))->post();
}
// Message-handler case (excerpt): kWhatResume calls resume().
case kWhatResume:{
    resume();
    handled = true ;
    break ;
}
// Resubmits output buffers, then requests data for every input buffer this
// side currently owns, and marks the state active.
void ACodec::ExecutingState::resume() {
    submitOutputBuffers();
    // Only a warning: the loop below simply does nothing in that case.
    if (mCodec->mBuffers[kPortIndexInput].size() == 0u ) {
        ALOGW("[%s] we don't have any input buffers to resume" ,
                mCodec->mComponentName.c_str());
    }
    for (size_t i = 0 ; i < mCodec->mBuffers[kPortIndexInput].size(); i++) {
        BufferInfo *info = &mCodec->mBuffers[kPortIndexInput].editItemAt(i);
        if (info->mStatus == BufferInfo::OWNED_BY_US) {
            postFillThisBuffer(info);
        }
    }
    mActive = true ;
}
// Sends kWhatFillThisBuffer for |info| through a copy of mNotify and marks the
// buffer OWNED_BY_UPSTREAM. Does nothing once the input port has reached EOS.
void ACodec::BaseState::postFillThisBuffer(BufferInfo *info) {
    if (mCodec->mPortEOS[kPortIndexInput]) {
        return ;
    }
    CHECK_EQ((int )info->mStatus, (int )BufferInfo::OWNED_BY_US);
    sp<AMessage> notify = mCodec->mNotify->dup();
    notify->setInt32("what" , CodecBase::kWhatFillThisBuffer);
    notify->setInt32("buffer-id" , info->mBufferID);
    // The buffer's meta is cleared before the buffer is attached.
    info->mData->meta()->clear();
    notify->setBuffer("buffer" , info->mData);
    // The reply message (kWhatInputBufferFilled) is addressed to the codec and
    // carries the buffer id.
    sp<AMessage> reply = new AMessage(kWhatInputBufferFilled, mCodec);
    reply->setInt32("buffer-id" , info->mBufferID);
    notify->setMessage("reply" , reply);
    notify->post();
    info->mStatus = BufferInfo::OWNED_BY_UPSTREAM;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 [->\frameworks\av\media\libstagefright\ACodec.cpp]
// NOTE(review): the path marker above says ACodec.cpp, but the function below
// is MediaCodec::onInputBufferAvailable; this case presumably belongs to
// MediaCodec's message handler - confirm.
//
// Message-handler case (excerpt) for kWhatFillThisBuffer. Three modes:
// async callback, a pending synchronous dequeue, or an activity notification.
case CodecBase::kWhatFillThisBuffer:{
    if (mFlags & kFlagIsAsync) {
        // Nothing is done here when mHaveInputSurface is set.
        if (!mHaveInputSurface) {
            if (mState == FLUSHED) {
                // In the FLUSHED state only the pending flag is set.
                mHavePendingInputBuffers = true ;
            } else {
                onInputBufferAvailable();
            }
        }
    } else if (mFlags & kFlagDequeueInputPending) {
        // Serve the pending dequeue, bump the timeout generation and clear
        // the pending flag and reply id.
        CHECK(handleDequeueInputBuffer(mDequeueInputReplyID));
        ++mDequeueInputTimeoutGeneration;
        mFlags &= ~kFlagDequeueInputPending;
        mDequeueInputReplyID = 0 ;
    } else {
        postActivityNotificationIfPossible();
    }
    break ;
}
// Posts one CB_INPUT_AVAILABLE callback (a copy of mCallback) for every input
// buffer index that dequeuePortBuffer() returns.
void MediaCodec::onInputBufferAvailable () {
    int32_t index;
    while ((index = dequeuePortBuffer(kPortIndexInput)) >= 0 ) {
        sp<AMessage> msg = mCallback->dup();
        msg->setInt32("callbackID" , CB_INPUT_AVAILABLE);
        msg->setInt32("index" , index);
        msg->post();
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 [->\frameworks\av\media\libmediaplayerservice\nuplayer\NuPlayerDecoder.cpp]
// Excerpt: registers a kWhatCodecNotify message, targeted at this decoder, as
// the codec's callback.
void NuPlayer::Decoder::onConfigure(const sp<AMessage> &format) {
    sp<AMessage> reply = new AMessage(kWhatCodecNotify, this );
    mCodec->setCallback(reply);
}
[->\frameworks\av\media\libstagefright\MediaCodec.cpp]
// Sends |callback| to the codec in a kWhatSetCallback message and returns the
// result of PostAndAwaitResponse().
status_t MediaCodec::setCallback (const sp<AMessage> &callback) {
    sp<AMessage> msg = new AMessage(kWhatSetCallback, this );
    msg->setMessage("callback" , callback);
    sp<AMessage> response;
    return PostAndAwaitResponse(msg, &response);
}
// Message-handler case (excerpt): stores the callback and sets or clears
// kFlagIsAsync depending on whether it is non-NULL.
case kWhatSetCallback:{
    sp<AReplyToken> replyID;
    CHECK(msg->senderAwaitsResponse(&replyID));
    sp<AMessage> callback;
    CHECK(msg->findMessage("callback" , &callback));
    mCallback = callback;
    if (mCallback != NULL ) {
        mFlags |= kFlagIsAsync;
    } else {
        mFlags &= ~kFlagIsAsync;
    }
    // An empty response is posted back to the sender.
    sp<AMessage> response = new AMessage;
    response->postReply(replyID);
    break ;
}
所以根据上面我们可以知道,接下来调用的是 kWhatCodecNotify 下的 CB_INPUT_AVAILABLE
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 [->\frameworks\av\media\libmediaplayerservice\nuplayer\NuPlayerDecoder.cpp]
// Message-handler case (excerpt): an input buffer became available.
case MediaCodec::CB_INPUT_AVAILABLE:{
    int32_t index;
    CHECK(msg->findInt32("index" , &index));
    handleAnInputBuffer(index);
    break ;
}
// Takes input buffer |index| from the codec and tries to fill it, in order:
// with pending codec-specific data, with previously deferred input messages,
// or by requesting new data from the source.
// Returns false when a discontinuity is pending or the buffer cannot be
// obtained; true otherwise.
bool NuPlayer::Decoder::handleAnInputBuffer(size_t index) {
    if (isDiscontinuityPending()) {
        return false ;
    }
    sp<ABuffer> buffer;
    mCodec->getInputBuffer(index, &buffer);
    if (buffer == NULL ) {
        handleError(UNKNOWN_ERROR);
        return false ;
    }
    // Grow the three per-buffer vectors so that |index| is a valid slot.
    if (index >= mInputBuffers.size()) {
        for (size_t i = mInputBuffers.size(); i <= index; ++i) {
            mInputBuffers.add();
            mMediaBuffers.add();
            mInputBufferIsDequeued.add();
            mMediaBuffers.editItemAt(i) = NULL ;
            mInputBufferIsDequeued.editItemAt(i) = false ;
        }
    }
    mInputBuffers.editItemAt(index) = buffer;
    // Release any media buffer still attached to this slot.
    if (mMediaBuffers[index] != NULL ) {
        mMediaBuffers[index]->release();
        mMediaBuffers.editItemAt(index) = NULL ;
    }
    mInputBufferIsDequeued.editItemAt(index) = true ;
    // Pending codec-specific data is submitted first, one entry per call.
    if (!mCSDsToSubmit.isEmpty()) {
        sp<AMessage> msg = new AMessage();
        msg->setSize("buffer-ix" , index);
        sp<ABuffer> buffer = mCSDsToSubmit.itemAt(0 );
        ALOGI("[%s] resubmitting CSD" , mComponentName.c_str());
        msg->setBuffer("buffer" , buffer);
        mCSDsToSubmit.removeAt(0 );
        CHECK(onInputBufferFetched(msg));
        return true ;
    }
    // Replay deferred input messages until one of them cannot be handled.
    while (!mPendingInputMessages.empty()) {
        sp<AMessage> msg = *mPendingInputMessages.begin();
        if (!onInputBufferFetched(msg)) {
            break ;
        }
        mPendingInputMessages.erase(mPendingInputMessages.begin());
    }
    // The slot is no longer marked dequeued, so there is nothing to request.
    if (!mInputBufferIsDequeued.editItemAt(index)) {
        return true ;
    }
    mDequeuedInputBuffers.push_back(index);
    onRequestInputBuffers();
    return true ;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 [->\frameworks\av\media\libmediaplayerservice\nuplayer\NuPlayerDecoderBase.cpp] void NuPlayer::DecoderBase::onRequestInputBuffers() { if (mRequestInputBuffersPending) { return ; } if (doRequestBuffers()) { mRequestInputBuffersPending = true ; sp<AMessage> msg = new AMessage(kWhatRequestInputBuffers, this ); msg->post(10 * 1000l l); } } [->\frameworks\av\media\libmediaplayerservice\nuplayer\NuPlayerDecoder.cpp] bool NuPlayer::Decoder::doRequestBuffers() { if (isDiscontinuityPending() || mRenderer == NULL ) { return false ; } status_t err = OK; while (err == OK && !mDequeuedInputBuffers.empty()) { size_t bufferIx = *mDequeuedInputBuffers.begin(); sp<AMessage> msg = new AMessage(); msg->setSize("buffer-ix" , bufferIx); err = fetchInputData(msg); if (err != OK && err != ERROR_END_OF_STREAM) { break ; } mDequeuedInputBuffers.erase(mDequeuedInputBuffers.begin()); if (!mPendingInputMessages.empty() || !onInputBufferFetched(msg)) { mPendingInputMessages.push_back(msg); } } return err == -EWOULDBLOCK && mSource->feedMoreTSData() == OK; } status_t NuPlayer::Decoder::fetchInputData(sp<AMessage> &reply) { sp<ABuffer> accessUnit; bool dropAccessUnit; do { status_t err = mSource->dequeueAccessUnit(mIsAudio, &accessUnit); if (err == -EWOULDBLOCK) { return err; } else if (err != OK) { if (err == INFO_DISCONTINUITY) { int32_t type; CHECK(accessUnit->meta()->findInt32("discontinuity" , &type)); bool formatChange = (mIsAudio && (type & ATSParser::DISCONTINUITY_AUDIO_FORMAT)) || (!mIsAudio && (type & ATSParser::DISCONTINUITY_VIDEO_FORMAT)); bool timeChange = (type & 
ATSParser::DISCONTINUITY_TIME) != 0 ; ALOGI("%s discontinuity (format=%d, time=%d)" , mIsAudio ? "audio" : "video" , formatChange, timeChange); bool seamlessFormatChange = false ; sp<AMessage> newFormat = mSource->getFormat(mIsAudio); if (formatChange) { seamlessFormatChange = supportsSeamlessFormatChange(newFormat); formatChange = !seamlessFormatChange; } if (formatChange ) { mFormatChangePending = true ; err = ERROR_END_OF_STREAM; } else if (timeChange) { rememberCodecSpecificData(newFormat); mTimeChangePending = true ; err = ERROR_END_OF_STREAM; } else if (seamlessFormatChange) { rememberCodecSpecificData(newFormat); continue ; } else { return -EWOULDBLOCK; } } CHECK(err != OK); reply->setInt32("err" , err); return ERROR_END_OF_STREAM; } dropAccessUnit = false ; if (!mIsAudio && !mIsSecure && mRenderer->getVideoLateByUs() > 100000l l && mIsVideoAVC && !IsAVCReferenceFrame(accessUnit)) { dropAccessUnit = true ; ++mNumInputFramesDropped; } } while (dropAccessUnit); #if 0 int64_t mediaTimeUs; CHECK(accessUnit->meta()->findInt64("timeUs" , &mediaTimeUs)); ALOGV("[%s] feeding input buffer at media time %.3f" , mIsAudio ? "audio" : "video" , mediaTimeUs / 1E6 ); #endif if (mCCDecoder != NULL ) { mCCDecoder->decode(accessUnit); } reply->setBuffer("buffer" , accessUnit); return OK; }
2.3、循环获取数据HTTPLiveSource::dequeueAccessUnit() 1 2 3 4 5 6 7 8 9 10 [->\frameworks\av\media\libstagefright\httplive\LiveSession.cpp]
// Abridged excerpt: looks up the packet source for |stream| and dequeues the
// next access unit from it. The "......" markers stand for code omitted by the
// article, including the function's return statement.
status_t LiveSession::dequeueAccessUnit ( StreamType stream, sp<ABuffer> *accessUnit) {
    status_t finalResult = OK;
    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
    ......
    status_t err = packetSource->dequeueAccessUnit(accessUnit);
    ......
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 [->\frameworks\av\media\libstagefright\mpeg2ts\AnotherPacketSource.cpp]
// Removes the oldest queued buffer and returns it in |*buffer|.
//
// Waits on mCondition while the queue is empty and mEOSResult is OK.
// Returns INFO_DISCONTINUITY when the dequeued buffer is a discontinuity
// marker, OK for a regular access unit, or mEOSResult when the queue is empty.
status_t AnotherPacketSource::dequeueAccessUnit (sp<ABuffer> *buffer) {
    buffer->clear();
    Mutex::Autolock autoLock (mLock) ;
    while (mEOSResult == OK && mBuffers.empty()) {
        mCondition.wait(mLock);
    }
    if (!mBuffers.empty()) {
        *buffer = *mBuffers.begin();
        mBuffers.erase(mBuffers.begin());
        int32_t discontinuity;
        if ((*buffer)->meta()->findInt32("discontinuity" , &discontinuity)) {
            // A format change clears the stored format.
            if (wasFormatChange(discontinuity)) {
                mFormat.clear();
            }
            // The first discontinuity segment is removed.
            mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
            return INFO_DISCONTINUITY;
        }
        // Track the largest dequeued timestamp within the current segment.
        DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
        int64_t timeUs;
        mLatestDequeuedMeta = (*buffer)->meta()->dup();
        CHECK(mLatestDequeuedMeta->findInt64("timeUs" , &timeUs));
        if (timeUs > seg.mMaxDequeTimeUs) {
            seg.mMaxDequeTimeUs = timeUs;
        }
        // A "format" object in the buffer's meta is passed to setFormat().
        sp<RefBase> object;
        if ((*buffer)->meta()->findObject("format" , &object)) {
            setFormat(static_cast <MetaData*>(object.get()));
        }
        return OK;
    }
    return mEOSResult;
}
2.4、解码后音视频播放过程 关于解码后播放过程请参考:Android Video System(5):Android Multimedia - NuPlayer音视频同步实现分析
(三)、基于NuPlayer的RTSP流媒体协议 3.1.1、RTSP 概述: RTSP 是Real Time Streaming Protocol(实时流媒体协议)的简称。RTSP提供一种可扩展的框架,使得能够提供可控制的,按需传输实时数据,比如音频和视频文件。RTSP对流媒体提供了诸如暂停,快进等控制,而它本身并不传输数据,RTSP作用相当于流媒体服务器的远程控制。传输数据可以通过传输层的TCP,UDP协议,RTSP也提供了基于 RTP传输机制的一些有效的方法。
3.1.2、RTSP 模型: 客户机在向视频服务器请求视频服务之前,首先通过HTTP协议从WEB服务器获取所请求视频服务的演示描述(Presentation description)文件,在RTSP中,每个演示(Presentation)及其所对应的媒体流都由一个RTSP URL标识。整个演示及媒体特性都在一个演示描述(Presentation description)文件中定义,该文件可能包括媒体编码方式、语言、RTSPURLs、目标地址、端口及其它参数。用户在向服务器请求某个连续媒体流的服务之前,必须首先从服务器获得该媒体流的演示描述(Presentation description )文件以得到必需的参数。利用该文件提供的信息定位视频服务地址(包括视频服务器地址和端口号)及视频服务的编码方式等信息。 客户机根据上述信息向视频服务器请求视频服务。视频服务初始化完毕,视频服务器为该客户建立一个新的视频服务流,客户端与服务器运行实时流控制协议RTSP,以对该流进行各种VCR 控制信号的交换,如播放、暂停、快进、快退等。当服务完毕,客户端提出拆线(TEARDOWN)请求。服务器使用 RTP协议将媒体数据传输给客户端,一旦数据抵达客户端,客户端应用程序即可播放输出。在流式传输中,使用RTP/RTCP和RTSP /TCP两种不同的通信协议在客户端和服务器间建立联系。如下图:
3.1.3、RTSP 协议消息格式: 请求消息格式:
1 2 3 方法 URI RTSP版本 CR LF 消息头 CR LF CR LF 消息体 CR LF
其中方法包括OPTIONS回应中所有的命令,URI是接收方的地址,例如 rtsp://
RTSP版本一般都是 RTSP/1.0。每行后面的CR LF表示回车换行,需要接收端有相应的解析,最后一个消息头需要有两个CR LF
1 2 3 RTSP版本 状态码 解释 CR LF 消息头 CR LF CR LF 消息体 CR LF
3.1.4、简单的RTSP 交互过程: 下面以一次流媒体播放为例介绍整个播放过程的RTSP状态转换的流程: 其中C表示RTSP客户端,S表示RTSP服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 C->S:OPTION request S->C:OPTION response C->S:DESCRIBE request S->C:DESCRIBE response C->S:SETUP request S->C:SETUP response C->S:PLAY request S->C:PLAY response S->C: C->S:TEARDOWN request S->C:TEARDOWN response
其中SETUP和PLAY这两步是必需的。OPTIONS步骤:只要服务器和客户端事先约定好有哪些方法可用,则OPTIONS请求可以省略。如果我们有其他途径得到媒体初始化描述信息,则也不需要通过RTSP中的DESCRIBE请求来获取。TEARDOWN可以根据系统需求的设计来决定是否需要。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 Status-Code = | "100" ; Continue | "200" ; OK | "201" ; Created | "250" ; Low on Storage Space | "300" ; Multiple Choices | "301" ; Moved Permanently | "302" ; Moved Temporarily | "303" ; See Other | "304" ; Not Modified | "305" ; Use Proxy | "400" ; Bad Request | "401" ; Unauthorized | "402" ; Payment Required | "403" ; Forbidden | "404" ; Not Found | "405" ; Method Not Allowed | "406" ; Not Acceptable | "407" ; Proxy Authentication Required | "408" ; Request Time-out | "410" ; Gone | "411" ; Length Required | "412" ; Precondition Failed | "413" ; Request Entity Too Large | "414" ; Request-URI Too Large | "415" ; Unsupported Media Type | "451" ; Parameter Not Understood | "452" ; Conference Not Found | "453" ; Not Enough Bandwidth | "454" ; Session Not Found | "455" ; Method Not Valid in This State | "456" ; Header Field Not Valid for Resource | "457" ; Invalid Range | "458" ; Parameter Is Read-Only | "459" ; Aggregate operation not allowed | "460" ; Only aggregate operation allowed | "461" ; Unsupported transport | "462" ; Destination unreachable | "500" ; Internal Server Error | "501" ; Not Implemented | "502" ; Bad Gateway | "503" ; Service Unavailable | "504" ; Gateway Time-out | "505" ; RTSP Version not supported | "551" ; Option not supported
3.1.6、SDP的格式: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 v=<version> (协议版本) o=<username> <session id> <version> <network type> <address type> <address> (所有者/创建者和会话标识符) s=<session name> (会话名称) i=<session description> (会话信息) u=<URI> (URI 描述) e=<email address> (Email 地址) p=<phone number> (电话号码) c=<network type> <address type> <connection address> (连接信息) b=<modifier>:<bandwidth-value> (带宽信息) t=<start time> <stop time> (会话活动时间) r=<repeat interval> <active duration> <list of offsets from start-time> (0 或多次重复次数) z=<adjustment time> <offset> <adjustment time> <offset>(时间区域调整) k=<method>:<encryption key> (加密密钥) a=<attribute>:<value> (0 个或多个会话属性行) m=<media> <port> <transport> <fmt list > (媒体名称和传输地址) 时间描述: t = (会话活动时间) r = * (0 或多次重复次数) 媒体描述: m = (媒体名称和传输地址) i = * (媒体标题) c = * (连接信息 — 如果包含在会话层则该字段可选) b = * (带宽信息) k = * (加密密钥) a = * (0 个或多个媒体属性行)
3.1.7、RTP协议: 实时传输协议(Real-time Transport Protocol,RTP)是用来在单播或者多播的情境中传流媒体数据的数据传输协议。通常使用UDP来进行多媒体数据的传输,也不排除使用TCP或者ATM等其它协议作为它的载体,整个RTP 协议由两个密切相关的部分组成:RTP数据协议和RTP控制协议(也就是RTCP协议)。 RTP为Internet上端到端的实时传输提供时间信息和流同步,但它并不保证服务质量,服务质量由RTCP来提供。
使用RTP协议进行数据传输的一个简要RTP的会话过程: 当应用程序建立一个RTP会话时,应用程序将确定一对目的传输地址。目的传输地址由一个网络地址和一对端口组成,有两个端口:一个给RTP包,一个给RTCP包,也就是说RTP和RTCP数据包是分开传输的,这样可以使得RTP/RTCP数据能够正确发送。其中RTP数据发向偶数的UDP端口,而对应的控制信号RTCP数据发向相邻的奇数UDP端口,这样就构成一个UDP端口对。 当发送数据的时候RTP协议从上层接收流媒体信息码流,封装成RTP数据包;RTCP从上层接收控制信息,封装成RTCP控制包。RTP将RTP 数据包发往UDP端口对中偶数端口;RTCP将RTCP控制包发往UDP端口对中的奇数端口。 如果在一次会议中同时使用了音频和视频会议,这两种媒体将分别在不同的RTP会话中传送,每一个会话使用不同的传输地址(IP地址+端口)。如果一个用户同时使用了两个会话,则每个会话对应的RTCP包都使用规范化名字CNAME(Canonical Name)。与会者可以根据RTCP包中的CNAME来获取相关联的音频和视频,然后根据RTCP包中的计时信息(Network time protocol)来实现音频和视频的同步。
翻译器和混合器 在RTP协议中还引入了翻译器和混合器。翻译器和混合器都是RTP级的中继系统。 混合器的使用情景: 在Internet上举行视频会议时,可能有少数参加者通过低速链路与使用高速网络的多数参加者相连接。为了不强制所有会议参加者都使用低带宽和低质量的数据编码,RTP允许在低带宽区域附近使用混合器作为RTP级中继器。混合器从一个或多个信源接收RTP报文,对到达的数据报文进行重新同步和重新组合,这些重组的数据流被混合成一个数据流,将数据编码转化为在低带宽上可用的类型,并通过低速链路向低带宽区域转发。为了对多个输入信源进行统一的同步,混合器在多个媒体流之间进行定时调整,产生它自己的定时同步,因此所有从混合器输出的报文都把混合器作为同步信源。为了保证接收者能够正确识别混合器处理前的原始报文发送者,混合器在RTP报头中设置了CSRC标识符队列,以标识那些产生混和报文的原始同步信源。 翻译器的使用情景 在Internet环境中,一些会议的参加者可能被隔离在应用级防火墙的外面,这些参加者被禁止直接使用IP组播地址进行访问,虽然他们可能是通过高速链路连接的。在这些情况下,RTP允许使用转换器作为RTP级中继器。在防火墙两端分别安装一个转换器,防火墙之外的转换器过滤所有接收到的组播报文,并通过一条安全的连接传送给防火墙之内的转换器,内部转换器将这些组播报文再转发送给内部网络中的组播组成员
3.1.9、RTCP协议报头格式 如前面所述RTCP的主要功能是:服务质量的监视与反馈、媒体间的同步,以及多播组中成员的标识。在RTP会话期间,各参与者周期性地传送RTCP包。RTCP包中含有已发送的数据包的数量、丢失的数据包的数量等统计资料,因此,各参与者可以利用这些信息动态地改变传输速率,甚至改变有效载荷类型。RTP和RTCP配合使用,它们能以有效的反馈和最小的开销使传输效率最佳化,因而特别适合传送网上的实时数据。RTCP也是用UDP来传送的,但RTCP封装的仅仅是一些控制信息,因而分组很短,所以可以将多个RTCP分组封装在一个UDP包中。 RTCP有如下五种分组类型:
3.2、基于NuPLayer的RTSP 代码流程 setDataSource 阶段的任务这里就不重复介绍了,它主要完成播放引擎的建立以及根据URL格式创建对应的Source,比如这里将要提到的RTSPSource,然后赋值给mSource。
3.3、RTSPSource::prepareAsync()流程 在prepare阶段我们首先会判断是否是SDP,mIsSDP这个变量是在初始化RTSPSource时候传入的,我们这里先分析mIsSDP = false的情况。这种情况下首先创建一个MyHandler,并调用connect,与服务器建立连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 [->\frameworks\av\media\libmediaplayerservice\nuplayer\RTSPSource.cpp] void NuPlayer::RTSPSource::prepareAsync() { sp<AMessage> notify = new AMessage(kWhatNotify, this ); CHECK_EQ(mState, (int )DISCONNECTED); mState = CONNECTING; if (mIsSDP) { mSDPLoader = new SDPLoader(notify, (mFlags & kFlagIncognito) ? SDPLoader::kFlagIncognito : 0 , mHTTPService); mSDPLoader->load(mURL.c_str(), mExtraHeaders.isEmpty() ? NULL : &mExtraHeaders); } else { mHandler = new MyHandler(mURL.c_str(), notify, mUIDValid, mUID); mLooper->registerHandler(mHandler); mHandler->connect(); } startBufferingIfNecessary(); }
在介绍connect方法之前需要先了解mConn以及mRTPConn这两个成员变量,mConn是一个ARTSPConnection,它主要与服务器相连,发送和接收请求数据,mRTPConn是一个ARTPConnection 用于发送和接收媒体数据。 在connect方法中会使用mConn向服务器发起连接请求。
1 2 3 4 5 6 7 8 9 10 11 12 [->\frameworks\av\media\libstagefright\rtsp\MyHandler.h] void connect () { looper()->registerHandler(mConn); (1 ? mNetLooper : looper())->registerHandler(mRTPConn); sp<AMessage> notify = new AMessage('biny', this); mConn->observeBinaryData(notify); sp<AMessage> reply = new AMessage('conn', this); mConn->connect(mOriginalSessionURL.c_str(), reply); }
1 2 3 4 5 6 7 8 9 10 [->\frameworks\av\media\libstagefright\rtsp\ARTSPConnection.cpp] void ARTSPConnection::connect (const char *url, const sp<AMessage> &reply) { sp<AMessage> msg = new AMessage(kWhatConnect, this ); msg->setString("url" , url); msg->setMessage("reply" , reply); msg->post(); } case kWhatConnect: onConnect(msg); break ;
在ARTSPConnection::onConnect中将会从传递过来的URL中解析host、port、path、mUser、mPass,并调用::connect与服务器建立连接;连接成功后调用postReceiveReponseEvent投递kWhatReceiveResponse消息,开始轮询接收服务器的响应。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 [->\frameworks\av\media\libstagefright\rtsp\ARTSPConnection.cpp] void ARTSPConnection::onConnect (const sp<AMessage> &msg) { ++mConnectionID; if (mState != DISCONNECTED) { if (mUIDValid) { HTTPBase::UnRegisterSocketUserTag(mSocket); HTTPBase::UnRegisterSocketUserMark(mSocket); } close(mSocket); mSocket = -1 ; flushPendingRequests(); } mState = CONNECTING; AString url; CHECK(msg->findString("url" , &url)); sp<AMessage> reply; CHECK(msg->findMessage("reply" , &reply)); AString host, path; unsigned port; if (!ParseURL(url.c_str(), &host, &port, &path, &mUser, &mPass) || (mUser.size() > 0 && mPass.size() == 0 )) { ALOGE("Malformed rtsp url %s" , uriDebugString(url).c_str()); reply->setInt32("result" , ERROR_MALFORMED); reply->post(); mState = DISCONNECTED; return ; } if (mUser.size() > 0 ) { ALOGV("user = '%s', pass = '%s'" , mUser.c_str(), mPass.c_str()); } struct hostent *ent = gethostbyname (host .c_str ()); if (ent == NULL ) { ALOGE("Unknown host %s" , host.c_str()); reply->setInt32("result" , -ENOENT); reply->post(); mState = DISCONNECTED; return ; } mSocket = socket(AF_INET, SOCK_STREAM, 0 ); if (mUIDValid) { HTTPBase::RegisterSocketUserTag(mSocket, mUID,(uint32_t )*(uint32_t *) "RTSP" ); HTTPBase::RegisterSocketUserMark(mSocket, mUID); } MakeSocketBlocking(mSocket, false ); struct sockaddr_in remote ; memset (remote.sin_zero, 0 , sizeof (remote.sin_zero)); remote.sin_family = AF_INET; remote.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; remote.sin_port = htons(port); int err = ::connect(mSocket, (const struct sockaddr *)&remote, sizeof (remote)); reply->setInt32("server-ip" , ntohl(remote.sin_addr.s_addr)); if (err < 0 ) { if (errno == EINPROGRESS) { sp<AMessage> msg = 
new AMessage(kWhatCompleteConnection, this ); msg->setMessage("reply" , reply); msg->setInt32("connection-id" , mConnectionID); msg->post(); return ; } reply->setInt32("result" , -errno); mState = DISCONNECTED; if (mUIDValid) { HTTPBase::UnRegisterSocketUserTag(mSocket); HTTPBase::UnRegisterSocketUserMark(mSocket); } close(mSocket); mSocket = -1 ; } else { reply->setInt32("result" , OK); mState = CONNECTED; mNextCSeq = 1 ; postReceiveReponseEvent(); } reply->post(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 [->] void ARTSPConnection::postReceiveReponseEvent () { if (mReceiveResponseEventPending) { return ; } sp<AMessage> msg = new AMessage(kWhatReceiveResponse, this ); msg->post(); mReceiveResponseEventPending = true ; } void ARTSPConnection::onReceiveResponse () { mReceiveResponseEventPending = false ; if (mState != CONNECTED) { return ; } struct timeval tv ; tv.tv_sec = 0 ; tv.tv_usec = kSelectTimeoutUs; fd_set rs; FD_ZERO(&rs); FD_SET(mSocket, &rs); int res = select(mSocket + 1 , &rs, NULL , NULL , &tv); if (res == 1 ) { MakeSocketBlocking(mSocket, true ); bool success = receiveRTSPReponse(); MakeSocketBlocking(mSocket, false ); if (!success) { flushPendingRequests(); return ; } } postReceiveReponseEvent(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 [->\frameworks\av\media\libstagefright\rtsp\ARTSPConnection.cpp] bool ARTSPConnection::receiveRTSPReponse () { AString statusLine; if (!receiveLine(&statusLine)) { return false ; } if (statusLine == "$" ) { sp<ABuffer> buffer = receiveBinaryData(); if (buffer == NULL ) { return false ; } if (mObserveBinaryMessage != NULL ) { sp<AMessage> notify = mObserveBinaryMessage->dup(); notify->setBuffer("buffer" , buffer); notify->post(); } else { ALOGW("received binary data, but no one cares." ); } return true ; } sp<ARTSPResponse> response = new ARTSPResponse; response->mStatusLine = statusLine; ALOGI("status: %s" , response->mStatusLine.c_str()); ssize_t space1 = response->mStatusLine.find(" " ); if (space1 < 0 ) { return false ; } ssize_t space2 = response->mStatusLine.find(" " , space1 + 1 ); if (space2 < 0 ) { return false ; } bool isRequest = false ; if (!IsRTSPVersion(AString(response->mStatusLine, 0 , space1))) { CHECK(IsRTSPVersion(AString(response->mStatusLine,space2 + 1 ,response->mStatusLine.size() - space2 - 1 ))); isRequest = true ; response->mStatusCode = 0 ; } else { AString statusCodeStr(response->mStatusLine, space1 + 1 , space2 - space1 - 1 ); if (!ParseSingleUnsignedLong(statusCodeStr.c_str(), &response->mStatusCode) || response->mStatusCode < 100 || response->mStatusCode > 999 ) { return false ; } } AString line; ssize_t lastDictIndex = -1 ; for (;;) { if (!receiveLine(&line)) { break ; } if (line.empty()) { break ; } ALOGV("line: '%s'" , line.c_str()); if (line.c_str()[0 ] == ' ' || line.c_str()[0 ] == '\t' ) { if (lastDictIndex < 0 ) { return false ; } AString &value = response->mHeaders.editValueAt(lastDictIndex); 
value.append(line); continue ; } ssize_t colonPos = line.find(":" ); if (colonPos < 0 ) { return false ; } AString key (line, 0 , colonPos) ; key.trim(); key.tolower (); line.erase(0 , colonPos + 1 ); lastDictIndex = response->mHeaders.add(key, line); } for (size_t i = 0 ; i < response->mHeaders.size(); ++i) { response->mHeaders.editValueAt(i).trim(); } unsigned long contentLength = 0 ; ssize_t i = response->mHeaders.indexOfKey("content-length" ); if (i >= 0 ) { AString value = response->mHeaders.valueAt(i); if (!ParseSingleUnsignedLong(value.c_str(), &contentLength)) { return false ; } } if (contentLength > 0 ) { response->mContent = new ABuffer(contentLength); if (receive(response->mContent->data(), contentLength) != OK) { return false ; } } return isRequest ? handleServerRequest(response) : notifyResponseListener(response); }
isRequest 表示这是服务器主动发送的请求,此时将调用handleServerRequest;否则表示这是服务器对客户端请求的响应,此时将调用notifyResponseListener把响应通知给发起该请求的一方。我们这里先看下这两个方法的实现:
看到handleServerRequest大家可能会有点失望,因为这里尚未实现这个功能所以只是向服务器返回一个“RTSP/1.0 501 Not Implemented”的消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 [->\frameworks\av\media\libstagefright\rtsp\ARTSPConnection.cpp] bool ARTSPConnection::handleServerRequest (const sp<ARTSPResponse> &request) { ssize_t space1 = request->mStatusLine.find(" " ); CHECK_GE(space1, 0 ); AString response; response.append("RTSP/1.0 501 Not Implemented\r\n" ); ssize_t i = request->mHeaders.indexOfKey("cseq" ); if (i >= 0 ) { AString value = request->mHeaders.valueAt(i); unsigned long cseq; if (!ParseSingleUnsignedLong(value.c_str(), &cseq)) { return false ; } response.append("CSeq: " ); response.append(cseq); response.append("\r\n" ); } response.append("\r\n" ); size_t numBytesSent = 0 ; while (numBytesSent < response.size()) { ssize_t n = send(mSocket, response.c_str() + numBytesSent, response.size() - numBytesSent, 0 ); if (n < 0 && errno == EINTR) { continue ; } if (n <= 0 ) { if (n == 0 ) { ALOGE("Server unexpectedly closed the connection." ); } else { ALOGE("Error sending rtsp response (%s)." , strerror(errno)); } performDisconnect(); return false ; } numBytesSent += (size_t )n; } return true ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 [->\frameworks\av\media\libstagefright\rtsp\ARTSPConnection.cpp] bool ARTSPConnection::notifyResponseListener ( const sp<ARTSPResponse> &response) { ssize_t i; status_t err = findPendingRequest(response, &i); if (err != OK) { return false ; } sp<AMessage> reply = mPendingRequests.valueAt(i); mPendingRequests.removeItemsAt(i); reply->setInt32("result" , OK); reply->setObject("response" , response); reply->post(); return true ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 [->\frameworks\av\media\libstagefright\rtsp\MyHandler.h] case 'conn': { int32_t result; CHECK(msg->findInt32("result" , &result)); if (result == OK) { AString request; request = "DESCRIBE " ; request.append(mSessionURL); request.append(" RTSP/1.0\r\n" ); request.append("Accept: application/sdp\r\n" ); request.append("\r\n" ); sp<AMessage> reply = new AMessage('desc', this); mConn->sendRequest(request.c_str(), reply); } else { (new AMessage('disc', this))->post(); } break ; }
这里比较简单,就是收到答复之后,直接判断结果是否为OK,如果OK那么就发送一个DESCRIBE请求。我们重点看下onSendRequest,理解这个很重要: 在onSendRequest中会对请求加工处理一下,比如添加CSeq头等操作,然后调用send向服务器发送请求,并以CSeq为键、reply消息为值,将该请求加入待处理请求队列mPendingRequests中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 [->\frameworks\av\media\libstagefright\rtsp\ARTSPConnection.cpp] void ARTSPConnection::onSendRequest (const sp<AMessage> &msg) { sp<AMessage> reply; CHECK(msg->findMessage("reply" , &reply)); AString request; CHECK(msg->findString("request" , &request)); reply->setString("original-request" , request.c_str(), request.size()); addAuthentication(&request); addUserAgent(&request); ssize_t i = request.find("\r\n\r\n" ); CHECK_GE(i, 0 ); int32_t cseq = mNextCSeq++; AString cseqHeader = "CSeq: " ; cseqHeader.append(cseq); cseqHeader.append("\r\n" ); request.insert(cseqHeader, i + 2 ); ALOGV("request: '%s'" , request.c_str()); size_t numBytesSent = 0 ; while (numBytesSent < request.size()) { ssize_t n = send(mSocket, request.c_str() + numBytesSent,request.size() - numBytesSent, 0 );} numBytesSent += (size_t )n; } mPendingRequests.add(cseq, reply); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 status_t ARTSPConnection::findPendingRequest ( const sp<ARTSPResponse> &response, ssize_t *index) const { *index = 0 ; ssize_t i = response->mHeaders.indexOfKey("cseq" ); if (i < 0 ) { *index = -1 ; return OK; } AString value = response->mHeaders.valueAt(i); unsigned long cseq; if (!ParseSingleUnsignedLong(value.c_str(), &cseq)) { return ERROR_MALFORMED; } i = mPendingRequests.indexOfKey(cseq); if (i < 0 ) { return -ENOENT; } *index = i; return OK; }
onSendRequest 会不断将请求放入mPendingRequests中,而每次服务器给出应答的时候会调用notifyResponseListener,notifyResponseListener会根据应答中的CSeq从mPendingRequests中取出对应的reply消息,并将其发送给MyHandler进行处理;随后onReceiveResponse会通过postReceiveReponseEvent再次投递消息,继续等待服务器的下一个应答。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 case 'desc': { int32_t result; CHECK(msg->findInt32("result" , &result)); if (result == OK) { sp<RefBase> obj; CHECK(msg->findObject("response" , &obj)); sp<ARTSPResponse> response = static_cast <ARTSPResponse *>(obj.get()); if (response->mStatusCode == 301 || response->mStatusCode == 302 ) { } if (response->mStatusCode != 200 ) { result = UNKNOWN_ERROR; } else if (response->mContent == NULL ) { result = ERROR_MALFORMED; ALOGE("The response has no content." ); } else { mSessionDesc = new ASessionDescription; mSessionDesc->setTo(response->mContent->data(),response->mContent->size()); if (!mSessionDesc->isValid()) { } else { if (mSessionDesc->countTracks() < 2 ) { ALOGW("Session doesn't contain any playable " "tracks. Aborting." ); result = ERROR_UNSUPPORTED; } else { setupTrack(1 ); } } } } break ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 void setupTrack (size_t index) { sp<APacketSource> source = new APacketSource(mSessionDesc, index); AString url; CHECK(mSessionDesc->findAttribute(index, "a=control" , &url)); AString trackURL; CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); mTracks.push(TrackInfo()); TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1 ); info->mURL = trackURL; info->mPacketSource = source; info->mUsingInterleavedTCP = false ; info->mFirstSeqNumInSegment = 0 ; info->mNewSegment = true ; info->mRTPSocket = -1 ; info->mRTCPSocket = -1 ; info->mRTPAnchor = 0 ; info->mNTPAnchorUs = -1 ; info->mNormalPlayTimeRTP = 0 ; info->mNormalPlayTimeUs = 0l l; unsigned long PT; AString formatDesc; AString formatParams; mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); int32_t timescale; int32_t numChannels; ASessionDescription::ParseFormatDesc(formatDesc.c_str(), ×cale, &numChannels); info->mTimeScale = timescale; info->mEOSReceived = false ; ALOGV("track #%zu URL=%s" , mTracks.size(), trackURL.c_str()); AString request = "SETUP " ; request.append(trackURL); request.append(" RTSP/1.0\r\n" ); if (mTryTCPInterleaving) { size_t interleaveIndex = 2 * (mTracks.size() - 1 ); info->mUsingInterleavedTCP = true ; info->mRTPSocket = interleaveIndex; info->mRTCPSocket = interleaveIndex + 1 ; request.append("Transport: RTP/AVP/TCP;interleaved=" ); request.append(interleaveIndex); request.append("-" ); request.append(interleaveIndex + 1 ); } else { unsigned rtpPort; ARTPConnection::MakePortPair( &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); if (mUIDValid) { HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, (uint32_t )*(uint32_t *) "RTP_" ); HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, (uint32_t )*(uint32_t *) "RTP_" ); 
HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID); HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID); } request.append("Transport: RTP/AVP/UDP;unicast;client_port=" ); request.append(rtpPort); request.append("-" ); request.append(rtpPort + 1 ); } request.append("\r\n" ); if (index > 1 ) { request.append("Session: " ); request.append(mSessionID); request.append("\r\n" ); } request.append("\r\n" ); sp<AMessage> reply = new AMessage('setu', this); reply->setSize("index" , index); reply->setSize("track-index" , mTracks.size() - 1 ); mConn->sendRequest(request.c_str(), reply); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 case 'setu': { size_t index; CHECK(msg->findSize("index" , &index)); TrackInfo *track = NULL ; size_t trackIndex; if (msg->findSize("track-index" , &trackIndex)) { track = &mTracks.editItemAt(trackIndex); } int32_t result; CHECK(msg->findInt32("result" , &result)); if (result == OK) { CHECK(track != NULL ); sp<RefBase> obj; CHECK(msg->findObject("response" , &obj)); sp<ARTSPResponse> response = static_cast <ARTSPResponse *>(obj.get()); if (response->mStatusCode != 200 ) { } else { ssize_t i = response->mHeaders.indexOfKey("session" ); CHECK_GE(i, 0 ); mSessionID = response->mHeaders.valueAt(i); mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; AString timeoutStr; sp<AMessage> notify = new AMessage('accu', this); notify->setSize("track-index" , trackIndex); i = response->mHeaders.indexOfKey("transport" ); CHECK_GE(i, 0 ); if (track->mRTPSocket != -1 && track->mRTCPSocket != -1 ) { if (!track->mUsingInterleavedTCP) { AString transport = response->mHeaders.valueAt(i); pokeAHole( track->mRTPSocket, track->mRTCPSocket, transport); } mRTPConn->addStream( track->mRTPSocket, track->mRTCPSocket, mSessionDesc, index, notify, track->mUsingInterleavedTCP); mSetupTracksSuccessful = true ; } else { result = BAD_VALUE; } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 void ARTPConnection::addStream ( int rtpSocket, int rtcpSocket, const sp<ASessionDescription> &sessionDesc, size_t index, const sp<AMessage> ¬ify, bool injected) { sp<AMessage> msg = new AMessage(kWhatAddStream, this ); msg->setInt32("rtp-socket" , rtpSocket); msg->setInt32("rtcp-socket" , rtcpSocket); msg->setObject("session-desc" , sessionDesc); msg->setSize("index" , index); msg->setMessage("notify" , notify); msg->setInt32("injected" , injected); msg->post(); } case kWhatAddStream:{ onAddStream(msg); break ; } void ARTPConnection::onAddStream (const sp<AMessage> &msg) { mStreams.push_back(StreamInfo()); StreamInfo *info = &*--mStreams.end(); int32_t s; CHECK(msg->findInt32("rtp-socket" , &s)); info->mRTPSocket = s; CHECK(msg->findInt32("rtcp-socket" , &s)); info->mRTCPSocket = s; int32_t injected; CHECK(msg->findInt32("injected" , &injected)); info->mIsInjected = injected; sp<RefBase> obj; CHECK(msg->findObject("session-desc" , &obj)); info->mSessionDesc = static_cast <ASessionDescription *>(obj.get()); CHECK(msg->findSize("index" , &info->mIndex)); CHECK(msg->findMessage("notify" , &info->mNotifyMsg)); info->mNumRTCPPacketsReceived = 0 ; info->mNumRTPPacketsReceived = 0 ; memset (&info->mRemoteRTCPAddr, 0 , sizeof (info->mRemoteRTCPAddr)); if (!injected) { postPollEvent(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 void ARTPConnection::postPollEvent () { if (mPollEventPending) { return ; } sp<AMessage> msg = new AMessage(kWhatPollStreams, this ); msg->post(); mPollEventPending = true ; } case kWhatPollStreams:{ onPollStreams(); break ; } void ARTPConnection::onPollStreams () { mPollEventPending = false ; if (mStreams.empty()) { return ; } struct timeval tv ; tv.tv_sec = 0 ; tv.tv_usec = kSelectTimeoutUs; fd_set rs; FD_ZERO(&rs); int maxSocket = -1 ; for (List<StreamInfo>::iterator it = mStreams.begin(); it != mStreams.end(); ++it) { if ((*it).mIsInjected) { continue ; } FD_SET(it->mRTPSocket, &rs); FD_SET(it->mRTCPSocket, &rs); if (it->mRTPSocket > maxSocket) { maxSocket = it->mRTPSocket; } if (it->mRTCPSocket > maxSocket) { maxSocket = it->mRTCPSocket; } } if (maxSocket == -1 ) { return ; } int res = select(maxSocket + 1 , &rs, NULL , NULL , &tv); if (res > 0 ) { List<StreamInfo>::iterator it = mStreams.begin(); while (it != mStreams.end()) { if ((*it).mIsInjected) { ++it; continue ; } status_t err = OK; if (FD_ISSET(it->mRTPSocket, &rs)) { err = receive(&*it, true ); } if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) { err = receive(&*it, false ); } ++it; } } int64_t nowUs = ALooper::GetNowUs(); if (mLastReceiverReportTimeUs <= 0 || mLastReceiverReportTimeUs + 5000000l l <= nowUs) { sp<ABuffer> buffer = new ABuffer(kMaxUDPSize); List<StreamInfo>::iterator it = mStreams.begin(); while (it != mStreams.end()) { StreamInfo *s = &*it; if (s->mIsInjected) { ++it; continue ; } if (s->mNumRTCPPacketsReceived == 0 ) { ++it; continue ; } buffer->setRange(0 , 0 ); for (size_t i = 0 ; i < s->mSources.size(); ++i) { 
sp<ARTPSource> source = s->mSources.valueAt(i); source->addReceiverReport(buffer); if (mFlags & kRegularlyRequestFIR) { source->addFIR(buffer); } } if (buffer->size() > 0 ) { ALOGV("Sending RR..." ); ssize_t n; do { n = sendto(s->mRTCPSocket, buffer->data(), buffer->size(), 0 ,(const struct sockaddr *)&s->mRemoteRTCPAddr, sizeof (s->mRemoteRTCPAddr)); } while (n < 0 && errno == EINTR); CHECK_EQ(n, (ssize_t )buffer->size()); mLastReceiverReportTimeUs = nowUs; } ++it; } } if (!mStreams.empty()) { postPollEvent(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 status_t ARTPConnection::receive (StreamInfo *s, bool receiveRTP) { ALOGV("receiving %s" , receiveRTP ? "RTP" : "RTCP" ); CHECK(!s->mIsInjected); sp<ABuffer> buffer = new ABuffer(65536 ); socklen_t remoteAddrLen = (!receiveRTP && s->mNumRTCPPacketsReceived == 0 ) ? sizeof (s->mRemoteRTCPAddr) : 0 ; ssize_t nbytes; do { nbytes = recvfrom( receiveRTP ? s->mRTPSocket : s->mRTCPSocket, buffer->data(), buffer->capacity(), 0 , remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL , remoteAddrLen > 0 ? &remoteAddrLen : NULL ); } while (nbytes < 0 && errno == EINTR); if (nbytes <= 0 ) { return -ECONNRESET; } buffer->setRange(0 , nbytes); status_t err; if (receiveRTP) { err = parseRTP(s, buffer); } else { err = parseRTCP(s, buffer); } return err; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 status_t ARTPConnection::parseRTP (StreamInfo *s, const sp<ABuffer> &buffer) { const uint8_t *data = buffer->data(); if ((data[0 ] >> 6 ) != 2 ) { return -1 ; } if (data[0 ] & 0x20 ) { size_t paddingLength = data[size - 1 ]; if (paddingLength + 12 > size) { return -1 ; } size -= paddingLength; } int numCSRCs = data[0 ] & 0x0f ; size_t payloadOffset = 12 + 4 * numCSRCs; if (size < payloadOffset) { return -1 ; } if (data[0 ] & 0x10 ) { if (size < payloadOffset + 4 ) { return -1 ; } const uint8_t *extensionData = &data[payloadOffset]; size_t extensionLength = 4 * (extensionData[2 ] << 8 | extensionData[3 ]); if (size < payloadOffset + 4 + extensionLength) { return -1 ; } payloadOffset += 4 + extensionLength; } uint32_t srcId = u32at(&data[8 ]); sp<ARTPSource> source = findSource(s, srcId); uint32_t rtpTime = u32at(&data[4 ]); sp<AMessage> meta = buffer->meta(); meta->setInt32("ssrc" , srcId); meta->setInt32("rtp-time" , rtpTime); meta->setInt32("PT" , data[1 ] & 0x7f ); meta->setInt32("M" , data[1 ] >> 7 ); buffer->setInt32Data(u16at(&data[2 ])); buffer->setRange(payloadOffset, size - payloadOffset); source->processRTPPacket(buffer); return OK; }
1 2 3 4 5 6 [->\frameworks\av\media\libstagefright\rtsp\ARTPSource.cpp] void ARTPSource::processRTPPacket (const sp<ABuffer> &buffer) { if (queuePacket(buffer) && mAssembler != NULL ) { mAssembler->onPacketReceived(this ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 [->\frameworks\av\media\libstagefright\rtsp\ARTPAssembler.cpp] void ARTPAssembler::onPacketReceived (const sp<ARTPSource> &source) { AssemblyStatus status; for (;;) { status = assembleMore(source); if (status == WRONG_SEQUENCE_NUMBER) { if (mFirstFailureTimeUs >= 0 ) { if (ALooper::GetNowUs() - mFirstFailureTimeUs > 10000l l) { mFirstFailureTimeUs = -1 ; packetLost(); continue ; } } else { mFirstFailureTimeUs = ALooper::GetNowUs(); } break ; } else { mFirstFailureTimeUs = -1 ; if (status == NOT_ENOUGH_DATA) { break ; } } } } ARTPAssembler::AssemblyStatus AMPEG4AudioAssembler::assembleMore ( const sp<ARTPSource> &source) { AssemblyStatus status = addPacket(source); if (status == MALFORMED_PACKET) { mAccessUnitDamaged = true ; } return status; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 ARTPAssembler::AssemblyStatus AMPEG4AudioAssembler::addPacket ( const sp<ARTPSource> &source) { List<sp<ABuffer> > *queue = source->queue (); if (queue ->empty()) { return NOT_ENOUGH_DATA; } if (mNextExpectedSeqNoValid) { List<sp<ABuffer> >::iterator it = queue ->begin(); while (it != queue ->end()) { if ((uint32_t )(*it)->int32Data() >= mNextExpectedSeqNo) { break ; } it = queue ->erase(it); } if (queue ->empty()) { return NOT_ENOUGH_DATA; } } sp<ABuffer> buffer = *queue ->begin(); if (!mNextExpectedSeqNoValid) { mNextExpectedSeqNoValid = true ; mNextExpectedSeqNo = (uint32_t )buffer->int32Data(); } else if ((uint32_t )buffer->int32Data() != mNextExpectedSeqNo) { #if VERBOSE LOG(VERBOSE) << "Not the sequence number I expected" ; #endif return WRONG_SEQUENCE_NUMBER; } uint32_t rtpTime; CHECK(buffer->meta()->findInt32("rtp-time" , (int32_t *)&rtpTime)); if (mPackets.size() > 0 && rtpTime != mAccessUnitRTPTime) { submitAccessUnit(); } mAccessUnitRTPTime = rtpTime; mPackets.push_back(buffer); queue ->erase(queue ->begin()); ++mNextExpectedSeqNo; return OK; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 void AMPEG4AudioAssembler::submitAccessUnit () { CHECK(!mPackets.empty()); #if VERBOSE LOG(VERBOSE) << "Access unit complete (" << mPackets.size() << " packets)" ; #endif sp<ABuffer> accessUnit = MakeCompoundFromPackets(mPackets); accessUnit = removeLATMFraming(accessUnit); CopyTimes(accessUnit, *mPackets.begin()); if (mAccessUnitDamaged) { accessUnit->meta()->setInt32("damaged" , true ); } mPackets.clear(); mAccessUnitDamaged = false ; sp<AMessage> msg = mNotifyMsg->dup(); msg->setBuffer("access-unit" , accessUnit); msg->post(); } case 'accu': { int32_t timeUpdate; if (msg->findInt32("time-update" , &timeUpdate) && timeUpdate) { size_t trackIndex; CHECK(msg->findSize("track-index" , &trackIndex)); uint32_t rtpTime; uint64_t ntpTime; CHECK(msg->findInt32("rtp-time" , (int32_t *)&rtpTime)); CHECK(msg->findInt64("ntp-time" , (int64_t *)&ntpTime)); onTimeUpdate(trackIndex, rtpTime, ntpTime); break ; } int32_t first; if (msg->findInt32("first-rtcp" , &first)) { mReceivedFirstRTCPPacket = true ; break ; } if (msg->findInt32("first-rtp" , &first)) { mReceivedFirstRTPPacket = true ; break ; } ++mNumAccessUnitsReceived; postAccessUnitTimeoutCheck(); size_t trackIndex; CHECK(msg->findSize("track-index" , &trackIndex)); if (trackIndex >= mTracks.size()) { ALOGV("late packets ignored." 
); break ; } TrackInfo *track = &mTracks.editItemAt(trackIndex); int32_t eos; if (msg->findInt32("eos" , &eos)) { ALOGI("received BYE on track index %zu" , trackIndex); if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) { ALOGI("No time established => fake existing data" ); track->mEOSReceived = true ; mTryFakeRTCP = true ; mReceivedFirstRTCPPacket = true ; fakeTimestamps(); } else { postQueueEOS(trackIndex, ERROR_END_OF_STREAM); } return ; } sp<ABuffer> accessUnit; CHECK(msg->findBuffer("access-unit" , &accessUnit)); uint32_t seqNum = (uint32_t )accessUnit->int32Data(); if (mSeekPending) { ALOGV("we're seeking, dropping stale packet." ); break ; } if (seqNum < track->mFirstSeqNumInSegment) { ALOGV("dropping stale access-unit (%d < %d)" , seqNum, track->mFirstSeqNumInSegment); break ; } if (track->mNewSegment) { track->mNewSegment = false ; } onAccessUnitComplete(trackIndex, accessUnit); break ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 void onAccessUnitComplete ( int32_t trackIndex, const sp<ABuffer> &accessUnit) { ALOGV("onAccessUnitComplete track %d" , trackIndex); if (!mPlayResponseParsed){ ALOGI("play response is not parsed, storing accessunit" ); TrackInfo *track = &mTracks.editItemAt(trackIndex); track->mPackets.push_back(accessUnit); return ; } handleFirstAccessUnit(); TrackInfo *track = &mTracks.editItemAt(trackIndex); if (!mAllTracksHaveTime) { ALOGV("storing accessUnit, no time established yet" ); track->mPackets.push_back(accessUnit); return ; } while (!track->mPackets.empty()) { sp<ABuffer> accessUnit = *track->mPackets.begin(); track->mPackets.erase(track->mPackets.begin()); if (addMediaTimestamp(trackIndex, track, accessUnit)) { postQueueAccessUnit(trackIndex, accessUnit); } } if (addMediaTimestamp(trackIndex, track, accessUnit)) { postQueueAccessUnit(trackIndex, accessUnit); } if (track->mEOSReceived) { postQueueEOS(trackIndex, ERROR_END_OF_STREAM); track->mEOSReceived = false ; } } void postQueueAccessUnit ( size_t trackIndex, const sp<ABuffer> &accessUnit) { sp<AMessage> msg = mNotify->dup(); msg->setInt32("what" , kWhatAccessUnit); msg->setSize("trackIndex" , trackIndex); msg->setBuffer("accessUnit" , accessUnit); msg->post(); }
在 RTSPSource 中收到 MyHandler::kWhatAccessUnit 消息后,调用 AnotherPacketSource::queueAccessUnit(accessUnit):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 case MyHandler::kWhatAccessUnit:{ size_t trackIndex; CHECK(msg->findSize("trackIndex" , &trackIndex)); if (mTSParser == NULL ) { CHECK_LT(trackIndex, mTracks.size()); } else { CHECK_EQ(trackIndex, 0u ); } sp<ABuffer> accessUnit; CHECK(msg->findBuffer("accessUnit" , &accessUnit)); int32_t damaged; if (accessUnit->meta()->findInt32("damaged" , &damaged) && damaged) { ALOGI("dropping damaged access unit." ); break ; } if (mTSParser != NULL ) { size_t offset = 0 ; status_t err = OK; while (offset + 188 <= accessUnit->size()) { err = mTSParser->feedTSPacket( accessUnit->data() + offset, 188 ); if (err != OK) { break ; } offset += 188 ; } if (offset < accessUnit->size()) { err = ERROR_MALFORMED; } if (err != OK) { sp<AnotherPacketSource> source = getSource(false ); if (source != NULL ) { source->signalEOS(err); } source = getSource(true ); if (source != NULL ) { source->signalEOS(err); } } break ; } TrackInfo *info = &mTracks.editItemAt(trackIndex); sp<AnotherPacketSource> source = info->mSource; if (source != NULL ) { uint32_t rtpTime; CHECK(accessUnit->meta()->findInt32("rtp-time" , (int32_t *)&rtpTime)); if (!info->mNPTMappingValid) { source->queueAccessUnit(accessUnit); break ; } int64_t nptUs = ((double )rtpTime - (double )info->mRTPTime) / info->mTimeScale * 1000000l l + info->mNormalPlaytimeUs; accessUnit->meta()->setInt64("timeUs" , nptUs); source->queueAccessUnit(accessUnit); } break ; }
queueAccessUnit(accessUnit) 将 AU 数据存放到 AnotherPacketSource 的 mBuffers 中,供解码器解码播放:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 void AnotherPacketSource::queueAccessUnit (const sp<ABuffer> &buffer) { int32_t damaged; ...... Mutex::Autolock autoLock (mLock) ; mBuffers.push_back(buffer); mCondition.signal(); int32_t discontinuity; if (buffer->meta()->findInt32("discontinuity" , &discontinuity)){ ALOGV("queueing a discontinuity with queueAccessUnit" ); mLastQueuedTimeUs = 0l l; mEOSResult = OK; mLatestEnqueuedMeta = NULL ; mDiscontinuitySegments.push_back(DiscontinuitySegment()); return ; } int64_t lastQueuedTimeUs; CHECK(buffer->meta()->findInt64("timeUs" , &lastQueuedTimeUs)); mLastQueuedTimeUs = lastQueuedTimeUs; ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)" , mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6 ); DiscontinuitySegment &tailSeg = *(--mDiscontinuitySegments.end()); if (lastQueuedTimeUs > tailSeg.mMaxEnqueTimeUs) { tailSeg.mMaxEnqueTimeUs = lastQueuedTimeUs; } if (tailSeg.mMaxDequeTimeUs == -1 ) { tailSeg.mMaxDequeTimeUs = lastQueuedTimeUs; } if (mLatestEnqueuedMeta == NULL ) { mLatestEnqueuedMeta = buffer->meta()->dup(); } else { int64_t latestTimeUs = 0 ; int64_t frameDeltaUs = 0 ; CHECK(mLatestEnqueuedMeta->findInt64("timeUs" , &latestTimeUs)); if (lastQueuedTimeUs > latestTimeUs) { mLatestEnqueuedMeta = buffer->meta()->dup(); frameDeltaUs = lastQueuedTimeUs - latestTimeUs; mLatestEnqueuedMeta->setInt64("durationUs" , frameDeltaUs); } else if (!mLatestEnqueuedMeta->findInt64("durationUs" , &frameDeltaUs)) { frameDeltaUs = latestTimeUs - lastQueuedTimeUs; mLatestEnqueuedMeta->setInt64("durationUs" , frameDeltaUs); } } }
