From 7b9d0ca0bf4d9e55a0a4670dd3d60478bdec6e78 Mon Sep 17 00:00:00 2001 From: Alan Frindell Date: Wed, 22 Jan 2025 14:42:22 -0800 Subject: [PATCH] Implement announce using Subscriber interface (#17) Summary: Pull Request resolved: https://github.com/facebookexperimental/moxygen/pull/17 By removing the last use of the control message queue, delete all control loops and control visitors Differential Revision: D68139012 --- moxygen/MoQServer.cpp | 28 +-- moxygen/MoQServer.h | 27 +-- moxygen/MoQSession.cpp | 160 +++++++++++++++--- moxygen/MoQSession.h | 75 +++----- moxygen/relay/MoQRelay.cpp | 112 ++++++------ moxygen/relay/MoQRelay.h | 38 ++++- moxygen/relay/MoQRelayClient.h | 27 +-- moxygen/relay/MoQRelayServer.cpp | 28 +-- moxygen/samples/chat/MoQChatClient.cpp | 67 ++++---- moxygen/samples/chat/MoQChatClient.h | 24 ++- moxygen/samples/date/MoQDateServer.cpp | 23 +-- .../MoQFlvReceiverClient.cpp | 41 ++--- .../MoQFlvStreamerClient.cpp | 42 +---- moxygen/samples/text-client/MoQTextClient.cpp | 39 ++--- moxygen/test/MoQSessionTest.cpp | 48 ------ 15 files changed, 345 insertions(+), 434 deletions(-) diff --git a/moxygen/MoQServer.cpp b/moxygen/MoQServer.cpp index 2ef3ad5b..02d62859 100644 --- a/moxygen/MoQServer.cpp +++ b/moxygen/MoQServer.cpp @@ -72,31 +72,17 @@ folly::Try MoQServer::onClientSetup(ClientSetup /*setup*/) { })); } -// TODO: Implement message handling -void MoQServer::ControlVisitor::operator()(Announce announce) const { - XLOG(INFO) << "Announce ns=" << announce.trackNamespace; - clientSession_->announceError( - {announce.trackNamespace, 500, "not implemented"}); -} - -void MoQServer::ControlVisitor::operator()(Unannounce unannounce) const { - XLOG(INFO) << "Unannounce ns=" << unannounce.trackNamespace; -} - -void MoQServer::ControlVisitor::operator()( - AnnounceCancel announceCancel) const { - XLOG(INFO) << "AnnounceCancel ns=" << announceCancel.trackNamespace; -} - folly::coro::Task MoQServer::handleClientSession( std::shared_ptr clientSession) { + onNewSession(clientSession); clientSession->start(); - co_await clientSession->clientSetupComplete(); - auto control = makeControlVisitor(clientSession); - while (auto msg = co_await clientSession->controlMessages().next()) { - boost::apply_visitor(*control, msg.value()); - } + // The clientSession will cancel this token when the app calls close() or + // the underlying transport invokes onSessionEnd + folly::coro::Baton baton; + folly::CancellationCallback cb( + clientSession->getCancelToken(), [&baton] { baton.post(); }); + co_await baton; terminateClientSession(std::move(clientSession)); } diff --git a/moxygen/MoQServer.h b/moxygen/MoQServer.h index 02bb3657..7a443cdf 100644 --- a/moxygen/MoQServer.h +++ b/moxygen/MoQServer.h @@ -28,32 +28,14 @@ class MoQServer : public MoQSession::ServerSetupCallback { MoQServer& operator=(MoQServer&&) = delete; virtual ~MoQServer() = default; - class ControlVisitor : public MoQSession::ControlVisitor { - public: - explicit ControlVisitor(std::shared_ptr clientSession) - : clientSession_(std::move(clientSession)) {} - - ~ControlVisitor() override = default; - - void operator()(Announce announce) const override; - void operator()(Unannounce unannounce) const override; - void operator()(AnnounceCancel announceCancel) const override; - - protected: - std::shared_ptr clientSession_; - }; - - virtual std::unique_ptr makeControlVisitor( - std::shared_ptr clientSession) { - return std::make_unique(std::move(clientSession)); + virtual void onNewSession(std::shared_ptr clientSession) = 0; + virtual void terminateClientSession(std::shared_ptr /*session*/) { } - virtual folly::coro::Task handleClientSession( + private: + folly::coro::Task handleClientSession( std::shared_ptr clientSession); - virtual void terminateClientSession(std::shared_ptr /*session*/) { - } - class Handler : public proxygen::HTTPTransactionHandler { public: explicit Handler(MoQServer& server) : server_(server) {} @@ -115,7 +97,6 @@ class MoQServer : public MoQSession::ServerSetupCallback { folly::Try onClientSetup(ClientSetup clientSetup) override; - private: void createMoQQuicSession(std::shared_ptr quicSocket); quic::samples::HQServerParams params_; diff --git a/moxygen/MoQSession.cpp b/moxygen/MoQSession.cpp index 546b9d19..6dabcd8a 100644 --- a/moxygen/MoQSession.cpp +++ b/moxygen/MoQSession.cpp @@ -933,6 +933,44 @@ class MoQSession::FetchTrackReceiveState using folly::coro::co_awaitTry; using folly::coro::co_error; +class MoQSession::SubscriberAnnounceCallback + : public Subscriber::AnnounceCallback { + public: + SubscriberAnnounceCallback(MoQSession& session, TrackNamespace ns) + : session_(session), trackNamespace_(ns) {} + + void announceCancel(uint64_t errorCode, std::string reasonPhrase) override { + session_.announceCancel( + {trackNamespace_, errorCode, std::move(reasonPhrase)}); + } + + private: + MoQSession& session_; + TrackNamespace trackNamespace_; +}; + +class MoQSession::PublisherAnnounceHandle : public Subscriber::AnnounceHandle { + public: + PublisherAnnounceHandle(std::shared_ptr session, AnnounceOk annOk) + : Subscriber::AnnounceHandle(std::move(annOk)), + session_(std::move(session)) {} + PublisherAnnounceHandle(const PublisherAnnounceHandle&) = delete; + PublisherAnnounceHandle& operator=(const PublisherAnnounceHandle&) = delete; + ~PublisherAnnounceHandle() override { + unannounce(); + } + + void unannounce() override { + if (session_) { + session_->unannounce({announceOk().trackNamespace}); + session_.reset(); + } + } + + private: + std::shared_ptr session_; +}; + MoQSession::~MoQSession() { cleanup(); XLOG(DBG1) << __func__ << " sess=" << this; @@ -944,6 +982,17 @@ void MoQSession::cleanup() { subAnn.second->unsubscribeAnnounces(); } subscribeAnnounces_.clear(); + for (auto& ann : subscriberAnnounces_) { + ann.second->unannounce(); + } + subscriberAnnounces_.clear(); + for (auto& ann : publisherAnnounces_) { + if (ann.second) { + ann.second->announceCancel( + std::numeric_limits::max(), "Session ended"); + } + } + publisherAnnounces_.clear(); for (auto& pubTrack : pubTracks_) { pubTrack.second->reset(ResetStreamErrorCode::SESSION_CLOSED); } @@ -1088,7 +1137,8 @@ folly::coro::Task MoQSession::controlWriteLoop( folly::coro::Task MoQSession::setup(ClientSetup setup) { XCHECK(dir_ == MoQControlCodec::Direction::CLIENT); XLOG(DBG1) << __func__ << " sess=" << this; - std::tie(setupPromise_, setupFuture_) = + folly::coro::Future setupFuture; + std::tie(setupPromise_, setupFuture) = folly::coro::makePromiseContract(); auto maxSubscribeId = getMaxSubscribeIdIfPresent(setup.params); @@ -1106,7 +1156,7 @@ folly::coro::Task MoQSession::setup(ClientSetup setup) { folly::EventBaseThreadTimekeeper tk(*evb_); auto serverSetup = co_await co_awaitTry(folly::coro::co_withCancellation( mergeToken, - folly::coro::timeout(std::move(setupFuture_), kSetupTimeout, &tk))); + folly::coro::timeout(std::move(setupFuture), kSetupTimeout, &tk))); if (mergeToken.isCancellationRequested()) { co_yield folly::coro::co_error(folly::OperationCancelled()); } @@ -1165,29 +1215,9 @@ void MoQSession::onClientSetup(ClientSetup clientSetup) { } maxSubscribeID_ = maxConcurrentSubscribes_ = maxSubscribeId; setupComplete_ = true; - setupPromise_.setValue(ServerSetup()); controlWriteEvent_.signal(); } -folly::coro::AsyncGenerator -MoQSession::controlMessages() { - XLOG(DBG1) << __func__ << " sess=" << this; - auto self = shared_from_this(); - while (true) { - auto token = cancellationSource_.getToken(); - auto message = co_await folly::coro::co_awaitTry( - folly::coro::co_withCancellation(token, controlMessages_.dequeue())); - if (token.isCancellationRequested()) { - co_return; - } - if (message.hasException()) { - XLOG(ERR) << folly::exceptionStr(message.exception()) << " sess=" << this; - break; - } - co_yield *message; - } -} - folly::coro::Task MoQSession::controlReadLoop( proxygen::WebTransport::StreamReadHandle* readHandle) { XLOG(DBG1) << __func__ << " sess=" << this; @@ -1915,7 +1945,45 @@ void MoQSession::onFetchError(FetchError fetchError) { void MoQSession::onAnnounce(Announce ann) { XLOG(DBG1) << __func__ << " ns=" << ann.trackNamespace << " sess=" << this; - controlMessages_.enqueue(std::move(ann)); + if (!subscribeHandler_) { + XLOG(DBG1) << __func__ << "No subscriber callback set"; + announceError({ann.trackNamespace, 500, "Not a subscriber"}); + } else { + handleAnnounce(std::move(ann)).scheduleOn(evb_).start(); + } +} + +folly::coro::Task MoQSession::handleAnnounce(Announce announce) { + folly::RequestContextScopeGuard guard; + setRequestSession(); + auto annCb = std::make_shared( + *this, announce.trackNamespace); + auto announceResult = co_await co_awaitTry(co_withCancellation( + cancellationSource_.getToken(), + subscribeHandler_->announce(announce, std::move(annCb)))); + if (announceResult.hasException()) { + XLOG(ERR) << "Exception in Subscriber callback ex=" + << announceResult.exception().what().toStdString(); + announceError( + {announce.trackNamespace, + 500, + announceResult.exception().what().toStdString()}); + co_return; + } + if (announceResult->hasError()) { + XLOG(DBG1) << "Application announce error err=" + << announceResult->error().reasonPhrase; + auto annErr = std::move(announceResult->error()); + annErr.trackNamespace = announce.trackNamespace; // In case app got it wrong + announceError(std::move(annErr)); + } else { + auto handle = std::move(announceResult->value()); + auto announceOkMsg = handle->announceOk(); + announceOkMsg.trackNamespace = announce.trackNamespace; + announceOk(std::move(announceOkMsg)); + // TODO: what about UNANNOUNCE before ANNOUNCE_OK + subscriberAnnounces_[announce.trackNamespace] = std::move(handle); + } } void MoQSession::onAnnounceOk(AnnounceOk annOk) { @@ -1941,19 +2009,42 @@ void MoQSession::onAnnounceError(AnnounceError announceError) { << announceError.trackNamespace << " sess=" << this; return; } + publisherAnnounces_.erase(announceError.trackNamespace); annIt->second.setValue(folly::makeUnexpected(std::move(announceError))); pendingAnnounce_.erase(annIt); } void MoQSession::onUnannounce(Unannounce unAnn) { XLOG(DBG1) << __func__ << " ns=" << unAnn.trackNamespace << " sess=" << this; - controlMessages_.enqueue(std::move(unAnn)); + auto annIt = subscriberAnnounces_.find(unAnn.trackNamespace); + if (annIt == subscriberAnnounces_.end()) { + XLOG(ERR) << "Unannounce for bad namespace ns=" << unAnn.trackNamespace; + } else { + annIt->second->unannounce(); + subscriberAnnounces_.erase(annIt); + } +} + +void MoQSession::announceCancel(AnnounceCancel annCan) { + auto res = writeAnnounceCancel(controlWriteBuf_, annCan); + if (!res) { + XLOG(ERR) << "writeAnnounceCancel failed sess=" << this; + } + controlWriteEvent_.signal(); + subscriberAnnounces_.erase(annCan.trackNamespace); } void MoQSession::onAnnounceCancel(AnnounceCancel announceCancel) { XLOG(DBG1) << __func__ << " ns=" << announceCancel.trackNamespace << " sess=" << this; - controlMessages_.enqueue(std::move(announceCancel)); + auto it = publisherAnnounces_.find(announceCancel.trackNamespace); + if (it == publisherAnnounces_.end()) { + XLOG(ERR) << "Invalid announce cancel ns=" << announceCancel.trackNamespace; + } else { + it->second->announceCancel( + announceCancel.errorCode, std::move(announceCancel.reasonPhrase)); + publisherAnnounces_.erase(it); + } } void MoQSession::onSubscribeAnnounces(SubscribeAnnounces sa) { @@ -2126,8 +2217,9 @@ void MoQSession::onConnectionError(ErrorCode error) { close(SessionCloseErrorCode::PROTOCOL_VIOLATION); } -folly::coro::Task> -MoQSession::announce(Announce ann) { +folly::coro::Task MoQSession::announce( + Announce ann, + std::shared_ptr announceCallback) { XLOG(DBG1) << __func__ << " ns=" << ann.trackNamespace << " sess=" << this; auto trackNamespace = ann.trackNamespace; auto res = writeAnnounce(controlWriteBuf_, std::move(ann)); @@ -2139,9 +2231,16 @@ MoQSession::announce(Announce ann) { controlWriteEvent_.signal(); auto contract = folly::coro::makePromiseContract< folly::Expected>(); + publisherAnnounces_[trackNamespace] = std::move(announceCallback); pendingAnnounce_.emplace( std::move(trackNamespace), std::move(contract.first)); - co_return co_await std::move(contract.second); + auto announceResult = co_await std::move(contract.second); + if (announceResult.hasError()) { + co_return folly::makeUnexpected(announceResult.error()); + } else { + co_return std::make_shared( + shared_from_this(), std::move(announceResult.value())); + } } void MoQSession::announceOk(AnnounceOk annOk) { @@ -2167,6 +2266,11 @@ void MoQSession::announceError(AnnounceError announceError) { void MoQSession::unannounce(Unannounce unann) { XLOG(DBG1) << __func__ << " ns=" << unann.trackNamespace << " sess=" << this; + auto it = publisherAnnounces_.find(unann.trackNamespace); + if (it == publisherAnnounces_.end()) { + XLOG(ERR) << "Unannounce (cancelled?) ns=" << unann.trackNamespace; + return; + } auto trackNamespace = unann.trackNamespace; auto res = writeUnannounce(controlWriteBuf_, std::move(unann)); if (!res) { diff --git a/moxygen/MoQSession.h b/moxygen/MoQSession.h index a57af811..e65a8c5e 100644 --- a/moxygen/MoQSession.h +++ b/moxygen/MoQSession.h @@ -28,6 +28,7 @@ namespace moxygen { class MoQSession : public MoQControlCodec::ControlCallback, public proxygen::WebTransportHandler, public Publisher, + public Subscriber, public std::enable_shared_from_this { public: struct MoQSessionRequestData : public folly::RequestData { @@ -67,11 +68,7 @@ class MoQSession : public MoQControlCodec::ControlCallback, : dir_(MoQControlCodec::Direction::SERVER), wt_(wt), evb_(evb), - serverSetupCallback_(&serverSetupCallback) { - // SERVER sessions use this promise/future as a signal - std::tie(setupPromise_, setupFuture_) = - folly::coro::makePromiseContract(); - } + serverSetupCallback_(&serverSetupCallback) {} void setPublishHandler(std::shared_ptr publishHandler) { publishHandler_ = std::move(publishHandler); @@ -98,11 +95,6 @@ class MoQSession : public MoQControlCodec::ControlCallback, void goaway(Goaway goaway) override; folly::coro::Task setup(ClientSetup setup); - folly::coro::Task clientSetupComplete() { - XCHECK(dir_ == MoQControlCodec::Direction::SERVER); - // TODO timeout - co_await std::move(setupFuture_); - } void setMaxConcurrentSubscribes(uint64_t maxConcurrent) { if (maxConcurrent > maxConcurrentSubscribes_) { @@ -112,45 +104,6 @@ class MoQSession : public MoQControlCodec::ControlCallback, } } - using MoQMessage = boost::variant; - - class ControlVisitor : public boost::static_visitor<> { - public: - ControlVisitor() = default; - virtual ~ControlVisitor() = default; - virtual void operator()(ClientSetup /*setup*/) const { - XLOG(INFO) << "ClientSetup"; - } - - virtual void operator()(Announce announce) const { - XLOG(INFO) << "Announce ns=" << announce.trackNamespace; - } - - virtual void operator()(Unannounce unannounce) const { - XLOG(INFO) << "Unannounce ns=" << unannounce.trackNamespace; - } - - virtual void operator()(AnnounceCancel announceCancel) const { - XLOG(INFO) << "AnnounceCancel ns=" << announceCancel.trackNamespace; - } - - virtual void operator()(AnnounceError announceError) const { - XLOG(INFO) << "AnnounceError ns=" << announceError.trackNamespace - << " code=" << announceError.errorCode - << " reason=" << announceError.reasonPhrase; - } - - private: - }; - - folly::coro::AsyncGenerator controlMessages(); - - folly::coro::Task> announce( - Announce ann); - void announceOk(AnnounceOk annOk); - void announceError(AnnounceError announceError); - void unannounce(Unannounce unannounce); - uint64_t maxSubscribeID() const { return maxSubscribeID_; } @@ -174,6 +127,10 @@ class MoQSession : public MoQControlCodec::ControlCallback, folly::coro::Task subscribeAnnounces( SubscribeAnnounces subAnn) override; + folly::coro::Task announce( + Announce ann, + std::shared_ptr announceCallback = nullptr) override; + class PublisherImpl { public: PublisherImpl( @@ -290,6 +247,12 @@ class MoQSession : public MoQControlCodec::ControlCallback, void subscribeAnnouncesError(SubscribeAnnouncesError subscribeAnnouncesError); void unsubscribeAnnounces(UnsubscribeAnnounces unsubscribeAnnounces); + folly::coro::Task handleAnnounce(Announce announce); + void announceOk(AnnounceOk annOk); + void announceError(AnnounceError announceError); + void announceCancel(AnnounceCancel annCan); + void unannounce(Unannounce unannounce); + class ReceiverSubscriptionHandle; class ReceiverFetchHandle; @@ -344,7 +307,6 @@ class MoQSession : public MoQControlCodec::ControlCallback, folly::EventBase* evb_{nullptr}; // keepalive? folly::IOBufQueue controlWriteBuf_{folly::IOBufQueue::cacheChainLength()}; moxygen::TimedBaton controlWriteEvent_; - folly::coro::UnboundedQueue controlMessages_; // Track Alias -> Receive State folly::F14FastMap< @@ -380,7 +342,19 @@ class MoQSession : public MoQControlCodec::ControlCallback, F14FastMap, SubscribeID::hash> pubTracks_; + class SubscriberAnnounceCallback; + class PublisherAnnounceHandle; class SubscribeAnnouncesHandle; + folly::F14FastMap< + TrackNamespace, + std::shared_ptr, + TrackNamespace::hash> + subscriberAnnounces_; + folly::F14FastMap< + TrackNamespace, + std::shared_ptr, + TrackNamespace::hash> + publisherAnnounces_; folly::F14FastMap< TrackNamespace, std::shared_ptr, @@ -396,7 +370,6 @@ class MoQSession : public MoQControlCodec::ControlCallback, uint64_t peerMaxSubscribeID_{0}; folly::coro::Promise setupPromise_; - folly::coro::Future setupFuture_; bool setupComplete_{false}; bool draining_{false}; bool receivedGoaway_{false}; diff --git a/moxygen/relay/MoQRelay.cpp b/moxygen/relay/MoQRelay.cpp index 146a132d..43e29647 100644 --- a/moxygen/relay/MoQRelay.cpp +++ b/moxygen/relay/MoQRelay.cpp @@ -8,11 +8,12 @@ namespace moxygen { -MoQRelay::AnnounceNode* MoQRelay::findNamespaceNode( +std::shared_ptr MoQRelay::findNamespaceNode( const TrackNamespace& ns, bool createMissingNodes, std::vector>* sessions) { - AnnounceNode* nodePtr = &announceRoot_; + std::shared_ptr nodePtr( + std::shared_ptr(), &announceRoot_); for (auto i = 0ul; i < ns.size(); i++) { if (sessions) { sessions->insert( @@ -22,66 +23,73 @@ MoQRelay::AnnounceNode* MoQRelay::findNamespaceNode( auto it = nodePtr->children.find(name); if (it == nodePtr->children.end()) { if (createMissingNodes) { - nodePtr = - &nodePtr->children.emplace(name, AnnounceNode()).first->second; + auto node = std::make_shared(*this); + nodePtr->children.emplace(name, node); + nodePtr = std::move(node); } else { XLOG(ERR) << "prefix not found in announce tree"; return nullptr; } } else { - nodePtr = &it->second; + nodePtr = it->second; } } return nodePtr; } -void MoQRelay::onAnnounce(Announce&& ann, std::shared_ptr session) { +folly::coro::Task MoQRelay::announce( + Announce ann, + std::shared_ptr) { XLOG(DBG1) << __func__ << " ns=" << ann.trackNamespace; // check auth if (!ann.trackNamespace.startsWith(allowedNamespacePrefix_)) { - session->announceError({ann.trackNamespace, 403, "bad namespace"}); - return; + co_return folly::makeUnexpected( + AnnounceError{ann.trackNamespace, 403, "bad namespace"}); } std::vector> sessions; auto nodePtr = findNamespaceNode( ann.trackNamespace, /*createMissingNodes=*/true, &sessions); // TODO: store auth for forwarding on future SubscribeAnnounces? + auto session = MoQSession::getRequestSession(); nodePtr->sourceSession = std::move(session); - nodePtr->sourceSession->announceOk({ann.trackNamespace}); + nodePtr->setAnnounceOk({ann.trackNamespace}); for (auto& outSession : sessions) { - auto evb = outSession->getEventBase(); - // We don't really care if we get announce error, I guess? - outSession->announce(ann).scheduleOn(evb).start(); + if (outSession != session) { + auto evb = outSession->getEventBase(); + announceToSession(outSession, ann, nodePtr).scheduleOn(evb).start(); + } } + co_return nodePtr; } -void MoQRelay::onUnannounce( - Unannounce&& unann, - const std::shared_ptr& session) { - XLOG(DBG1) << __func__ << " ns=" << unann.trackNamespace; - std::vector> sessions; - auto nodePtr = findNamespaceNode( - unann.trackNamespace, /*createMissingNodes=*/false, &sessions); - if (!nodePtr) { - // TODO: maybe error? - return; - } - if (nodePtr->sourceSession.get() == session.get()) { - nodePtr->sourceSession = nullptr; - for (auto& outSession : sessions) { - auto evb = outSession->getEventBase(); - evb->runInEventBaseThread( - [outSession, unann] { outSession->unannounce(unann); }); - } +folly::coro::Task MoQRelay::announceToSession( + std::shared_ptr session, + Announce ann, + std::shared_ptr nodePtr) { + auto announceHandle = co_await session->announce(ann); + if (announceHandle.hasError()) { + XLOG(ERR) << "Announce failed err=" << announceHandle.error().reasonPhrase; } else { - if (nodePtr->sourceSession) { - XLOG(ERR) << "unannounce namespace announced by another session"; - } else { - XLOG(DBG1) << "unannounce namespace that was not announced"; - } + // This can race with unsubscribeAnnounces + nodePtr->announcements[session] = std::move(announceHandle.value()); } +} +void MoQRelay::unannounce(const TrackNamespace& trackNamespace, AnnounceNode*) { + XLOG(DBG1) << __func__ << " ns=" << trackNamespace; + // Node would be useful if there were back links + auto nodePtr = + findNamespaceNode(trackNamespace, /*createMissingNodes=*/false, nullptr); + XCHECK(nodePtr); + nodePtr->sourceSession = nullptr; + for (auto& announcement : nodePtr->announcements) { + auto evb = announcement.first->getEventBase(); + evb->runInEventBaseThread([announceHandle = announcement.second] { + announceHandle->unannounce(); + }); + } + nodePtr->announcements.clear(); // TODO: prune Announce tree } @@ -123,21 +131,20 @@ MoQRelay::subscribeAnnounces(SubscribeAnnounces subNs) { nodePtr->sessions.emplace(session); // Find all nested Announcements and forward - std::deque> nodes{ + std::deque>> nodes{ {subNs.trackNamespacePrefix, nodePtr}}; auto evb = session->getEventBase(); while (!nodes.empty()) { auto [prefix, nodePtr] = std::move(*nodes.begin()); nodes.pop_front(); - if (nodePtr->sourceSession) { - // We don't really care if we get announce error, I guess? + if (nodePtr->sourceSession && nodePtr->sourceSession != session) { // TODO: Auth/params - session->announce({prefix, {}}).scheduleOn(evb).start(); + announceToSession(session, {prefix, {}}, nodePtr).scheduleOn(evb).start(); } for (auto& nextNodeIt : nodePtr->children) { TrackNamespace nodePrefix(prefix); nodePrefix.append(nextNodeIt.first); - nodes.emplace_back(std::forward_as_tuple(nodePrefix, &nextNodeIt.second)); + nodes.emplace_back(std::forward_as_tuple(nodePrefix, nextNodeIt.second)); } } co_return std::make_shared( @@ -284,27 +291,30 @@ void MoQRelay::removeSession(const std::shared_ptr& session) { // Implicit UnsubscribeAnnounces nodePtr->sessions.erase(session); - // Add sessions for future Unannounce - notifySessions.insert( - notifySessions.end(), - nodePtr->sessions.begin(), - nodePtr->sessions.end()); + auto it = nodePtr->announcements.find(session); + if (it != nodePtr->announcements.end()) { + // we've announced this node to the removing session. + // Do we really need to unannounce? + it->second->unannounce(); + nodePtr->announcements.erase(it); + } if (nodePtr->sourceSession == session) { // This session is unannouncing nodePtr->sourceSession = nullptr; - for (auto& outSession : notifySessions) { - auto evb = outSession->getEventBase(); - Unannounce unann{prefix}; - evb->runInEventBaseThread( - [outSession, unann] { outSession->unannounce(unann); }); + for (auto& announcement : nodePtr->announcements) { + auto evb = announcement.first->getEventBase(); + evb->runInEventBaseThread([announceHandle = announcement.second] { + announceHandle->unannounce(); + }); } + nodePtr->announcements.clear(); } for (auto& nextNode : nodePtr->children) { TrackNamespace nodePrefix(prefix); nodePrefix.append(nextNode.first); nodes.emplace_back( - std::forward_as_tuple(std::move(nodePrefix), &nextNode.second)); + std::forward_as_tuple(std::move(nodePrefix), nextNode.second.get())); } } diff --git a/moxygen/relay/MoQRelay.h b/moxygen/relay/MoQRelay.h index 89010d48..9a823355 100644 --- a/moxygen/relay/MoQRelay.h +++ b/moxygen/relay/MoQRelay.h @@ -15,6 +15,7 @@ namespace moxygen { class MoQRelay : public Publisher, + public Subscriber, public std::enable_shared_from_this, public MoQForwarder::Callback { public: @@ -29,10 +30,9 @@ class MoQRelay : public Publisher, folly::coro::Task subscribeAnnounces( SubscribeAnnounces subAnn) override; - void onAnnounce(Announce&& ann, std::shared_ptr session); - void onUnannounce( - Unannounce&& ann, - const std::shared_ptr& session); + folly::coro::Task announce( + Announce ann, + std::shared_ptr) override; void removeSession(const std::shared_ptr& session); @@ -47,13 +47,28 @@ class MoQRelay : public Publisher, TrackNamespace prefix, std::shared_ptr session); - struct AnnounceNode { - folly::F14NodeMap children; + struct AnnounceNode : public Subscriber::AnnounceHandle { + explicit AnnounceNode(MoQRelay& relay) : relay_(relay) {} + + void unannounce() override { + relay_.unannounce(announceOk().trackNamespace, this); + } + + using Subscriber::AnnounceHandle::setAnnounceOk; + + folly::F14FastMap> children; + // Sessions with a SUBSCRIBE_ANNOUNCES here folly::F14FastSet> sessions; + // All active ANNOUNCEs for this node (includes prefix sessions) + folly:: + F14FastMap, std::shared_ptr> + announcements; + // The session that ANNOUNCEd this node std::shared_ptr sourceSession; + MoQRelay& relay_; }; - AnnounceNode announceRoot_; - AnnounceNode* findNamespaceNode( + AnnounceNode announceRoot_{*this}; + std::shared_ptr findNamespaceNode( const TrackNamespace& ns, bool createMissingNodes, std::vector>* sessions = nullptr); @@ -74,6 +89,13 @@ class MoQRelay : public Publisher, void onEmpty(MoQForwarder* forwarder) override; + folly::coro::Task announceToSession( + std::shared_ptr session, + Announce ann, + std::shared_ptr nodePtr); + + void unannounce(const TrackNamespace& trackNamespace, AnnounceNode* node); + TrackNamespace allowedNamespacePrefix_; folly::F14FastMap subscriptions_; diff --git a/moxygen/relay/MoQRelayClient.h b/moxygen/relay/MoQRelayClient.h index 832078ac..c45e0d06 100644 --- a/moxygen/relay/MoQRelayClient.h +++ b/moxygen/relay/MoQRelayClient.h @@ -13,12 +13,8 @@ namespace moxygen { class MoQRelayClient { public: - MoQRelayClient( - folly::EventBase* evb, - proxygen::URL url, - std::function( - std::shared_ptr)> controllerFn) - : moqClient_(evb, url), controllerFn_(controllerFn) {} + MoQRelayClient(folly::EventBase* evb, proxygen::URL url) + : moqClient_(evb, url) {} folly::coro::Task run( std::shared_ptr publisher, @@ -33,12 +29,6 @@ class MoQRelayClient { std::move(publisher), std::move(subscriber)); auto exec = co_await folly::coro::co_current_executor; - auto controller = controllerFn_(moqClient_.moqSession_); - if (!controller) { - XLOG(ERR) << "Failed to make controller"; - co_return; - } - controlReadLoop(std::move(controller)).scheduleOn(exec).start(); // could parallelize if (!moqClient_.moqSession_) { XLOG(ERR) << "Session is dead now #sad"; @@ -64,20 +54,7 @@ class MoQRelayClient { } private: - folly::coro::Task controlReadLoop( - std::unique_ptr controller) { - while (moqClient_.moqSession_) { - auto msg = co_await moqClient_.moqSession_->controlMessages().next(); - if (!msg) { - break; - } - boost::apply_visitor(*controller, msg.value()); - } - } MoQClient moqClient_; - std::function( - std::shared_ptr)> - controllerFn_; }; } // namespace moxygen diff --git a/moxygen/relay/MoQRelayServer.cpp b/moxygen/relay/MoQRelayServer.cpp index 9fb54bf9..53a3acc8 100644 --- a/moxygen/relay/MoQRelayServer.cpp +++ b/moxygen/relay/MoQRelayServer.cpp @@ -24,33 +24,9 @@ class MoQRelayServer : MoQServer { MoQRelayServer() : MoQServer(FLAGS_port, FLAGS_cert, FLAGS_key, FLAGS_endpoint) {} - class RelayControlVisitor : public MoQServer::ControlVisitor { - public: - RelayControlVisitor( - MoQRelayServer& server, - std::shared_ptr clientSession) - : MoQServer::ControlVisitor(std::move(clientSession)), - server_(server) {} - - void operator()(Announce announce) const override { - XLOG(INFO) << "Announce ns=" << announce.trackNamespace; - server_.relay_->onAnnounce(std::move(announce), clientSession_); - } - - void operator()(Unannounce unannounce) const override { - XLOG(INFO) << "Unannounce ns=" << unannounce.trackNamespace; - server_.relay_->onUnannounce(std::move(unannounce), clientSession_); - } - - private: - MoQRelayServer& server_; - }; - - std::unique_ptr makeControlVisitor( - std::shared_ptr clientSession) override { + void onNewSession(std::shared_ptr clientSession) override { clientSession->setPublishHandler(relay_); - return std::make_unique( - *this, std::move(clientSession)); + clientSession->setSubscribeHandler(relay_); } void terminateClientSession(std::shared_ptr session) override { diff --git a/moxygen/samples/chat/MoQChatClient.cpp b/moxygen/samples/chat/MoQChatClient.cpp index d31d3296..f3931e8d 100644 --- a/moxygen/samples/chat/MoQChatClient.cpp +++ b/moxygen/samples/chat/MoQChatClient.cpp @@ -43,13 +43,15 @@ folly::coro::Task MoQChatClient::run() noexcept { std::chrono::milliseconds(FLAGS_connect_timeout), std::chrono::seconds(FLAGS_transaction_timeout), /*publishHandler=*/shared_from_this(), - /*subscribeHandler=*/nullptr); - auto exec = co_await folly::coro::co_current_executor; - controlReadLoop().scheduleOn(exec).start(); - - // the announce and subscribe should be in parallel + /*subscribeHandler=*/shared_from_this()); + // the announce and subscribe announces should be in parallel auto announceRes = co_await moqClient_.moqSession_->announce( {participantTrackName(username_), {}}); + if (announceRes.hasError()) { + XLOG(ERR) << "Announce failed err=" << announceRes.error().reasonPhrase; + co_return; + } + announceHandle_ = std::move(announceRes.value()); // subscribe to the catalog track from the beginning of the latest group auto sa = co_await moqClient_.moqSession_->subscribeAnnounces( {TrackNamespace(chatPrefix()), @@ -71,40 +73,29 @@ folly::coro::Task MoQChatClient::run() noexcept { XLOG(INFO) << __func__ << " done"; } -folly::coro::Task MoQChatClient::controlReadLoop() { - class ControlVisitor : public MoQSession::ControlVisitor { - public: - explicit ControlVisitor(MoQChatClient& client) : client_(client) {} - - void operator()(Announce announce) const override { - XLOG(INFO) << "Announce ns=" << announce.trackNamespace; - if (announce.trackNamespace.startsWith( - TrackNamespace(client_.chatPrefix()))) { - if (announce.trackNamespace.size() != 5) { - client_.moqClient_.moqSession_->announceError( - {announce.trackNamespace, 400, "Invalid announce"}); - } - client_.moqClient_.moqSession_->announceOk({announce.trackNamespace}); - client_.subscribeToUser(std::move(announce.trackNamespace)) - .scheduleOn(client_.moqClient_.moqSession_->getEventBase()) - .start(); - } else { - client_.moqClient_.moqSession_->announceError( - {announce.trackNamespace, 404, "don't care"}); - } +folly::coro::Task MoQChatClient::announce( + Announce announce, + std::shared_ptr) { + XLOG(INFO) << "Announce ns=" << announce.trackNamespace; + if (announce.trackNamespace.startsWith(TrackNamespace(chatPrefix()))) { + if (announce.trackNamespace.size() != 5) { + co_return folly::makeUnexpected( + AnnounceError{announce.trackNamespace, 400, "Invalid chat announce"}); } - - private: - MoQChatClient& client_; - }; - XLOG(INFO) << __func__; - auto g = - folly::makeGuard([func = __func__] { XLOG(INFO) << "exit " << func; }); - ControlVisitor visitor(*this); - MoQSession::ControlVisitor* vptr(&visitor); - while (auto msg = co_await moqClient_.moqSession_->controlMessages().next()) { - boost::apply_visitor(*vptr, msg.value()); + subscribeToUser(std::move(announce.trackNamespace)) + .scheduleOn(moqClient_.moqSession_->getEventBase()) + .start(); + } else { + co_return folly::makeUnexpected( + AnnounceError{announce.trackNamespace, 404, "don't care"}); } + co_return std::make_shared( + AnnounceOk{announce.trackNamespace}, shared_from_this()); +} + +void MoQChatClient::unannounce(const TrackNamespace&) { + // TODO: Upon receiving an UNANNOUNCE, a client SHOULD UNSUBSCRIBE from that + // matching track if it had previously subscribed. } folly::coro::Task MoQChatClient::subscribe( @@ -166,6 +157,7 @@ void MoQChatClient::publishLoop() { if (token.isCancellationRequested()) { XLOG(DBG1) << "Detected deleted moqSession, cleaning up"; evb->runInEventBaseThread([this] { + announceHandle_.reset(); subscribeAnnounceHandle_.reset(); publisher_.reset(); }); @@ -174,6 +166,7 @@ void MoQChatClient::publishLoop() { evb->runInEventBaseThread([this, input] { if (input == "/leave") { XLOG(INFO) << "Leaving chat"; + announceHandle_->unannounce(); subscribeAnnounceHandle_->unsubscribeAnnounces(); if (publisher_) { publisher_->objectStream( diff --git a/moxygen/samples/chat/MoQChatClient.h b/moxygen/samples/chat/MoQChatClient.h index fa112fc7..5b62d676 100644 --- a/moxygen/samples/chat/MoQChatClient.h +++ b/moxygen/samples/chat/MoQChatClient.h @@ -12,6 +12,7 @@ namespace moxygen { class MoQChatClient : public Publisher, public Publisher::SubscriptionHandle, + public Subscriber, public std::enable_shared_from_this { public: MoQChatClient( @@ -29,7 +30,27 @@ class MoQChatClient : public Publisher, std::shared_ptr consumer) override; void subscribeUpdate(SubscribeUpdate) override {} void unsubscribe() override; - folly::coro::Task controlReadLoop(); + + class AnnounceHandle : public Subscriber::AnnounceHandle { + public: + AnnounceHandle(AnnounceOk ok, std::shared_ptr client) + : Subscriber::AnnounceHandle(std::move(ok)), + client_(std::move(client)) {} + + void unannounce() override { + client_->unannounce(announceOk_->trackNamespace); + client_.reset(); + } + + private: + std::shared_ptr client_; + }; + + folly::coro::Task announce( + Announce announce, + std::shared_ptr) override; + void unannounce(const TrackNamespace&); + void publishLoop(); folly::coro::Task subscribeToUser(TrackNamespace trackNamespace); void subscribeDone(SubscribeDone subDone); @@ -69,6 +90,7 @@ class MoQChatClient : public Publisher, std::map> subscriptions_; std::pair, folly::coro::Future> peerSetup_{folly::coro::makePromiseContract()}; + std::shared_ptr announceHandle_; std::shared_ptr subscribeAnnounceHandle_; }; diff --git a/moxygen/samples/date/MoQDateServer.cpp b/moxygen/samples/date/MoQDateServer.cpp index 175f32b8..398a68fa 100644 --- a/moxygen/samples/date/MoQDateServer.cpp +++ b/moxygen/samples/date/MoQDateServer.cpp @@ -40,11 +40,7 @@ class MoQDateServer : public MoQServer, XLOG(ERR) << "Invalid url: " << FLAGS_relay_url; return false; } - relayClient_ = std::make_unique( - evb, url, [this](std::shared_ptr session) { - return std::make_unique( - *this, std::move(session)); - }); + relayClient_ = std::make_unique(evb, url); relayClient_ ->run( /*publisher=*/shared_from_this(), @@ -59,28 +55,13 @@ class MoQDateServer : public MoQServer, return true; } - class DateControlVisitor : public MoQServer::ControlVisitor { - public: - DateControlVisitor( - MoQDateServer& server, - std::shared_ptr clientSession) - : MoQServer::ControlVisitor(std::move(clientSession)), - server_(server) {} - - private: - MoQDateServer& server_; - }; - - std::unique_ptr makeControlVisitor( - std::shared_ptr clientSession) override { + void onNewSession(std::shared_ptr clientSession) override { clientSession->setPublishHandler(shared_from_this()); if (!loopRunning_) { // start date loop on first server connect loopRunning_ = true; publishDateLoop().scheduleOn(clientSession->getEventBase()).start(); } - return std::make_unique( - *this, std::move(clientSession)); } folly::coro::Task subscribe( diff --git a/moxygen/samples/flv_receiver_client/MoQFlvReceiverClient.cpp b/moxygen/samples/flv_receiver_client/MoQFlvReceiverClient.cpp index feb885c6..1f7dd6e0 100644 --- a/moxygen/samples/flv_receiver_client/MoQFlvReceiverClient.cpp +++ b/moxygen/samples/flv_receiver_client/MoQFlvReceiverClient.cpp @@ -363,9 +363,6 @@ class MoQFlvReceiverClient std::chrono::seconds(FLAGS_transaction_timeout), /*publishHandler=*/nullptr, /*subscribeHandler=*/shared_from_this()); - auto exec = co_await folly::coro::co_current_executor; - controlReadLoop().scheduleOn(exec).start(); - // Create output file flvw_ = std::make_shared(flvOutPath_); trackReceiverHandlerAudio_.setFlvWriterShared(flvw_); @@ -418,11 +415,23 @@ class MoQFlvReceiverClient XLOG(ERR) << ex.what(); co_return; } + // TODO: should we co_await collectAll(trackReceiverHandlerAudio_.baton, + // trackReceiverHandlerVideo_.baton); co_await trackReceiverHandlerAudio_.baton; co_await trackReceiverHandlerVideo_.baton; XLOG(INFO) << __func__ << " done"; } + folly::coro::Task announce( + Announce announce, + std::shared_ptr) override { + XLOG(INFO) << "Announce ns=" << announce.trackNamespace; + // receiver client doesn't expect server or relay to announce anything, but + // announce OK anyways + return folly::coro::makeTask( + std::make_shared(AnnounceOk{announce.trackNamespace})); + } + void goaway(Goaway goaway) override { XLOG(INFO) << "Goaway uri=" << goaway.newSessionUri; stop(); @@ -440,32 +449,6 @@ class MoQFlvReceiverClient moqClient_.moqSession_->close(SessionCloseErrorCode::NO_ERROR); } - folly::coro::Task controlReadLoop() { - class ControlVisitor : public MoQSession::ControlVisitor { - public: - explicit ControlVisitor(MoQFlvReceiverClient& client) : client_(client) {} - - void operator()(Announce announce) const override { - XLOG(WARN) << "Announce ns=" << announce.trackNamespace; - // text client doesn't expect server or relay to announce anything, - // but announce OK anyways - client_.moqClient_.moqSession_->announceOk({announce.trackNamespace}); - } - - private: - MoQFlvReceiverClient& client_; - }; - XLOG(INFO) << __func__; - auto g = - folly::makeGuard([func = __func__] { XLOG(INFO) << "exit " << func; }); - ControlVisitor visitor(*this); - MoQSession::ControlVisitor* vptr(&visitor); - while (auto msg = - co_await moqClient_.moqSession_->controlMessages().next()) { - boost::apply_visitor(*vptr, msg.value()); - } - } - private: MoQClient moqClient_; std::shared_ptr audioSubscribeHandle_; diff --git a/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp b/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp index fe765021..c0c4552d 100644 --- a/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp +++ b/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp @@ -57,18 +57,14 @@ class MoQFlvStreamerClient std::chrono::seconds(FLAGS_transaction_timeout), /*publishHandler=*/shared_from_this(), /*subscribeHandler=*/nullptr); - auto exec = co_await folly::coro::co_current_executor; - controlReadLoop().scheduleOn(exec).start(); - // Announce auto annResp = co_await moqClient_.moqSession_->announce(std::move(ann)); if (annResp.hasValue()) { - trackNamespace_ = annResp->trackNamespace; - + announceHandle_ = std::move(annResp.value()); folly::getGlobalIOExecutor()->add([this] { publishLoop(); }); } else { XLOG(INFO) << "Announce error trackNamespace=" - << annResp->trackNamespace + << annResp.error().trackNamespace << " code=" << annResp.error().errorCode << " reason=" << annResp.error().reasonPhrase; } @@ -81,8 +77,10 @@ class MoQFlvStreamerClient void stop() { XLOG(INFO) << __func__; + if (announceHandle_) { + announceHandle_->unannounce(); + } if (moqClient_.moqSession_) { - moqClient_.moqSession_->unannounce({trackNamespace_}); moqClient_.moqSession_->close(SessionCloseErrorCode::NO_ERROR); } } @@ -189,34 +187,6 @@ class MoQFlvStreamerClient co_return subscription; } - folly::coro::Task controlReadLoop() { - class ControlVisitor : public MoQSession::ControlVisitor { - public: - explicit ControlVisitor(MoQFlvStreamerClient& client) : client_(client) {} - - void operator()(Announce announce) const override { - XLOG(WARN) << "Announce ns=" << announce.trackNamespace; - // text client doesn't expect server or relay to announce anything, - // but announce OK anyways - client_.moqClient_.moqSession_->announceOk({announce.trackNamespace}); - } - - private: - MoQFlvStreamerClient& client_; - }; - XLOG(INFO) << __func__; - auto g = - folly::makeGuard([func = __func__] { XLOG(INFO) << "exit " << func; }); - ControlVisitor visitor(*this); - MoQSession::ControlVisitor* vptr(&visitor); - while (auto msg = - co_await moqClient_.moqSession_->controlMessages().next()) { - boost::apply_visitor(*vptr, msg.value()); - } - - XLOG(INFO) << "Session closed"; - } - void publishAudio( TrackAlias trackAlias, std::unique_ptr item) { @@ -330,7 +300,7 @@ class MoQFlvStreamerClient } MoQClient moqClient_; - TrackNamespace trackNamespace_; + std::shared_ptr announceHandle_; FullTrackName fullVideoTrackName_; FullTrackName fullAudioTrackName_; diff --git a/moxygen/samples/text-client/MoQTextClient.cpp b/moxygen/samples/text-client/MoQTextClient.cpp index b55486d4..301b7065 100644 --- a/moxygen/samples/text-client/MoQTextClient.cpp +++ b/moxygen/samples/text-client/MoQTextClient.cpp @@ -113,9 +113,6 @@ class MoQTextClient : public Subscriber, std::chrono::seconds(FLAGS_transaction_timeout), /*publisher=*/nullptr, /*subscriber=*/shared_from_this()); - auto exec = co_await folly::coro::co_current_executor; - controlReadLoop().scheduleOn(exec).start(); - SubParams subParams{sub.locType, sub.start, sub.end}; sub.locType = LocationType::LatestObject; sub.start = folly::none; @@ -203,6 +200,16 @@ class MoQTextClient : public Subscriber, XLOG(INFO) << __func__ << " done"; } + folly::coro::Task announce( + Announce announce, + std::shared_ptr) override { + XLOG(INFO) << "Announce ns=" << announce.trackNamespace; + // text client doesn't expect server or relay to announce anything, + // but announce OK anyways + return folly::coro::makeTask( + std::make_shared(AnnounceOk{announce.trackNamespace})); + } + void goaway(Goaway goaway) override { XLOG(INFO) << "Goaway uri=" << goaway.newSessionUri; stop(); @@ -218,32 +225,6 @@ class MoQTextClient : public Subscriber, moqClient_.moqSession_->close(SessionCloseErrorCode::NO_ERROR); } - folly::coro::Task controlReadLoop() { - class ControlVisitor : public MoQSession::ControlVisitor { - public: - explicit ControlVisitor(MoQTextClient& client) : client_(client) {} - - void operator()(Announce announce) const override { - XLOG(WARN) << "Announce ns=" << announce.trackNamespace; - // text client doesn't expect server or relay to announce anything, - // but announce OK anyways - client_.moqClient_.moqSession_->announceOk({announce.trackNamespace}); - } - - private: - MoQTextClient& client_; - }; - XLOG(INFO) << __func__; - auto g = - folly::makeGuard([func = __func__] { XLOG(INFO) << "exit " << func; }); - ControlVisitor visitor(*this); - MoQSession::ControlVisitor* vptr(&visitor); - while (auto msg = - co_await moqClient_.moqSession_->controlMessages().next()) { - boost::apply_visitor(*vptr, msg.value()); - } - } - MoQClient moqClient_; FullTrackName fullTrackName_; std::shared_ptr subscription_; diff --git a/moxygen/test/MoQSessionTest.cpp b/moxygen/test/MoQSessionTest.cpp index 24051ef4..a288c6af 100644 --- a/moxygen/test/MoQSessionTest.cpp +++ b/moxygen/test/MoQSessionTest.cpp @@ -21,38 +21,6 @@ using namespace moxygen; const size_t kTestMaxSubscribeId = 2; -class MockControlVisitorBase { - public: - virtual ~MockControlVisitorBase() = default; - virtual void onAnnounce(Announce announce) const = 0; - virtual void onUnannounce(Unannounce unannounce) const = 0; - virtual void onAnnounceCancel(AnnounceCancel announceCancel) const = 0; -}; - -class MockControlVisitor : public MoQSession::ControlVisitor, - MockControlVisitorBase { - public: - MockControlVisitor() = default; - ~MockControlVisitor() override = default; - - MOCK_METHOD(void, onAnnounce, (Announce), (const)); - void operator()(Announce announce) const override { - onAnnounce(announce); - } - - MOCK_METHOD(void, onUnannounce, (Unannounce), (const)); - void operator()(Unannounce unannounce) const override { - onUnannounce(unannounce); - } - - MOCK_METHOD(void, onAnnounceCancel, (AnnounceCancel), (const)); - void operator()(AnnounceCancel announceCancel) const override { - onAnnounceCancel(announceCancel); - } - - private: -}; - class MoQSessionTest : public testing::Test, public MoQSession::ServerSetupCallback { public: @@ -69,14 +37,6 @@ class MoQSessionTest : public testing::Test, void SetUp() override {} - folly::coro::Task controlLoop( - MoQSession& session, - MockControlVisitor& control) { - while (auto msg = co_await session.controlMessages().next()) { - boost::apply_visitor(control, msg.value()); - } - } - folly::Try onClientSetup(ClientSetup setup) override { EXPECT_EQ(setup.supportedVersions[0], kVersionDraftCurrent); if (!setup.params.empty()) { @@ -104,8 +64,6 @@ class MoQSessionTest : public testing::Test, std::unique_ptr serverWt_; std::shared_ptr clientSession_; std::shared_ptr serverSession_; - MockControlVisitor clientControl; - MockControlVisitor serverControl; std::shared_ptr clientPublisher{ std::make_shared()}; std::shared_ptr serverPublisher{ @@ -147,12 +105,6 @@ void MoQSessionTest::setupMoQSession() { }(clientSession_, initialMaxSubscribeId_) .scheduleOn(&eventBase_) .start(); - this->controlLoop(*serverSession_, serverControl) - .scheduleOn(&eventBase_) - .start(); - this->controlLoop(*clientSession_, clientControl) - .scheduleOn(&eventBase_) - .start(); eventBase_.loop(); } } // namespace