Skip to content

Commit

Permalink
Remove ForwardPreference from ObjectHeader
Browse files Browse the repository at this point in the history
Summary: This is a vestige of the old API and draft versions.  draft-07 still defines datagram vs subgroup preferences as an immutable, publisher selected property.  However, there isn't necessarily need to cache this, because cached objects are retrieved via FETCH, on a single stream, which discards the preference.  Forwarding the preferrnce in moxygen is a matter of using the correct API (datagram() vs objectStream() or beginSubgroup()).

Reviewed By: NEUDitao

Differential Revision: D67906300

fbshipit-source-id: 06a37f8bbce43b78971059b488e30d4f50db7bea
  • Loading branch information
afrind authored and facebook-github-bot committed Jan 8, 2025
1 parent e544bc8 commit 5195b9b
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 190 deletions.
3 changes: 1 addition & 2 deletions moxygen/MoQCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ void MoQObjectStreamCodec::onIngress(
cursor = newCursor;
streamType_ = StreamType(type->first);
switch (streamType_) {
case StreamType::STREAM_HEADER_TRACK:
case StreamType::STREAM_HEADER_SUBGROUP:
parseState_ = ParseState::OBJECT_STREAM;
break;
Expand Down Expand Up @@ -159,7 +158,7 @@ void MoQObjectStreamCodec::onIngress(
}
case ParseState::OBJECT_STREAM: {
auto newCursor = cursor;
auto res = parseStreamHeader(newCursor, streamType_);
auto res = parseSubgroupHeader(newCursor);
if (res.hasError()) {
XLOG(DBG6) << __func__ << " " << uint32_t(res.error());
connError_ = res.error();
Expand Down
129 changes: 51 additions & 78 deletions moxygen/MoQFramer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,16 +243,11 @@ folly::Expected<ObjectHeader, ErrorCode> parseObjectHeader(
} else {
objectHeader.status = ObjectStatus::NORMAL;
}
objectHeader.forwardPreference = ForwardPreference::Datagram;
return objectHeader;
}

folly::Expected<ObjectHeader, ErrorCode> parseStreamHeader(
folly::io::Cursor& cursor,
StreamType streamType) noexcept {
DCHECK(
streamType == StreamType::STREAM_HEADER_TRACK ||
streamType == StreamType::STREAM_HEADER_SUBGROUP);
folly::Expected<ObjectHeader, ErrorCode> parseSubgroupHeader(
folly::io::Cursor& cursor) noexcept {
auto length = cursor.totalLength();
ObjectHeader objectHeader;
objectHeader.group = std::numeric_limits<uint64_t>::max(); // unset
Expand All @@ -263,24 +258,18 @@ folly::Expected<ObjectHeader, ErrorCode> parseStreamHeader(
}
length -= trackAlias->second;
objectHeader.trackIdentifier = TrackAlias(trackAlias->first);
if (streamType == StreamType::STREAM_HEADER_SUBGROUP) {
auto group = quic::decodeQuicInteger(cursor, length);
if (!group) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
length -= group->second;
objectHeader.group = group->first;
auto subgroup = quic::decodeQuicInteger(cursor, length);
if (!subgroup) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
objectHeader.subgroup = subgroup->first;
length -= subgroup->second;
objectHeader.forwardPreference = ForwardPreference::Subgroup;
} else {
objectHeader.subgroup = std::numeric_limits<uint64_t>::max();
objectHeader.forwardPreference = ForwardPreference::Track;
auto group = quic::decodeQuicInteger(cursor, length);
if (!group) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
length -= group->second;
objectHeader.group = group->first;
auto subgroup = quic::decodeQuicInteger(cursor, length);
if (!subgroup) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
objectHeader.subgroup = subgroup->first;
length -= subgroup->second;
if (cursor.canAdvance(1)) {
objectHeader.priority = cursor.readBE<uint8_t>();
length -= 1;
Expand All @@ -295,26 +284,18 @@ folly::Expected<ObjectHeader, ErrorCode> parseMultiObjectHeader(
StreamType streamType,
const ObjectHeader& headerTemplate) noexcept {
DCHECK(
streamType == StreamType::STREAM_HEADER_TRACK ||
streamType == StreamType::STREAM_HEADER_SUBGROUP ||
streamType == StreamType::FETCH_HEADER);
// TODO get rid of this
auto length = cursor.totalLength();
ObjectHeader objectHeader = headerTemplate;
if (streamType == StreamType::STREAM_HEADER_TRACK ||
streamType == StreamType::FETCH_HEADER) {
if (streamType == StreamType::FETCH_HEADER) {
auto group = quic::decodeQuicInteger(cursor, length);
if (!group) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
length -= group->second;
objectHeader.group = group->first;
objectHeader.forwardPreference = ForwardPreference::Track;
} else {
objectHeader.forwardPreference = ForwardPreference::Subgroup;
}
if (streamType == StreamType::FETCH_HEADER) {
objectHeader.forwardPreference = ForwardPreference::Fetch;
auto subgroup = quic::decodeQuicInteger(cursor, length);
if (!subgroup) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
Expand Down Expand Up @@ -1166,39 +1147,35 @@ WriteResult writeServerSetup(
return size;
}

WriteResult writeStreamHeader(
WriteResult writeSubgroupHeader(
folly::IOBufQueue& writeBuf,
const ObjectHeader& objectHeader) noexcept {
size_t size = 0;
bool error = false;

if (objectHeader.forwardPreference == ForwardPreference::Track) {
writeVarint(
writeBuf,
folly::to_underlying(StreamType::STREAM_HEADER_TRACK),
size,
error);
} else if (objectHeader.forwardPreference == ForwardPreference::Subgroup) {
writeVarint(
writeBuf,
folly::to_underlying(StreamType::STREAM_HEADER_SUBGROUP),
size,
error);
} else if (objectHeader.forwardPreference == ForwardPreference::Fetch) {
writeVarint(
writeBuf, folly::to_underlying(StreamType::FETCH_HEADER), size, error);
} else {
LOG(FATAL) << "Unsupported forward preference to stream header";
}
writeVarint(
writeBuf,
folly::to_underlying(StreamType::STREAM_HEADER_SUBGROUP),
size,
error);
writeVarint(writeBuf, value(objectHeader.trackIdentifier), size, error);
if (objectHeader.forwardPreference == ForwardPreference::Subgroup) {
writeVarint(writeBuf, objectHeader.group, size, error);
writeVarint(writeBuf, objectHeader.subgroup, size, error);
}
if (objectHeader.forwardPreference != ForwardPreference::Fetch) {
writeBuf.append(&objectHeader.priority, 1);
size += 1;
writeVarint(writeBuf, objectHeader.group, size, error);
writeVarint(writeBuf, objectHeader.subgroup, size, error);
writeBuf.append(&objectHeader.priority, 1);
size += 1;
if (error) {
return folly::makeUnexpected(quic::TransportErrorCode::INTERNAL_ERROR);
}
return size;
}

WriteResult writeFetchHeader(
folly::IOBufQueue& writeBuf,
SubscribeID subscribeID) noexcept {
size_t size = 0;
bool error = false;
writeVarint(
writeBuf, folly::to_underlying(StreamType::FETCH_HEADER), size, error);
writeVarint(writeBuf, subscribeID.value, size, error);
if (error) {
return folly::makeUnexpected(quic::TransportErrorCode::INTERNAL_ERROR);
}
Expand All @@ -1209,38 +1186,37 @@ WriteResult writeSingleObjectStream(
folly::IOBufQueue& writeBuf,
const ObjectHeader& objectHeader,
std::unique_ptr<folly::IOBuf> objectPayload) noexcept {
CHECK(objectHeader.forwardPreference != ForwardPreference::Datagram);
auto res = writeStreamHeader(writeBuf, objectHeader);
auto res = writeSubgroupHeader(writeBuf, objectHeader);
if (res) {
return writeObject(writeBuf, objectHeader, std::move(objectPayload));
return writeObject(
writeBuf,
StreamType::STREAM_HEADER_SUBGROUP,
objectHeader,
std::move(objectPayload));
} else {
return res;
}
}

WriteResult writeObject(
folly::IOBufQueue& writeBuf,
StreamType streamType,
const ObjectHeader& objectHeader,
std::unique_ptr<folly::IOBuf> objectPayload) noexcept {
size_t size = 0;
bool error = false;
if (objectHeader.forwardPreference == ForwardPreference::Datagram) {
writeVarint(
writeBuf,
folly::to_underlying(StreamType::OBJECT_DATAGRAM),
size,
error);
if (streamType == StreamType::OBJECT_DATAGRAM) {
writeVarint(writeBuf, folly::to_underlying(streamType), size, error);
writeVarint(writeBuf, value(objectHeader.trackIdentifier), size, error);
}
if (objectHeader.forwardPreference != ForwardPreference::Subgroup) {
if (streamType != StreamType::STREAM_HEADER_SUBGROUP) {
writeVarint(writeBuf, objectHeader.group, size, error);
}
if (objectHeader.forwardPreference == ForwardPreference::Fetch) {
if (streamType == StreamType::FETCH_HEADER) {
writeVarint(writeBuf, objectHeader.subgroup, size, error);
}
writeVarint(writeBuf, objectHeader.id, size, error);
if (objectHeader.forwardPreference == ForwardPreference::Datagram ||
objectHeader.forwardPreference == ForwardPreference::Fetch) {
if (streamType != StreamType::STREAM_HEADER_SUBGROUP) {
writeBuf.append(&objectHeader.priority, 1);
size += 1;
}
Expand Down Expand Up @@ -1730,12 +1706,10 @@ const char* getStreamTypeString(StreamType type) {
switch (type) {
case StreamType::OBJECT_DATAGRAM:
return "OBJECT_DATAGRAM";
case StreamType::STREAM_HEADER_TRACK:
return "STREAM_HEADER_TRACK";
case StreamType::STREAM_HEADER_SUBGROUP:
return "STREAM_HEADER_SUBGROUP";
case StreamType::CONTROL:
return "CONTROL";
case StreamType::FETCH_HEADER:
return "FETCH_HEADER";
default:
// can happen when type was cast from uint8_t
return "Unknown";
Expand Down Expand Up @@ -1796,7 +1770,6 @@ std::ostream& operator<<(std::ostream& os, const ObjectHeader& header) {
os << " trackIdentifier=" << value(header.trackIdentifier)
<< " group=" << header.group << " subgroup=" << header.subgroup
<< " id=" << header.id << " priority=" << uint32_t(header.priority)
<< " forwardPreference=" << uint32_t(header.forwardPreference)
<< " status=" << getObjectStatusString(header.status) << " length="
<< (header.length.hasValue() ? std::to_string(header.length.value())
: "none");
Expand Down
20 changes: 12 additions & 8 deletions moxygen/MoQFramer.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,8 @@ enum class FrameType : uint64_t {

enum class StreamType : uint64_t {
OBJECT_DATAGRAM = 1,
STREAM_HEADER_TRACK = 0x2,
STREAM_HEADER_SUBGROUP = 0x4,
FETCH_HEADER = 0x5,
CONTROL = 100000000
};

std::ostream& operator<<(std::ostream& os, FrameType type);
Expand Down Expand Up @@ -181,8 +179,6 @@ folly::Expected<ServerSetup, ErrorCode> parseServerSetup(
folly::io::Cursor& cursor,
size_t length) noexcept;

enum class ForwardPreference : uint8_t { Track, Subgroup, Datagram, Fetch };

enum class ObjectStatus : uint64_t {
NORMAL = 0,
OBJECT_NOT_EXIST = 1,
Expand Down Expand Up @@ -265,7 +261,6 @@ struct ObjectHeader {
uint64_t subgroup{0}; // meaningless for Track and Datagram
uint64_t id;
uint8_t priority;
ForwardPreference forwardPreference;
ObjectStatus status{ObjectStatus::NORMAL};
folly::Optional<uint64_t> length{folly::none};
};
Expand All @@ -280,9 +275,8 @@ folly::Expected<ObjectHeader, ErrorCode> parseObjectHeader(
folly::Expected<uint64_t, ErrorCode> parseFetchHeader(
folly::io::Cursor& cursor) noexcept;

folly::Expected<ObjectHeader, ErrorCode> parseStreamHeader(
folly::io::Cursor& cursor,
StreamType streamType) noexcept;
folly::Expected<ObjectHeader, ErrorCode> parseSubgroupHeader(
folly::io::Cursor& cursor) noexcept;

folly::Expected<ObjectHeader, ErrorCode> parseMultiObjectHeader(
folly::io::Cursor& cursor,
Expand Down Expand Up @@ -685,12 +679,22 @@ WriteResult writeServerSetup(
folly::IOBufQueue& writeBuf,
const ServerSetup& serverSetup) noexcept;

WriteResult writeSubgroupHeader(
folly::IOBufQueue& writeBuf,
const ObjectHeader& objectHeader) noexcept;

WriteResult writeFetchHeader(
folly::IOBufQueue& writeBuf,
SubscribeID subscribeID) noexcept;

WriteResult writeStreamHeader(
folly::IOBufQueue& writeBuf,
StreamType streamType,
const ObjectHeader& objectHeader) noexcept;

WriteResult writeObject(
folly::IOBufQueue& writeBuf,
StreamType streamType,
const ObjectHeader& objectHeader,
std::unique_ptr<folly::IOBuf> objectPayload) noexcept;

Expand Down
17 changes: 8 additions & 9 deletions moxygen/MoQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ class StreamPublisherImpl : public SubgroupConsumer,

MoQSession::PublisherImpl* publisher_{nullptr};
proxygen::WebTransport::StreamWriteHandle* writeHandle_{nullptr};
StreamType streamType_;
ObjectHeader header_;
folly::Optional<uint64_t> currentLengthRemaining_;
folly::IOBufQueue writeBuf_{folly::IOBufQueue::cacheChainLength()};
Expand Down Expand Up @@ -396,7 +397,6 @@ TrackPublisherImpl::groupNotExists(
subgroupID,
0,
priority,
ForwardPreference::Subgroup,
ObjectStatus::GROUP_NOT_EXIST,
0},
nullptr);
Expand All @@ -405,7 +405,6 @@ TrackPublisherImpl::groupNotExists(
folly::Expected<folly::Unit, MoQPublishError> TrackPublisherImpl::datagram(
const ObjectHeader& header,
Payload payload) {
XCHECK(header.forwardPreference == ForwardPreference::Datagram);
auto wt = getWebTransport();
if (!wt) {
XLOG(ERR) << "Trying to publish after subscribeDone";
Expand All @@ -416,13 +415,13 @@ folly::Expected<folly::Unit, MoQPublishError> TrackPublisherImpl::datagram(
XCHECK(header.length);
(void)writeObject(
writeBuf,
StreamType::OBJECT_DATAGRAM,
ObjectHeader{
trackAlias_,
header.group,
header.id,
header.id,
header.priority,
header.forwardPreference,
header.status,
*header.length},
std::move(payload));
Expand Down Expand Up @@ -487,16 +486,16 @@ StreamPublisherImpl::StreamPublisherImpl(
}),
publisher_(publisher),
writeHandle_(writeHandle),
streamType_(StreamType::FETCH_HEADER),
header_{
publisher->subscribeID(),
0,
0,
std::numeric_limits<uint64_t>::max(),
0,
ForwardPreference::Fetch,
ObjectStatus::NORMAL,
folly::none} {
(void)writeStreamHeader(writeBuf_, header_);
(void)writeFetchHeader(writeBuf_, publisher->subscribeID());
}

StreamPublisherImpl::StreamPublisherImpl(
Expand All @@ -506,11 +505,11 @@ StreamPublisherImpl::StreamPublisherImpl(
uint64_t groupID,
uint64_t subgroupID)
: StreamPublisherImpl(publisher, writeHandle) {
writeBuf_.move();
streamType_ = StreamType::STREAM_HEADER_SUBGROUP;
header_.trackIdentifier = alias;
header_.forwardPreference = ForwardPreference::Subgroup;
setGroupAndSubgroup(groupID, subgroupID);
(void)writeStreamHeader(writeBuf_, header_);
writeBuf_.move(); // clear FETCH_HEADER
(void)writeSubgroupHeader(writeBuf_, header_);
}

// Private methods
Expand Down Expand Up @@ -552,7 +551,7 @@ StreamPublisherImpl::writeCurrentObject(
bool finStream) {
header_.id = objectID;
header_.length = length;
(void)writeObject(writeBuf_, header_, std::move(payload));
(void)writeObject(writeBuf_, streamType_, header_, std::move(payload));
return writeToStream(finStream);
}

Expand Down
Loading

0 comments on commit 5195b9b

Please sign in to comment.