diff --git a/moxygen/MoQCodec.cpp b/moxygen/MoQCodec.cpp index bcd418ab..aea53afe 100644 --- a/moxygen/MoQCodec.cpp +++ b/moxygen/MoQCodec.cpp @@ -122,7 +122,6 @@ void MoQObjectStreamCodec::onIngress( cursor = newCursor; streamType_ = StreamType(type->first); switch (streamType_) { - case StreamType::STREAM_HEADER_TRACK: case StreamType::STREAM_HEADER_SUBGROUP: parseState_ = ParseState::OBJECT_STREAM; break; @@ -159,7 +158,7 @@ void MoQObjectStreamCodec::onIngress( } case ParseState::OBJECT_STREAM: { auto newCursor = cursor; - auto res = parseStreamHeader(newCursor, streamType_); + auto res = parseSubgroupHeader(newCursor); if (res.hasError()) { XLOG(DBG6) << __func__ << " " << uint32_t(res.error()); connError_ = res.error(); diff --git a/moxygen/MoQFramer.cpp b/moxygen/MoQFramer.cpp index d6ccc329..12645149 100644 --- a/moxygen/MoQFramer.cpp +++ b/moxygen/MoQFramer.cpp @@ -243,16 +243,11 @@ folly::Expected parseObjectHeader( } else { objectHeader.status = ObjectStatus::NORMAL; } - objectHeader.forwardPreference = ForwardPreference::Datagram; return objectHeader; } -folly::Expected parseStreamHeader( - folly::io::Cursor& cursor, - StreamType streamType) noexcept { - DCHECK( - streamType == StreamType::STREAM_HEADER_TRACK || - streamType == StreamType::STREAM_HEADER_SUBGROUP); +folly::Expected parseSubgroupHeader( + folly::io::Cursor& cursor) noexcept { auto length = cursor.totalLength(); ObjectHeader objectHeader; objectHeader.group = std::numeric_limits::max(); // unset @@ -263,24 +258,18 @@ folly::Expected parseStreamHeader( } length -= trackAlias->second; objectHeader.trackIdentifier = TrackAlias(trackAlias->first); - if (streamType == StreamType::STREAM_HEADER_SUBGROUP) { - auto group = quic::decodeQuicInteger(cursor, length); - if (!group) { - return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW); - } - length -= group->second; - objectHeader.group = group->first; - auto subgroup = quic::decodeQuicInteger(cursor, length); - if (!subgroup) { - return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW); - } - objectHeader.subgroup = subgroup->first; - length -= subgroup->second; - objectHeader.forwardPreference = ForwardPreference::Subgroup; - } else { - objectHeader.subgroup = std::numeric_limits::max(); - objectHeader.forwardPreference = ForwardPreference::Track; + auto group = quic::decodeQuicInteger(cursor, length); + if (!group) { + return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW); + } + length -= group->second; + objectHeader.group = group->first; + auto subgroup = quic::decodeQuicInteger(cursor, length); + if (!subgroup) { + return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW); } + objectHeader.subgroup = subgroup->first; + length -= subgroup->second; if (cursor.canAdvance(1)) { objectHeader.priority = cursor.readBE(); length -= 1; @@ -295,26 +284,18 @@ folly::Expected parseMultiObjectHeader( StreamType streamType, const ObjectHeader& headerTemplate) noexcept { DCHECK( - streamType == StreamType::STREAM_HEADER_TRACK || streamType == StreamType::STREAM_HEADER_SUBGROUP || streamType == StreamType::FETCH_HEADER); // TODO get rid of this auto length = cursor.totalLength(); ObjectHeader objectHeader = headerTemplate; - if (streamType == StreamType::STREAM_HEADER_TRACK || - streamType == StreamType::FETCH_HEADER) { + if (streamType == StreamType::FETCH_HEADER) { auto group = quic::decodeQuicInteger(cursor, length); if (!group) { return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW); } length -= group->second; objectHeader.group = group->first; - objectHeader.forwardPreference = ForwardPreference::Track; - } else { - objectHeader.forwardPreference = ForwardPreference::Subgroup; - } - if (streamType == StreamType::FETCH_HEADER) { - objectHeader.forwardPreference = ForwardPreference::Fetch; auto subgroup = quic::decodeQuicInteger(cursor, length); if (!subgroup) { return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW); @@ -1166,39 +1147,35 @@ WriteResult writeServerSetup( return size; } -WriteResult writeStreamHeader( +WriteResult writeSubgroupHeader( folly::IOBufQueue& writeBuf, const ObjectHeader& objectHeader) noexcept { size_t size = 0; bool error = false; - - if (objectHeader.forwardPreference == ForwardPreference::Track) { - writeVarint( - writeBuf, - folly::to_underlying(StreamType::STREAM_HEADER_TRACK), - size, - error); - } else if (objectHeader.forwardPreference == ForwardPreference::Subgroup) { - writeVarint( - writeBuf, - folly::to_underlying(StreamType::STREAM_HEADER_SUBGROUP), - size, - error); - } else if (objectHeader.forwardPreference == ForwardPreference::Fetch) { - writeVarint( - writeBuf, folly::to_underlying(StreamType::FETCH_HEADER), size, error); - } else { - LOG(FATAL) << "Unsupported forward preference to stream header"; - } + writeVarint( + writeBuf, + folly::to_underlying(StreamType::STREAM_HEADER_SUBGROUP), + size, + error); writeVarint(writeBuf, value(objectHeader.trackIdentifier), size, error); - if (objectHeader.forwardPreference == ForwardPreference::Subgroup) { - writeVarint(writeBuf, objectHeader.group, size, error); - writeVarint(writeBuf, objectHeader.subgroup, size, error); - } - if (objectHeader.forwardPreference != ForwardPreference::Fetch) { - writeBuf.append(&objectHeader.priority, 1); - size += 1; + writeVarint(writeBuf, objectHeader.group, size, error); + writeVarint(writeBuf, objectHeader.subgroup, size, error); + writeBuf.append(&objectHeader.priority, 1); + size += 1; + if (error) { + return folly::makeUnexpected(quic::TransportErrorCode::INTERNAL_ERROR); } + return size; +} + +WriteResult writeFetchHeader( + folly::IOBufQueue& writeBuf, + SubscribeID subscribeID) noexcept { + size_t size = 0; + bool error = false; + writeVarint( + writeBuf, folly::to_underlying(StreamType::FETCH_HEADER), size, error); + writeVarint(writeBuf, subscribeID.value, size, error); if (error) { return folly::makeUnexpected(quic::TransportErrorCode::INTERNAL_ERROR); } @@ -1209,10 +1186,13 @@ WriteResult writeSingleObjectStream( folly::IOBufQueue& writeBuf, const ObjectHeader& objectHeader, std::unique_ptr objectPayload) noexcept { - CHECK(objectHeader.forwardPreference != ForwardPreference::Datagram); - auto res = writeStreamHeader(writeBuf, objectHeader); + auto res = writeSubgroupHeader(writeBuf, objectHeader); if (res) { - return writeObject(writeBuf, objectHeader, std::move(objectPayload)); + return writeObject( + writeBuf, + StreamType::STREAM_HEADER_SUBGROUP, + objectHeader, + std::move(objectPayload)); } else { return res; } @@ -1220,27 +1200,23 @@ WriteResult writeSingleObjectStream( WriteResult writeObject( folly::IOBufQueue& writeBuf, + StreamType streamType, const ObjectHeader& objectHeader, std::unique_ptr objectPayload) noexcept { size_t size = 0; bool error = false; - if (objectHeader.forwardPreference == ForwardPreference::Datagram) { - writeVarint( - writeBuf, - folly::to_underlying(StreamType::OBJECT_DATAGRAM), - size, - error); + if (streamType == StreamType::OBJECT_DATAGRAM) { + writeVarint(writeBuf, folly::to_underlying(streamType), size, error); writeVarint(writeBuf, value(objectHeader.trackIdentifier), size, error); } - if (objectHeader.forwardPreference != ForwardPreference::Subgroup) { + if (streamType != StreamType::STREAM_HEADER_SUBGROUP) { writeVarint(writeBuf, objectHeader.group, size, error); } - if (objectHeader.forwardPreference == ForwardPreference::Fetch) { + if (streamType == StreamType::FETCH_HEADER) { writeVarint(writeBuf, objectHeader.subgroup, size, error); } writeVarint(writeBuf, objectHeader.id, size, error); - if (objectHeader.forwardPreference == ForwardPreference::Datagram || - objectHeader.forwardPreference == ForwardPreference::Fetch) { + if (streamType != StreamType::STREAM_HEADER_SUBGROUP) { writeBuf.append(&objectHeader.priority, 1); size += 1; } @@ -1730,12 +1706,10 @@ const char* getStreamTypeString(StreamType type) { switch (type) { case StreamType::OBJECT_DATAGRAM: return "OBJECT_DATAGRAM"; - case StreamType::STREAM_HEADER_TRACK: - return "STREAM_HEADER_TRACK"; case StreamType::STREAM_HEADER_SUBGROUP: return "STREAM_HEADER_SUBGROUP"; - case StreamType::CONTROL: - return "CONTROL"; + case StreamType::FETCH_HEADER: + return "FETCH_HEADER"; default: // can happen when type was cast from uint8_t return "Unknown"; @@ -1796,7 +1770,6 @@ std::ostream& operator<<(std::ostream& os, const ObjectHeader& header) { os << " trackIdentifier=" << value(header.trackIdentifier) << " group=" << header.group << " subgroup=" << header.subgroup << " id=" << header.id << " priority=" << uint32_t(header.priority) - << " forwardPreference=" << uint32_t(header.forwardPreference) << " status=" << getObjectStatusString(header.status) << " length=" << (header.length.hasValue() ? std::to_string(header.length.value()) : "none"); diff --git a/moxygen/MoQFramer.h b/moxygen/MoQFramer.h index 32e20742..0da944b0 100644 --- a/moxygen/MoQFramer.h +++ b/moxygen/MoQFramer.h @@ -122,10 +122,8 @@ enum class FrameType : uint64_t { enum class StreamType : uint64_t { OBJECT_DATAGRAM = 1, - STREAM_HEADER_TRACK = 0x2, STREAM_HEADER_SUBGROUP = 0x4, FETCH_HEADER = 0x5, - CONTROL = 100000000 }; std::ostream& operator<<(std::ostream& os, FrameType type); @@ -181,8 +179,6 @@ folly::Expected parseServerSetup( folly::io::Cursor& cursor, size_t length) noexcept; -enum class ForwardPreference : uint8_t { Track, Subgroup, Datagram, Fetch }; - enum class ObjectStatus : uint64_t { NORMAL = 0, OBJECT_NOT_EXIST = 1, @@ -265,7 +261,6 @@ struct ObjectHeader { uint64_t subgroup{0}; // meaningless for Track and Datagram uint64_t id; uint8_t priority; - ForwardPreference forwardPreference; ObjectStatus status{ObjectStatus::NORMAL}; folly::Optional length{folly::none}; }; @@ -280,9 +275,8 @@ folly::Expected parseObjectHeader( folly::Expected parseFetchHeader( folly::io::Cursor& cursor) noexcept; -folly::Expected parseStreamHeader( - folly::io::Cursor& cursor, - StreamType streamType) noexcept; +folly::Expected parseSubgroupHeader( + folly::io::Cursor& cursor) noexcept; folly::Expected parseMultiObjectHeader( folly::io::Cursor& cursor, @@ -685,12 +679,22 @@ WriteResult writeServerSetup( folly::IOBufQueue& writeBuf, const ServerSetup& serverSetup) noexcept; +WriteResult writeSubgroupHeader( + folly::IOBufQueue& writeBuf, + const ObjectHeader& objectHeader) noexcept; + +WriteResult writeFetchHeader( + folly::IOBufQueue& writeBuf, + SubscribeID subscribeID) noexcept; + WriteResult writeStreamHeader( folly::IOBufQueue& writeBuf, + StreamType streamType, const ObjectHeader& objectHeader) noexcept; WriteResult writeObject( folly::IOBufQueue& writeBuf, + StreamType streamType, const ObjectHeader& objectHeader, std::unique_ptr objectPayload) noexcept; diff --git a/moxygen/MoQSession.cpp b/moxygen/MoQSession.cpp index dfc5c943..22610bb3 100644 --- a/moxygen/MoQSession.cpp +++ b/moxygen/MoQSession.cpp @@ -219,6 +219,7 @@ class StreamPublisherImpl : public SubgroupConsumer, MoQSession::PublisherImpl* publisher_{nullptr}; proxygen::WebTransport::StreamWriteHandle* writeHandle_{nullptr}; + StreamType streamType_; ObjectHeader header_; folly::Optional currentLengthRemaining_; folly::IOBufQueue writeBuf_{folly::IOBufQueue::cacheChainLength()}; @@ -396,7 +397,6 @@ TrackPublisherImpl::groupNotExists( subgroupID, 0, priority, - ForwardPreference::Subgroup, ObjectStatus::GROUP_NOT_EXIST, 0}, nullptr); @@ -405,7 +405,6 @@ TrackPublisherImpl::groupNotExists( folly::Expected TrackPublisherImpl::datagram( const ObjectHeader& header, Payload payload) { - XCHECK(header.forwardPreference == ForwardPreference::Datagram); auto wt = getWebTransport(); if (!wt) { XLOG(ERR) << "Trying to publish after subscribeDone"; @@ -416,13 +415,13 @@ folly::Expected TrackPublisherImpl::datagram( XCHECK(header.length); (void)writeObject( writeBuf, + StreamType::OBJECT_DATAGRAM, ObjectHeader{ trackAlias_, header.group, header.id, header.id, header.priority, - header.forwardPreference, header.status, *header.length}, std::move(payload)); @@ -487,16 +486,16 @@ StreamPublisherImpl::StreamPublisherImpl( }), publisher_(publisher), writeHandle_(writeHandle), + streamType_(StreamType::FETCH_HEADER), header_{ publisher->subscribeID(), 0, 0, std::numeric_limits::max(), 0, - ForwardPreference::Fetch, ObjectStatus::NORMAL, folly::none} { - (void)writeStreamHeader(writeBuf_, header_); + (void)writeFetchHeader(writeBuf_, publisher->subscribeID()); } StreamPublisherImpl::StreamPublisherImpl( @@ -506,11 +505,11 @@ StreamPublisherImpl::StreamPublisherImpl( uint64_t groupID, uint64_t subgroupID) : StreamPublisherImpl(publisher, writeHandle) { - writeBuf_.move(); + streamType_ = StreamType::STREAM_HEADER_SUBGROUP; header_.trackIdentifier = alias; - header_.forwardPreference = ForwardPreference::Subgroup; setGroupAndSubgroup(groupID, subgroupID); - (void)writeStreamHeader(writeBuf_, header_); + writeBuf_.move(); // clear FETCH_HEADER + (void)writeSubgroupHeader(writeBuf_, header_); } // Private methods @@ -552,7 +551,7 @@ StreamPublisherImpl::writeCurrentObject( bool finStream) { header_.id = objectID; header_.length = length; - (void)writeObject(writeBuf_, header_, std::move(payload)); + (void)writeObject(writeBuf_, streamType_, header_, std::move(payload)); return writeToStream(finStream); } diff --git a/moxygen/ObjectReceiver.h b/moxygen/ObjectReceiver.h index 38aa32e5..75e238b8 100644 --- a/moxygen/ObjectReceiver.h +++ b/moxygen/ObjectReceiver.h @@ -19,6 +19,7 @@ class ObjectReceiverCallback { class ObjectSubgroupReceiver : public SubgroupConsumer { ObjectReceiverCallback* callback_{nullptr}; + StreamType streamType_; ObjectHeader header_; folly::IOBufQueue payload_{folly::IOBufQueue::cacheChainLength()}; @@ -29,20 +30,20 @@ class ObjectSubgroupReceiver : public SubgroupConsumer { uint64_t subgroupID = 0, uint8_t priority = 0) : callback_(callback), + streamType_(StreamType::STREAM_HEADER_SUBGROUP), header_{ TrackAlias(0), groupID, subgroupID, 0, priority, - ForwardPreference::Subgroup, ObjectStatus::NORMAL, folly::none} {} void setFetchGroupAndSubgroup(uint64_t groupID, uint64_t subgroupID) { + streamType_ = StreamType::FETCH_HEADER; header_.group = groupID; header_.subgroup = subgroupID; - header_.forwardPreference = ForwardPreference::Fetch; } folly::Expected @@ -51,7 +52,7 @@ class ObjectSubgroupReceiver : public SubgroupConsumer { header_.status = ObjectStatus::NORMAL; auto fcState = callback_->onObject(header_, std::move(payload)); if (fcState == ObjectReceiverCallback::FlowControlState::BLOCKED) { - if (header_.forwardPreference == ForwardPreference::Fetch) { + if (streamType_ == StreamType::FETCH_HEADER) { return folly::makeUnexpected(MoQPublishError(MoQPublishError::BLOCKED)); } else { XLOG(WARN) << "ObjectReceiverCallback returned BLOCKED for Subscribe"; @@ -165,7 +166,6 @@ class ObjectReceiver : public TrackConsumer, public FetchConsumer { subgroup, 0, pri, - ForwardPreference::Subgroup, ObjectStatus::END_OF_TRACK_AND_GROUP, 0}); return folly::unit; diff --git a/moxygen/samples/chat/MoQChatClient.cpp b/moxygen/samples/chat/MoQChatClient.cpp index a4afb240..4e4e6a72 100644 --- a/moxygen/samples/chat/MoQChatClient.cpp +++ b/moxygen/samples/chat/MoQChatClient.cpp @@ -167,7 +167,6 @@ void MoQChatClient::publishLoop() { /*subgroup=*/0, /*id=*/0, /*pri=*/0, - ForwardPreference::Subgroup, ObjectStatus::NORMAL}, folly::IOBuf::copyBuffer(input)); } diff --git a/moxygen/samples/date/MoQDateServer.cpp b/moxygen/samples/date/MoQDateServer.cpp index 210ab958..adce96c7 100644 --- a/moxygen/samples/date/MoQDateServer.cpp +++ b/moxygen/samples/date/MoQDateServer.cpp @@ -312,7 +312,6 @@ class MoQDateServer : MoQServer { subgroup, object, /*priority=*/0, - ForwardPreference::Subgroup, ObjectStatus::NORMAL, folly::none}; if (second == 0) { diff --git a/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp b/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp index c13b396f..11c9ef27 100644 --- a/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp +++ b/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp @@ -231,7 +231,6 @@ class MoQFlvStreamerClient { /*subgroup=*/0, latestAudio_.object, AUDIO_STREAM_PRIORITY, - ForwardPreference::Subgroup, ObjectStatus::NORMAL}; XLOG(DBG1) << "Sending audio frame" << objHeader diff --git a/moxygen/test/MoQCodecTest.cpp b/moxygen/test/MoQCodecTest.cpp index 3edd1459..454d0ace 100644 --- a/moxygen/test/MoQCodecTest.cpp +++ b/moxygen/test/MoQCodecTest.cpp @@ -166,14 +166,7 @@ TEST(MoQCodec, ObjectStreamPayloadFin) { folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()}; writeSingleObjectStream( writeBuf, - {TrackAlias(1), - 2, - 3, - 4, - 5, - ForwardPreference::Subgroup, - ObjectStatus::NORMAL, - 11}, + {TrackAlias(1), 2, 3, 4, 5, ObjectStatus::NORMAL, 11}, folly::IOBuf::copyBuffer("hello world")); testing::StrictMock callback; MoQObjectStreamCodec codec(&callback); @@ -189,14 +182,7 @@ TEST(MoQCodec, ObjectStreamPayload) { folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()}; writeSingleObjectStream( writeBuf, - {TrackAlias(1), - 2, - 3, - 4, - 5, - ForwardPreference::Subgroup, - ObjectStatus::NORMAL, - 11}, + {TrackAlias(1), 2, 3, 4, 5, ObjectStatus::NORMAL, 11}, folly::IOBuf::copyBuffer("hello world")); testing::NiceMock callback; MoQObjectStreamCodec codec(&callback); @@ -214,14 +200,7 @@ TEST(MoQCodec, EmptyObjectPayload) { folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()}; writeSingleObjectStream( writeBuf, - {TrackAlias(1), - 2, - 3, - 4, - 5, - ForwardPreference::Subgroup, - ObjectStatus::OBJECT_NOT_EXIST, - folly::none}, + {TrackAlias(1), 2, 3, 4, 5, ObjectStatus::OBJECT_NOT_EXIST, folly::none}, nullptr); testing::NiceMock callback; MoQObjectStreamCodec codec(&callback); @@ -238,7 +217,7 @@ TEST(MoQCodec, EmptyObjectPayload) { TEST(MoQCodec, TruncatedObject) { folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()}; - auto res = writeStreamHeader( + auto res = writeSubgroupHeader( writeBuf, ObjectHeader({ TrackAlias(1), @@ -246,21 +225,13 @@ TEST(MoQCodec, TruncatedObject) { 3, 4, 5, - ForwardPreference::Track, ObjectStatus::NORMAL, folly::none, })); res = writeObject( writeBuf, - ObjectHeader( - {TrackAlias(1), - 2, - 3, - 4, - 5, - ForwardPreference::Track, - ObjectStatus::NORMAL, - 11}), + StreamType::STREAM_HEADER_SUBGROUP, + ObjectHeader({TrackAlias(1), 2, 3, 4, 5, ObjectStatus::NORMAL, 11}), folly::IOBuf::copyBuffer("hello")); // missing " world" testing::NiceMock callback; MoQObjectStreamCodec codec(&callback); @@ -274,7 +245,7 @@ TEST(MoQCodec, TruncatedObject) { TEST(MoQCodec, TruncatedObjectPayload) { folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()}; - auto res = writeStreamHeader( + auto res = writeSubgroupHeader( writeBuf, ObjectHeader({ TrackAlias(1), @@ -282,21 +253,13 @@ TEST(MoQCodec, TruncatedObjectPayload) { 3, 4, 5, - ForwardPreference::Subgroup, ObjectStatus::NORMAL, folly::none, })); res = writeObject( writeBuf, - ObjectHeader( - {TrackAlias(1), - 2, - 3, - 4, - 5, - ForwardPreference::Subgroup, - ObjectStatus::NORMAL, - 11}), + StreamType::STREAM_HEADER_SUBGROUP, + ObjectHeader({TrackAlias(1), 2, 3, 4, 5, ObjectStatus::NORMAL, 11}), nullptr); testing::NiceMock callback; MoQObjectStreamCodec codec(&callback); @@ -338,27 +301,29 @@ TEST(MoQCodec, Fetch) { testing::StrictMock callback; MoQObjectStreamCodec codec(&callback); folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()}; + SubscribeID subscribeId(1); ObjectHeader obj{ - SubscribeID(1), + subscribeId, 2, 3, 4, 5, - ForwardPreference::Fetch, ObjectStatus::NORMAL, folly::none, }; - auto res = writeStreamHeader(writeBuf, ObjectHeader(obj)); + StreamType streamType = StreamType::FETCH_HEADER; + auto res = writeFetchHeader(writeBuf, subscribeId); obj.length = 5; - res = writeObject(writeBuf, obj, folly::IOBuf::copyBuffer("hello")); + res = + writeObject(writeBuf, streamType, obj, folly::IOBuf::copyBuffer("hello")); obj.id++; obj.status = ObjectStatus::END_OF_TRACK_AND_GROUP; obj.length = 0; - res = writeObject(writeBuf, obj, nullptr); + res = writeObject(writeBuf, streamType, obj, nullptr); obj.id++; obj.status = ObjectStatus::GROUP_NOT_EXIST; obj.length = 0; - res = writeObject(writeBuf, obj, nullptr); + res = writeObject(writeBuf, streamType, obj, nullptr); EXPECT_CALL(callback, onFetchHeader(testing::_)); EXPECT_CALL(callback, onObjectBegin(2, 3, 4, 5, testing::_, true, false)); @@ -373,17 +338,17 @@ TEST(MoQCodec, FetchHeaderUnderflow) { testing::StrictMock callback; MoQObjectStreamCodec codec(&callback); folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()}; + SubscribeID subscribeId(0xffffffffffffff); ObjectHeader obj{ - SubscribeID(0xffffffffffffff), + subscribeId, 2, 3, 4, 5, - ForwardPreference::Fetch, ObjectStatus::NORMAL, folly::none, }; - writeStreamHeader(writeBuf, ObjectHeader(obj)); + writeFetchHeader(writeBuf, subscribeId); // only deliver first byte of fetch header EXPECT_CALL(callback, onConnectionError(ErrorCode::PARSE_UNDERFLOW)); codec.onIngress(writeBuf.splitAtMost(2), true); diff --git a/moxygen/test/MoQFramerTest.cpp b/moxygen/test/MoQFramerTest.cpp index 2a8894f0..3ade17fb 100644 --- a/moxygen/test/MoQFramerTest.cpp +++ b/moxygen/test/MoQFramerTest.cpp @@ -151,11 +151,11 @@ void parseAll(folly::io::Cursor& cursor, bool eom) { auto r19 = parseFetchCancel(cursor, frameLength(cursor)); testUnderflowResult(r19); - auto res = parseStreamHeader(cursor, StreamType::STREAM_HEADER_TRACK); + auto res = parseSubgroupHeader(cursor); testUnderflowResult(res); auto r15 = parseMultiObjectHeader( - cursor, StreamType::STREAM_HEADER_TRACK, res.value()); + cursor, StreamType::STREAM_HEADER_SUBGROUP, res.value()); testUnderflowResult(r15); skip(cursor, 1); } @@ -172,12 +172,12 @@ TEST(SerializeAndParse, ParseObjectHeader) { folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()}; auto result = writeObject( writeBuf, + StreamType::OBJECT_DATAGRAM, {TrackAlias(22), // trackAlias 33, // group 0, // subgroup 44, // id 55, // priority - ForwardPreference::Datagram, ObjectStatus::OBJECT_NOT_EXIST, 0}, nullptr); @@ -232,31 +232,36 @@ TEST(SerializeAndParse, ParseStreamHeader) { 0, // subgroup 44, // id 55, // priority - ForwardPreference::Track, ObjectStatus::NORMAL, 4}; folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()}; - auto result = writeStreamHeader(writeBuf, expectedObjectHeader); + auto result = writeSubgroupHeader(writeBuf, expectedObjectHeader); EXPECT_TRUE(result.hasValue()); result = writeObject( - writeBuf, expectedObjectHeader, folly::IOBuf::copyBuffer("EFGH")); + writeBuf, + StreamType::STREAM_HEADER_SUBGROUP, + expectedObjectHeader, + folly::IOBuf::copyBuffer("EFGH")); EXPECT_TRUE(result.hasValue()); // Test ObjectStatus::OBJECT_NOT_EXIST expectedObjectHeader.status = ObjectStatus::OBJECT_NOT_EXIST; expectedObjectHeader.length = 0; - result = writeObject(writeBuf, expectedObjectHeader, nullptr); + result = writeObject( + writeBuf, + StreamType::STREAM_HEADER_SUBGROUP, + expectedObjectHeader, + nullptr); EXPECT_TRUE(result.hasValue()); auto serialized = writeBuf.move(); folly::io::Cursor cursor(serialized.get()); - EXPECT_EQ(parseStreamType(cursor), StreamType::STREAM_HEADER_TRACK); - auto parseStreamHeaderResult = - parseStreamHeader(cursor, StreamType::STREAM_HEADER_TRACK); + EXPECT_EQ(parseStreamType(cursor), StreamType::STREAM_HEADER_SUBGROUP); + auto parseStreamHeaderResult = parseSubgroupHeader(cursor); EXPECT_TRUE(parseStreamHeaderResult.hasValue()); auto parseResult = parseMultiObjectHeader( - cursor, StreamType::STREAM_HEADER_TRACK, *parseStreamHeaderResult); + cursor, StreamType::STREAM_HEADER_SUBGROUP, *parseStreamHeaderResult); EXPECT_TRUE(parseResult.hasValue()); EXPECT_EQ(std::get(parseResult->trackIdentifier), TrackAlias(22)); EXPECT_EQ(parseResult->group, 33); @@ -267,7 +272,7 @@ TEST(SerializeAndParse, ParseStreamHeader) { cursor.skip(*parseResult->length); parseResult = parseMultiObjectHeader( - cursor, StreamType::STREAM_HEADER_TRACK, *parseStreamHeaderResult); + cursor, StreamType::STREAM_HEADER_SUBGROUP, *parseStreamHeaderResult); EXPECT_TRUE(parseResult.hasValue()); EXPECT_EQ(std::get(parseResult->trackIdentifier), TrackAlias(22)); EXPECT_EQ(parseResult->group, 33); diff --git a/moxygen/test/TestUtils.cpp b/moxygen/test/TestUtils.cpp index bc16132b..878a37f7 100644 --- a/moxygen/test/TestUtils.cpp +++ b/moxygen/test/TestUtils.cpp @@ -192,7 +192,7 @@ std::unique_ptr writeAllControlMessages(TestControlMessages in) { std::unique_ptr writeAllObjectMessages() { folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()}; - auto res = writeStreamHeader( + auto res = writeSubgroupHeader( writeBuf, ObjectHeader({ TrackAlias(1), @@ -200,33 +200,19 @@ std::unique_ptr writeAllObjectMessages() { 3, 4, 5, - ForwardPreference::Track, ObjectStatus::NORMAL, folly::none, })); res = writeObject( writeBuf, - ObjectHeader( - {TrackAlias(1), - 2, - 3, - 4, - 5, - ForwardPreference::Track, - ObjectStatus::NORMAL, - 11}), + StreamType::STREAM_HEADER_SUBGROUP, + ObjectHeader({TrackAlias(1), 2, 3, 4, 5, ObjectStatus::NORMAL, 11}), folly::IOBuf::copyBuffer("hello world")); res = writeObject( writeBuf, + StreamType::STREAM_HEADER_SUBGROUP, ObjectHeader( - {TrackAlias(1), - 2, - 3, - 4, - 5, - ForwardPreference::Track, - ObjectStatus::END_OF_TRACK_AND_GROUP, - 0}), + {TrackAlias(1), 2, 3, 4, 5, ObjectStatus::END_OF_TRACK_AND_GROUP, 0}), nullptr); return writeBuf.move(); }