Skip to content

Commit

Permalink
Implement fetch using Publisher
Browse files Browse the repository at this point in the history
Summary: This one is the most straightforward

Reviewed By: sharmafb

Differential Revision: D68139006
  • Loading branch information
afrind authored and facebook-github-bot committed Jan 18, 2025
1 parent 4845a27 commit 5597175
Show file tree
Hide file tree
Showing 8 changed files with 317 additions and 265 deletions.
4 changes: 0 additions & 4 deletions moxygen/MoQServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ folly::Try<ServerSetup> MoQServer::onClientSetup(ClientSetup /*setup*/) {
}

// TODO: Implement message handling
void MoQServer::ControlVisitor::operator()(Fetch fetch) const {
XLOG(INFO) << "Fetch id=" << fetch.subscribeID;
}

void MoQServer::ControlVisitor::operator()(Announce announce) const {
XLOG(INFO) << "Announce ns=" << announce.trackNamespace;
clientSession_->announceError(
Expand Down
1 change: 0 additions & 1 deletion moxygen/MoQServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class MoQServer : public MoQSession::ServerSetupCallback {
~ControlVisitor() override = default;

void operator()(Announce announce) const override;
void operator()(Fetch fetch) const override;
void operator()(Unannounce unannounce) const override;
void operator()(AnnounceCancel announceCancel) const override;
void operator()(Goaway goaway) const override;
Expand Down
232 changes: 138 additions & 94 deletions moxygen/MoQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,13 @@ uint64_t getStreamPriority(
// or a Fetch response. It's of course illegal to mix-and-match the APIs, but
// the object is only handed to the application as either a SubgroupConsumer
// or a FetchConsumer
class StreamPublisherImpl : public SubgroupConsumer,
public FetchConsumer,
public folly::CancellationCallback {
class StreamPublisherImpl : public SubgroupConsumer, public FetchConsumer {
public:
StreamPublisherImpl() = delete;

// Fetch constructor
StreamPublisherImpl(
MoQSession::PublisherImpl* publisher,
proxygen::WebTransport::StreamWriteHandle* writeHandle);
// Fetch constructor - we defer creating the stream/writeHandle until the
// first published object.
StreamPublisherImpl(MoQSession::PublisherImpl* publisher);

// Subscribe constructor
StreamPublisherImpl(
Expand Down Expand Up @@ -204,6 +201,10 @@ class StreamPublisherImpl : public SubgroupConsumer,
return true;
}

folly::Expected<folly::Unit, MoQPublishError> ensureWriteHandle();

void setWriteHandle(proxygen::WebTransport::StreamWriteHandle* writeHandle);

folly::Expected<folly::Unit, MoQPublishError> validatePublish(
uint64_t objectID);
folly::Expected<ObjectPublishStatus, MoQPublishError>
Expand All @@ -218,6 +219,7 @@ class StreamPublisherImpl : public SubgroupConsumer,
void onStreamComplete();

MoQSession::PublisherImpl* publisher_{nullptr};
folly::Optional<folly::CancellationCallback> cancelCallback_;
proxygen::WebTransport::StreamWriteHandle* writeHandle_{nullptr};
StreamType streamType_;
ObjectHeader header_;
Expand All @@ -227,22 +229,8 @@ class StreamPublisherImpl : public SubgroupConsumer,

// StreamPublisherImpl

StreamPublisherImpl::StreamPublisherImpl(
MoQSession::PublisherImpl* publisher,
proxygen::WebTransport::StreamWriteHandle* writeHandle)
: CancellationCallback(
writeHandle->getCancelToken(),
[this] {
if (writeHandle_) {
auto code = writeHandle_->stopSendingErrorCode();
XLOG(DBG1) << "Peer requested write termination code="
<< (code ? folly::to<std::string>(*code)
: std::string("none"));
reset(ResetStreamErrorCode::CANCELLED);
}
}),
publisher_(publisher),
writeHandle_(writeHandle),
StreamPublisherImpl::StreamPublisherImpl(MoQSession::PublisherImpl* publisher)
: publisher_(publisher),
streamType_(StreamType::FETCH_HEADER),
header_{
publisher->subscribeID(),
Expand All @@ -261,19 +249,40 @@ StreamPublisherImpl::StreamPublisherImpl(
TrackAlias alias,
uint64_t groupID,
uint64_t subgroupID)
: StreamPublisherImpl(publisher, writeHandle) {
: StreamPublisherImpl(publisher) {
streamType_ = StreamType::STREAM_HEADER_SUBGROUP;
header_.trackIdentifier = alias;
setWriteHandle(writeHandle);
setGroupAndSubgroup(groupID, subgroupID);
writeBuf_.move(); // clear FETCH_HEADER
(void)writeSubgroupHeader(writeBuf_, header_);
}

// Private methods

void StreamPublisherImpl::setWriteHandle(
proxygen::WebTransport::StreamWriteHandle* writeHandle) {
XCHECK(publisher_);
XCHECK(!writeHandle_);
writeHandle_ = writeHandle;
cancelCallback_.emplace(writeHandle_->getCancelToken(), [this] {
if (writeHandle_) {
auto code = writeHandle_->stopSendingErrorCode();
XLOG(DBG1) << "Peer requested write termination code="
<< (code ? folly::to<std::string>(*code)
: std::string("none"));
reset(ResetStreamErrorCode::CANCELLED);
}
});
}

void StreamPublisherImpl::onStreamComplete() {
XCHECK_EQ(writeHandle_, nullptr);
publisher_->onStreamComplete(header_);
auto publisher = publisher_;
publisher_ = nullptr;
if (publisher) {
publisher->onStreamComplete(header_);
}
}

folly::Expected<folly::Unit, MoQPublishError>
Expand All @@ -292,12 +301,7 @@ StreamPublisherImpl::validatePublish(uint64_t objectID) {
return folly::makeUnexpected(MoQPublishError(
MoQPublishError::API_ERROR, "Object ID not advancing in subgroup"));
}
if (!writeHandle_) {
XLOG(ERR) << "Write after subgroup complete sgp=" << this;
return folly::makeUnexpected(
MoQPublishError(MoQPublishError::API_ERROR, "Subgroup reset"));
}
return folly::unit;
return ensureWriteHandle();
}

folly::Expected<folly::Unit, MoQPublishError>
Expand Down Expand Up @@ -469,7 +473,7 @@ void StreamPublisherImpl::reset(ResetStreamErrorCode error) {
writeHandle_ = nullptr;
writeHandle->resetStream(uint32_t(error));
} else {
// Can happen on STOP_SENDING
// Can happen on STOP_SENDING or prior to first fetch write
XLOG(ERR) << "reset with no write handle: sgp=" << this;
}
onStreamComplete();
Expand All @@ -489,6 +493,44 @@ StreamPublisherImpl::awaitReadyToConsume() {
return std::move(writableFuture.value());
}

folly::Expected<folly::Unit, MoQPublishError>
StreamPublisherImpl::ensureWriteHandle() {
if (writeHandle_) {
return folly::unit;
}
if (!publisher_) {
return folly::makeUnexpected(MoQPublishError(
MoQPublishError::API_ERROR, "Write after stream complete"));
}
// This has to be FETCH, subscribe is created with a writeHandle_ and
// publisher_ is cleared when the stream FIN's or resets.
auto wt = publisher_->getWebTransport();
if (!wt) {
XLOG(ERR) << "Trying to publish after fetchCancel";
return folly::makeUnexpected(MoQPublishError(
MoQPublishError::API_ERROR, "Publish after fetchCancel"));
}

auto stream = wt->createUniStream();
if (!stream) {
// failed to create a stream
XLOG(ERR) << "Failed to create uni stream tp=" << this;
return folly::makeUnexpected(MoQPublishError(
MoQPublishError::BLOCKED, "Failed to create uni stream."));
}
XLOG(DBG4) << "New stream created, id: " << stream.value()->getID()
<< " tp=" << this;
// publisher group order is not known here, but it shouldn't matter
// Currently sets group=0 for FETCH priority bits
stream.value()->setPriority(
1,
getStreamPriority(
0, 0, publisher_->subPriority(), 0, GroupOrder::OldestFirst),
false);
setWriteHandle(*stream);
return folly::unit;
}

} // namespace

namespace moxygen {
Expand Down Expand Up @@ -568,10 +610,21 @@ class MoQSession::FetchPublisherImpl : public MoQSession::PublisherImpl {
SubscribeID subscribeID,
Priority subPriority,
GroupOrder groupOrder)
: PublisherImpl(session, subscribeID, subPriority, groupOrder) {}
: PublisherImpl(session, subscribeID, subPriority, groupOrder) {
streamPublisher_ = std::make_shared<StreamPublisherImpl>(this);
}

folly::Expected<std::shared_ptr<FetchConsumer>, MoQPublishError> beginFetch(
GroupOrder groupOrder);
std::shared_ptr<StreamPublisherImpl> getStreamPublisher() const {
return streamPublisher_;
}

void setFetchHandle(std::shared_ptr<Publisher::FetchHandle> handle) {
handle_ = std::move(handle);
}

std::shared_ptr<Publisher::FetchHandle> getFetchHandle() const {
return handle_;
}

void reset(ResetStreamErrorCode error) override {
if (streamPublisher_) {
Expand All @@ -585,6 +638,7 @@ class MoQSession::FetchPublisherImpl : public MoQSession::PublisherImpl {
}

private:
std::shared_ptr<Publisher::FetchHandle> handle_;
std::shared_ptr<StreamPublisherImpl> streamPublisher_;
};

Expand Down Expand Up @@ -735,34 +789,6 @@ MoQSession::TrackPublisherImpl::subscribeDone(SubscribeDone subDone) {
return PublisherImpl::subscribeDone(std::move(subDone));
}

// FetchPublisherImpl

folly::Expected<std::shared_ptr<FetchConsumer>, MoQPublishError>
MoQSession::FetchPublisherImpl::beginFetch(GroupOrder groupOrder) {
auto wt = getWebTransport();
if (!wt) {
XLOG(ERR) << "Trying to publish after fetchCancel";
return folly::makeUnexpected(MoQPublishError(
MoQPublishError::API_ERROR, "Publish after fetchCancel"));
}

auto stream = wt->createUniStream();
if (!stream) {
// failed to create a stream
XLOG(ERR) << "Failed to create uni stream tp=" << this;
return folly::makeUnexpected(MoQPublishError(
MoQPublishError::BLOCKED, "Failed to create uni stream."));
}
XLOG(DBG4) << "New stream created, id: " << stream.value()->getID()
<< " tp=" << this;
setGroupOrder(groupOrder);
// Currently sets group=0 for FETCH priority bits
stream.value()->setPriority(
1, getStreamPriority(0, 0, subPriority_, 0, groupOrder_), false);
streamPublisher_ = std::make_shared<StreamPublisherImpl>(this, *stream);
return streamPublisher_;
}

// Receive State
class MoQSession::TrackReceiveStateBase {
public:
Expand Down Expand Up @@ -1778,8 +1804,43 @@ void MoQSession::onFetch(Fetch fetch) {
}
auto fetchPublisher = std::make_shared<FetchPublisherImpl>(
this, fetch.subscribeID, fetch.priority, fetch.groupOrder);
pubTracks_.emplace(fetch.subscribeID, std::move(fetchPublisher));
controlMessages_.enqueue(std::move(fetch));
pubTracks_.emplace(fetch.subscribeID, fetchPublisher);
handleFetch(std::move(fetch), std::move(fetchPublisher))
.scheduleOn(evb_)
.start();
}

folly::coro::Task<void> MoQSession::handleFetch(
Fetch fetch,
std::shared_ptr<FetchPublisherImpl> fetchPublisher) {
folly::RequestContextScopeGuard guard;
setRequestSession();
auto subscribeID = fetch.subscribeID;
auto fetchResult = co_await co_awaitTry(co_withCancellation(
cancellationSource_.getToken(),
publishHandler_->fetch(
std::move(fetch), fetchPublisher->getStreamPublisher())));
if (fetchResult.hasException()) {
XLOG(ERR) << "Exception in Publisher callback ex="
<< fetchResult.exception().what();
fetchError(
{subscribeID, 500, fetchResult.exception().what().toStdString()});
co_return;
}
if (fetchResult->hasError()) {
XLOG(DBG1) << "Application fetch error err="
<< fetchResult->error().reasonPhrase;
auto fetchErr = std::move(fetchResult->error());
fetchErr.subscribeID = subscribeID; // In case app got it wrong
fetchError(std::move(fetchErr));
} else {
// What happens if this got cancelled
auto fetchHandle = std::move(fetchResult->value());
auto fetchOkMsg = fetchHandle->fetchOk();
fetchOkMsg.subscribeID = subscribeID;
fetchOk(std::move(fetchOkMsg));
fetchPublisher->setFetchHandle(std::move(fetchHandle));
}
}

void MoQSession::onFetchCancel(FetchCancel fetchCancel) {
Expand All @@ -1795,7 +1856,16 @@ void MoQSession::onFetchCancel(FetchCancel fetchCancel) {
} else {
// It's possible the fetch stream hasn't opened yet if the application
// hasn't made it to fetchOK.
pubTrackIt->second->reset(ResetStreamErrorCode::CANCELLED);
auto fetchPublisher =
std::dynamic_pointer_cast<FetchPublisherImpl>(pubTrackIt->second);
if (!fetchPublisher) {
XLOG(ERR) << "FETCH_CANCEL on SUBSCRIBE id=" << fetchCancel.subscribeID;
return;
}
fetchPublisher->reset(ResetStreamErrorCode::CANCELLED);
if (fetchPublisher->getFetchHandle()) {
fetchPublisher->getFetchHandle()->fetchCancel();
}
retireSubscribeId(/*signalWriteLoop=*/true);
}
}
Expand Down Expand Up @@ -2461,40 +2531,14 @@ folly::coro::Task<Publisher::FetchResult> MoQSession::fetch(
}
}

std::shared_ptr<FetchConsumer> MoQSession::fetchOk(FetchOk fetchOk) {
void MoQSession::fetchOk(FetchOk fetchOk) {
XLOG(DBG1) << __func__ << " sess=" << this;
auto it = pubTracks_.find(fetchOk.subscribeID);
if (it == pubTracks_.end()) {
XLOG(ERR) << "Invalid Fetch OK, id=" << fetchOk.subscribeID;
return nullptr;
}
auto fetchPublisher = dynamic_cast<FetchPublisherImpl*>(it->second.get());
if (!fetchPublisher) {
XLOG(ERR) << "subscribe ID maps to a subscribe, not a fetch, id="
<< fetchOk.subscribeID;
fetchError(
{fetchOk.subscribeID,
folly::to_underlying(FetchErrorCode::INTERNAL_ERROR),
""});
return nullptr;
}
auto fetchConsumer = fetchPublisher->beginFetch(fetchOk.groupOrder);
if (!fetchConsumer) {
XLOG(ERR) << "beginFetch Failed, id=" << fetchOk.subscribeID;
fetchError(
{fetchOk.subscribeID,
folly::to_underlying(FetchErrorCode::INTERNAL_ERROR),
""});
return nullptr;
}

auto res = writeFetchOk(controlWriteBuf_, fetchOk);
if (!res) {
XLOG(ERR) << "writeFetchOk failed sess=" << this;
return nullptr;
return;
}
controlWriteEvent_.signal();
return *fetchConsumer;
}

void MoQSession::fetchError(FetchError fetchErr) {
Expand Down
Loading

0 comments on commit 5597175

Please sign in to comment.