refactor: introduce reliable transport layer (#50)
This is the third commit in a series of incremental refactorings of the
current minivpn tree.

In this commit, we introduce the reliabletransport package, which is the
layer between the packetmuxer and the control channel.

Reliable transport adds a reliability layer on top of UDP (but it's also
used in TCP mode).

This first implementation of the reliability layer implements only a
naive strategy to get the TLS handshake working in an optimal
environment (i.e., one with no packet loss): we ACK every incoming
packet and assume all packets arrive in order.

After merging all the components of the new architecture, we will
revisit the reliability layer to follow the OpenVPN design more closely.

Reference issue: #47

---------

Co-authored-by: Simone Basso <[email protected]>
ainghazal and bassosimone authored Jan 15, 2024
1 parent 4d0ca13 commit d525579
Showing 1 changed file with 187 additions and 0 deletions.
internal/reliabletransport/reliabletransport.go
@@ -0,0 +1,187 @@
// Package reliabletransport implements the reliable transport.
package reliabletransport

import (
	"bytes"

	"github.com/ooni/minivpn/internal/model"
	"github.com/ooni/minivpn/internal/session"
	"github.com/ooni/minivpn/internal/workers"
)

// Service is the reliable service. Make sure you initialize
// the channels before invoking [Service.StartWorkers].
type Service struct {
	// DataOrControlToMuxer is a shared channel that moves packets down to the muxer.
	DataOrControlToMuxer *chan *model.Packet

	// ControlToReliable moves packets down to us.
	ControlToReliable chan *model.Packet

	// MuxerToReliable moves packets up to us.
	MuxerToReliable chan *model.Packet

	// ReliableToControl moves packets up from us to the control layer above.
	ReliableToControl *chan *model.Packet
}
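
// Editorial sketch (not part of the original file): a minimal example of how a
// caller might wire up this service, assuming the adjacent services share the
// pointed-to channels and that logger, workersManager, and sessionManager are
// already in scope:
//
//	dataOrControlToMuxer := make(chan *model.Packet)
//	reliableToControl := make(chan *model.Packet)
//	svc := &Service{
//		DataOrControlToMuxer: &dataOrControlToMuxer,
//		ControlToReliable:    make(chan *model.Packet),
//		MuxerToReliable:      make(chan *model.Packet),
//		ReliableToControl:    &reliableToControl,
//	}
//	svc.StartWorkers(logger, workersManager, sessionManager)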

// StartWorkers starts the reliable-transport workers. See the [ARCHITECTURE]
// file for more information about the reliable-transport workers.
//
// [ARCHITECTURE]: https://github.com/ooni/minivpn/blob/main/ARCHITECTURE.md
func (s *Service) StartWorkers(
	logger model.Logger,
	workersManager *workers.Manager,
	sessionManager *session.Manager,
) {
	ws := &workersState{
		logger:               logger,
		dataOrControlToMuxer: *s.DataOrControlToMuxer,
		controlToReliable:    s.ControlToReliable,
		muxerToReliable:      s.MuxerToReliable,
		reliableToControl:    *s.ReliableToControl,
		sessionManager:       sessionManager,
		workersManager:       workersManager,
	}
	workersManager.StartWorker(ws.moveUpWorker)
	workersManager.StartWorker(ws.moveDownWorker)
}

// workersState contains the reliable workers state.
type workersState struct {
	// logger is the logger to use.
	logger model.Logger

	// dataOrControlToMuxer is the channel where we write packets going down the stack.
	dataOrControlToMuxer chan<- *model.Packet

	// controlToReliable is the channel from which we read packets going down the stack.
	controlToReliable <-chan *model.Packet

	// muxerToReliable is the channel from which we read packets going up the stack.
	muxerToReliable <-chan *model.Packet

	// reliableToControl is the channel where we write packets going up the stack.
	reliableToControl chan<- *model.Packet

	// sessionManager manages the OpenVPN session.
	sessionManager *session.Manager

	// workersManager controls the workers lifecycle.
	workersManager *workers.Manager
}

// moveUpWorker moves packets up the stack.
func (ws *workersState) moveUpWorker() {
	defer func() {
		ws.workersManager.OnWorkerDone()
		ws.workersManager.StartShutdown()
		ws.logger.Debug("reliable: moveUpWorker: done")
	}()

	ws.logger.Debug("reliable: moveUpWorker: started")

	// TODO: do we need to have notifications from the control channel
	// to reset state or can we do this implicitly?

	for {
		// POSSIBLY BLOCK reading a packet to move up the stack
		// or POSSIBLY BLOCK waiting for notifications
		select {
		case packet := <-ws.muxerToReliable:
			ws.logger.Infof(
				"< %s localID=%x remoteID=%x [%d bytes]",
				packet.Opcode,
				packet.LocalSessionID,
				packet.RemoteSessionID,
				len(packet.Payload),
			)

			// drop a packet that is not for our session: the sender writes its
			// own session ID into packet.LocalSessionID, which must match the
			// remote session ID we have recorded for this session
			if !bytes.Equal(packet.LocalSessionID[:], ws.sessionManager.RemoteSessionID()) {
				ws.logger.Warnf(
					"reliable: moveUpWorker: packet with invalid LocalSessionID: expected %x; got %x",
					ws.sessionManager.RemoteSessionID(),
					packet.LocalSessionID,
				)
				continue
			}

			// possibly ACK the incoming packet
			if err := ws.maybeACK(packet); err != nil {
				ws.logger.Warnf("reliable: moveUpWorker: cannot ACK packet: %s", err.Error())
				continue
			}

			// TODO: here we should track ACKs

			// POSSIBLY BLOCK delivering to the upper layer
			select {
			case ws.reliableToControl <- packet:
			case <-ws.workersManager.ShouldShutdown():
				return
			}

		case <-ws.workersManager.ShouldShutdown():
			return
		}
	}
}
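
// Editorial sketch (not part of the original file): moveUpWorker assumes
// in-order delivery. A closer match to the OpenVPN design would buffer
// out-of-order control packets by packet ID and release only consecutive
// ones; the reassembler type below is hypothetical:
//
//	// release records pkt and returns the packets now deliverable in order.
//	func (r *reassembler) release(pkt *model.Packet) (out []*model.Packet) {
//		r.pending[pkt.ID] = pkt
//		for {
//			next, ok := r.pending[r.expectedID]
//			if !ok {
//				return
//			}
//			delete(r.pending, r.expectedID)
//			r.expectedID++
//			out = append(out, next)
//		}
//	}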

// moveDownWorker moves packets down the stack.
func (ws *workersState) moveDownWorker() {
	defer func() {
		ws.workersManager.OnWorkerDone()
		ws.workersManager.StartShutdown()
		ws.logger.Debug("reliable: moveDownWorker: done")
	}()

	ws.logger.Debug("reliable: moveDownWorker: started")

	// TODO: we should have a timer for retransmission
	for {
		// POSSIBLY BLOCK reading the next packet we should move down the stack
		select {
		case packet := <-ws.controlToReliable:
			// TODO: here we should treat control packets specially

			ws.logger.Infof(
				"> %s localID=%x remoteID=%x [%d bytes]",
				packet.Opcode,
				packet.LocalSessionID,
				packet.RemoteSessionID,
				len(packet.Payload),
			)

			// POSSIBLY BLOCK delivering this packet to the lower layer
			select {
			case ws.dataOrControlToMuxer <- packet:
			case <-ws.workersManager.ShouldShutdown():
				return
			}

		case <-ws.workersManager.ShouldShutdown():
			return
		}
	}
}
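
// Editorial sketch (not part of the original file): the retransmission TODO
// above could be addressed by adding a ticker case to the select loop; the
// inFlight map holding unACKed packets is hypothetical:
//
//	ticker := time.NewTicker(time.Second)
//	defer ticker.Stop()
//	for {
//		select {
//		case packet := <-ws.controlToReliable:
//			inFlight[packet.ID] = packet // remember until ACKed
//			// ... deliver to ws.dataOrControlToMuxer as above ...
//		case <-ticker.C:
//			for _, packet := range inFlight {
//				ws.dataOrControlToMuxer <- packet // resend unACKed packets
//			}
//		case <-ws.workersManager.ShouldShutdown():
//			return
//		}
//	}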

// maybeACK sends an ACK when needed.
func (ws *workersState) maybeACK(packet *model.Packet) error {
	// currently we are ACKing every packet
	// TODO: implement a better ACKing strategy

	// this function will fail if we don't know the remote session ID
	ack, err := ws.sessionManager.NewACKForPacket(packet)
	if err != nil {
		return err
	}

	// move the ACK packet down the stack
	select {
	case ws.dataOrControlToMuxer <- ack:
		return nil
	case <-ws.workersManager.ShouldShutdown():
		return workers.ErrShutdown
	}
}
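
// Editorial sketch (not part of the original file): OpenVPN allows piggybacking
// ACKs on outgoing control packets rather than sending a standalone ACK for
// every incoming packet. A refined strategy could queue the IDs to acknowledge
// and flush them with the next outgoing control packet, or after a short delay;
// the pendingACKs field below is hypothetical:
//
//	// queueACK records a packet ID to acknowledge later.
//	func (ws *workersState) queueACK(id model.PacketID) {
//		ws.pendingACKs = append(ws.pendingACKs, id)
//	}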
