Skip to content

Commit

Permalink
Implement announce using Subscriber interface (facebookexperimental#17)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookexperimental#17

By removing the last use of the control message queue, delete all control loops and control visitors

Differential Revision: D68139012
  • Loading branch information
afrind authored and facebook-github-bot committed Jan 22, 2025
1 parent 4259a21 commit 7b9d0ca
Show file tree
Hide file tree
Showing 15 changed files with 345 additions and 434 deletions.
28 changes: 7 additions & 21 deletions moxygen/MoQServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,31 +72,17 @@ folly::Try<ServerSetup> MoQServer::onClientSetup(ClientSetup /*setup*/) {
}));
}

// TODO: Implement message handling
void MoQServer::ControlVisitor::operator()(Announce announce) const {
XLOG(INFO) << "Announce ns=" << announce.trackNamespace;
clientSession_->announceError(
{announce.trackNamespace, 500, "not implemented"});
}

void MoQServer::ControlVisitor::operator()(Unannounce unannounce) const {
XLOG(INFO) << "Unannounce ns=" << unannounce.trackNamespace;
}

void MoQServer::ControlVisitor::operator()(
AnnounceCancel announceCancel) const {
XLOG(INFO) << "AnnounceCancel ns=" << announceCancel.trackNamespace;
}

folly::coro::Task<void> MoQServer::handleClientSession(
std::shared_ptr<MoQSession> clientSession) {
onNewSession(clientSession);
clientSession->start();
co_await clientSession->clientSetupComplete();

auto control = makeControlVisitor(clientSession);
while (auto msg = co_await clientSession->controlMessages().next()) {
boost::apply_visitor(*control, msg.value());
}
// The clientSession will cancel this token when the app calls close() or
// the underlying transport invokes onSessionEnd
folly::coro::Baton baton;
folly::CancellationCallback cb(
clientSession->getCancelToken(), [&baton] { baton.post(); });
co_await baton;
terminateClientSession(std::move(clientSession));
}

Expand Down
27 changes: 4 additions & 23 deletions moxygen/MoQServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,14 @@ class MoQServer : public MoQSession::ServerSetupCallback {
MoQServer& operator=(MoQServer&&) = delete;
virtual ~MoQServer() = default;

class ControlVisitor : public MoQSession::ControlVisitor {
public:
explicit ControlVisitor(std::shared_ptr<MoQSession> clientSession)
: clientSession_(std::move(clientSession)) {}

~ControlVisitor() override = default;

void operator()(Announce announce) const override;
void operator()(Unannounce unannounce) const override;
void operator()(AnnounceCancel announceCancel) const override;

protected:
std::shared_ptr<MoQSession> clientSession_;
};

virtual std::unique_ptr<ControlVisitor> makeControlVisitor(
std::shared_ptr<MoQSession> clientSession) {
return std::make_unique<ControlVisitor>(std::move(clientSession));
virtual void onNewSession(std::shared_ptr<MoQSession> clientSession) = 0;
virtual void terminateClientSession(std::shared_ptr<MoQSession> /*session*/) {
}

virtual folly::coro::Task<void> handleClientSession(
private:
folly::coro::Task<void> handleClientSession(
std::shared_ptr<MoQSession> clientSession);

virtual void terminateClientSession(std::shared_ptr<MoQSession> /*session*/) {
}

class Handler : public proxygen::HTTPTransactionHandler {
public:
explicit Handler(MoQServer& server) : server_(server) {}
Expand Down Expand Up @@ -115,7 +97,6 @@ class MoQServer : public MoQSession::ServerSetupCallback {

folly::Try<ServerSetup> onClientSetup(ClientSetup clientSetup) override;

private:
void createMoQQuicSession(std::shared_ptr<quic::QuicSocket> quicSocket);

quic::samples::HQServerParams params_;
Expand Down
160 changes: 132 additions & 28 deletions moxygen/MoQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,44 @@ class MoQSession::FetchTrackReceiveState
using folly::coro::co_awaitTry;
using folly::coro::co_error;

class MoQSession::SubscriberAnnounceCallback
: public Subscriber::AnnounceCallback {
public:
SubscriberAnnounceCallback(MoQSession& session, TrackNamespace ns)
: session_(session), trackNamespace_(ns) {}

void announceCancel(uint64_t errorCode, std::string reasonPhrase) override {
session_.announceCancel(
{trackNamespace_, errorCode, std::move(reasonPhrase)});
}

private:
MoQSession& session_;
TrackNamespace trackNamespace_;
};

class MoQSession::PublisherAnnounceHandle : public Subscriber::AnnounceHandle {
public:
PublisherAnnounceHandle(std::shared_ptr<MoQSession> session, AnnounceOk annOk)
: Subscriber::AnnounceHandle(std::move(annOk)),
session_(std::move(session)) {}
PublisherAnnounceHandle(const PublisherAnnounceHandle&) = delete;
PublisherAnnounceHandle& operator=(const PublisherAnnounceHandle&) = delete;
~PublisherAnnounceHandle() override {
unannounce();
}

void unannounce() override {
if (session_) {
session_->unannounce({announceOk().trackNamespace});
session_.reset();
}
}

private:
std::shared_ptr<MoQSession> session_;
};

MoQSession::~MoQSession() {
cleanup();
XLOG(DBG1) << __func__ << " sess=" << this;
Expand All @@ -944,6 +982,17 @@ void MoQSession::cleanup() {
subAnn.second->unsubscribeAnnounces();
}
subscribeAnnounces_.clear();
for (auto& ann : subscriberAnnounces_) {
ann.second->unannounce();
}
subscriberAnnounces_.clear();
for (auto& ann : publisherAnnounces_) {
if (ann.second) {
ann.second->announceCancel(
std::numeric_limits<uint64_t>::max(), "Session ended");
}
}
publisherAnnounces_.clear();
for (auto& pubTrack : pubTracks_) {
pubTrack.second->reset(ResetStreamErrorCode::SESSION_CLOSED);
}
Expand Down Expand Up @@ -1088,7 +1137,8 @@ folly::coro::Task<void> MoQSession::controlWriteLoop(
folly::coro::Task<ServerSetup> MoQSession::setup(ClientSetup setup) {
XCHECK(dir_ == MoQControlCodec::Direction::CLIENT);
XLOG(DBG1) << __func__ << " sess=" << this;
std::tie(setupPromise_, setupFuture_) =
folly::coro::Future<ServerSetup> setupFuture;
std::tie(setupPromise_, setupFuture) =
folly::coro::makePromiseContract<ServerSetup>();

auto maxSubscribeId = getMaxSubscribeIdIfPresent(setup.params);
Expand All @@ -1106,7 +1156,7 @@ folly::coro::Task<ServerSetup> MoQSession::setup(ClientSetup setup) {
folly::EventBaseThreadTimekeeper tk(*evb_);
auto serverSetup = co_await co_awaitTry(folly::coro::co_withCancellation(
mergeToken,
folly::coro::timeout(std::move(setupFuture_), kSetupTimeout, &tk)));
folly::coro::timeout(std::move(setupFuture), kSetupTimeout, &tk)));
if (mergeToken.isCancellationRequested()) {
co_yield folly::coro::co_error(folly::OperationCancelled());
}
Expand Down Expand Up @@ -1165,29 +1215,9 @@ void MoQSession::onClientSetup(ClientSetup clientSetup) {
}
maxSubscribeID_ = maxConcurrentSubscribes_ = maxSubscribeId;
setupComplete_ = true;
setupPromise_.setValue(ServerSetup());
controlWriteEvent_.signal();
}

folly::coro::AsyncGenerator<MoQSession::MoQMessage>
MoQSession::controlMessages() {
XLOG(DBG1) << __func__ << " sess=" << this;
auto self = shared_from_this();
while (true) {
auto token = cancellationSource_.getToken();
auto message = co_await folly::coro::co_awaitTry(
folly::coro::co_withCancellation(token, controlMessages_.dequeue()));
if (token.isCancellationRequested()) {
co_return;
}
if (message.hasException()) {
XLOG(ERR) << folly::exceptionStr(message.exception()) << " sess=" << this;
break;
}
co_yield *message;
}
}

folly::coro::Task<void> MoQSession::controlReadLoop(
proxygen::WebTransport::StreamReadHandle* readHandle) {
XLOG(DBG1) << __func__ << " sess=" << this;
Expand Down Expand Up @@ -1915,7 +1945,45 @@ void MoQSession::onFetchError(FetchError fetchError) {

void MoQSession::onAnnounce(Announce ann) {
XLOG(DBG1) << __func__ << " ns=" << ann.trackNamespace << " sess=" << this;
controlMessages_.enqueue(std::move(ann));
if (!subscribeHandler_) {
XLOG(DBG1) << __func__ << "No subscriber callback set";
announceError({ann.trackNamespace, 500, "Not a subscriber"});
} else {
handleAnnounce(std::move(ann)).scheduleOn(evb_).start();
}
}

folly::coro::Task<void> MoQSession::handleAnnounce(Announce announce) {
folly::RequestContextScopeGuard guard;
setRequestSession();
auto annCb = std::make_shared<SubscriberAnnounceCallback>(
*this, announce.trackNamespace);
auto announceResult = co_await co_awaitTry(co_withCancellation(
cancellationSource_.getToken(),
subscribeHandler_->announce(announce, std::move(annCb))));
if (announceResult.hasException()) {
XLOG(ERR) << "Exception in Subscriber callback ex="
<< announceResult.exception().what().toStdString();
announceError(
{announce.trackNamespace,
500,
announceResult.exception().what().toStdString()});
co_return;
}
if (announceResult->hasError()) {
XLOG(DBG1) << "Application announce error err="
<< announceResult->error().reasonPhrase;
auto annErr = std::move(announceResult->error());
annErr.trackNamespace = announce.trackNamespace; // In case app got it wrong
announceError(std::move(annErr));
} else {
auto handle = std::move(announceResult->value());
auto announceOkMsg = handle->announceOk();
announceOkMsg.trackNamespace = announce.trackNamespace;
announceOk(std::move(announceOkMsg));
// TODO: what about UNANNOUNCE before ANNOUNCE_OK
subscriberAnnounces_[announce.trackNamespace] = std::move(handle);
}
}

void MoQSession::onAnnounceOk(AnnounceOk annOk) {
Expand All @@ -1941,19 +2009,42 @@ void MoQSession::onAnnounceError(AnnounceError announceError) {
<< announceError.trackNamespace << " sess=" << this;
return;
}
publisherAnnounces_.erase(announceError.trackNamespace);
annIt->second.setValue(folly::makeUnexpected(std::move(announceError)));
pendingAnnounce_.erase(annIt);
}

void MoQSession::onUnannounce(Unannounce unAnn) {
XLOG(DBG1) << __func__ << " ns=" << unAnn.trackNamespace << " sess=" << this;
controlMessages_.enqueue(std::move(unAnn));
auto annIt = subscriberAnnounces_.find(unAnn.trackNamespace);
if (annIt == subscriberAnnounces_.end()) {
XLOG(ERR) << "Unannounce for bad namespace ns=" << unAnn.trackNamespace;
} else {
annIt->second->unannounce();
subscriberAnnounces_.erase(annIt);
}
}

void MoQSession::announceCancel(AnnounceCancel annCan) {
auto res = writeAnnounceCancel(controlWriteBuf_, annCan);
if (!res) {
XLOG(ERR) << "writeAnnounceCancel failed sess=" << this;
}
controlWriteEvent_.signal();
subscriberAnnounces_.erase(annCan.trackNamespace);
}

void MoQSession::onAnnounceCancel(AnnounceCancel announceCancel) {
XLOG(DBG1) << __func__ << " ns=" << announceCancel.trackNamespace
<< " sess=" << this;
controlMessages_.enqueue(std::move(announceCancel));
auto it = publisherAnnounces_.find(announceCancel.trackNamespace);
if (it == publisherAnnounces_.end()) {
XLOG(ERR) << "Invalid announce cancel ns=" << announceCancel.trackNamespace;
} else {
it->second->announceCancel(
announceCancel.errorCode, std::move(announceCancel.reasonPhrase));
publisherAnnounces_.erase(it);
}
}

void MoQSession::onSubscribeAnnounces(SubscribeAnnounces sa) {
Expand Down Expand Up @@ -2126,8 +2217,9 @@ void MoQSession::onConnectionError(ErrorCode error) {
close(SessionCloseErrorCode::PROTOCOL_VIOLATION);
}

folly::coro::Task<folly::Expected<AnnounceOk, AnnounceError>>
MoQSession::announce(Announce ann) {
folly::coro::Task<Subscriber::AnnounceResult> MoQSession::announce(
Announce ann,
std::shared_ptr<AnnounceCallback> announceCallback) {
XLOG(DBG1) << __func__ << " ns=" << ann.trackNamespace << " sess=" << this;
auto trackNamespace = ann.trackNamespace;
auto res = writeAnnounce(controlWriteBuf_, std::move(ann));
Expand All @@ -2139,9 +2231,16 @@ MoQSession::announce(Announce ann) {
controlWriteEvent_.signal();
auto contract = folly::coro::makePromiseContract<
folly::Expected<AnnounceOk, AnnounceError>>();
publisherAnnounces_[trackNamespace] = std::move(announceCallback);
pendingAnnounce_.emplace(
std::move(trackNamespace), std::move(contract.first));
co_return co_await std::move(contract.second);
auto announceResult = co_await std::move(contract.second);
if (announceResult.hasError()) {
co_return folly::makeUnexpected(announceResult.error());
} else {
co_return std::make_shared<PublisherAnnounceHandle>(
shared_from_this(), std::move(announceResult.value()));
}
}

void MoQSession::announceOk(AnnounceOk annOk) {
Expand All @@ -2167,6 +2266,11 @@ void MoQSession::announceError(AnnounceError announceError) {

void MoQSession::unannounce(Unannounce unann) {
XLOG(DBG1) << __func__ << " ns=" << unann.trackNamespace << " sess=" << this;
auto it = publisherAnnounces_.find(unann.trackNamespace);
if (it == publisherAnnounces_.end()) {
XLOG(ERR) << "Unannounce (cancelled?) ns=" << unann.trackNamespace;
return;
}
auto trackNamespace = unann.trackNamespace;
auto res = writeUnannounce(controlWriteBuf_, std::move(unann));
if (!res) {
Expand Down
Loading

0 comments on commit 7b9d0ca

Please sign in to comment.