Skip to content

Commit

Permalink
refactor: introduce the packetmuxer layer (#49)
Browse files Browse the repository at this point in the history
This is the second commit in the series of incremental refactoring of
the current minivpn tree.

In this commit, we introduce the packet muxer, which is the layer just
above the networkio. The packet muxer handles and routes data or control
packets, and it also handles reset packets.

As dependencies for `packetmuxer`, we're also introducing the `optional`
and `session` packages.

Reference issue: #47

---------

Co-authored-by: Simone Basso <[email protected]>
  • Loading branch information
ainghazal and bassosimone authored Jan 15, 2024
1 parent d976c5a commit 4d0ca13
Show file tree
Hide file tree
Showing 6 changed files with 822 additions and 0 deletions.
58 changes: 58 additions & 0 deletions internal/optional/optional.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package optional

import (
"reflect"

"github.com/ooni/minivpn/internal/runtimex"
)

// Value is an optional value. The zero value of this structure
// is equivalent to the one you get when calling [None].
type Value[T any] struct {
// indirect is the indirect pointer to the value.
indirect *T
}

// None constructs an empty value.
func None[T any]() Value[T] {
return Value[T]{nil}
}

// Some constructs a some value unless T is a pointer and points to
// nil, in which case [Some] is equivalent to [None].
func Some[T any](value T) Value[T] {
v := Value[T]{}
maybeSetFromValue(&v, value)
return v
}

// maybeSetFromValue sets the underlying value unless T is a pointer
// and points to nil in which case we set the Value to be empty.
func maybeSetFromValue[T any](v *Value[T], value T) {
rv := reflect.ValueOf(value)
if rv.Type().Kind() == reflect.Pointer && rv.IsNil() {
v.indirect = nil
return
}
v.indirect = &value
}

// IsNone returns whether this [Value] is empty.
func (v Value[T]) IsNone() bool {
return v.indirect == nil
}

// Unwrap returns the underlying value or panics. In case of
// panic, the value passed to panic is an error.
func (v Value[T]) Unwrap() T {
runtimex.Assert(!v.IsNone(), "is none")
return *v.indirect
}

// UnwrapOr returns the fallback if the [Value] is empty.
func (v Value[T]) UnwrapOr(fallback T) T {
if v.IsNone() {
return fallback
}
return v.Unwrap()
}
288 changes: 288 additions & 0 deletions internal/packetmuxer/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
// Package packetmuxer implements the packet-muxer workers.
package packetmuxer

import (
"github.com/ooni/minivpn/internal/model"
"github.com/ooni/minivpn/internal/session"
"github.com/ooni/minivpn/internal/workers"
)

// Service is the packetmuxer service. Make sure you initialize
// the channels before invoking [Service.StartWorkers].
type Service struct {
// HardReset receives requests to initiate a hard reset, that will start the openvpn handshake.
HardReset chan any

// NotifyTLS sends reset notifications to tlsstate.
NotifyTLS *chan *model.Notification

// MuxerToReliable moves packets up to reliabletransport.
MuxerToReliable *chan *model.Packet

// MuxerToData moves packets up to the datachannel.
MuxerToData *chan *model.Packet

// DataOrControlToMuxer moves packets down from the reliabletransport or datachannel.
DataOrControlToMuxer chan *model.Packet

// MuxerToNetwork moves bytes down to the networkio layer below us.
MuxerToNetwork *chan []byte

// NetworkToMuxer moves bytes up to us from the networkio layer below.
NetworkToMuxer chan []byte
}

// StartWorkers starts the packet-muxer workers. See the [ARCHITECTURE]
// file for more information about the packet-muxer workers.
//
// [ARCHITECTURE]: https://github.com/ooni/minivpn/blob/main/ARCHITECTURE.md
func (s *Service) StartWorkers(
logger model.Logger,
workersManager *workers.Manager,
sessionManager *session.Manager,
) {
ws := &workersState{
logger: logger,
hardReset: s.HardReset,
notifyTLS: *s.NotifyTLS,
muxerToReliable: *s.MuxerToReliable,
muxerToData: *s.MuxerToData,
dataOrControlToMuxer: s.DataOrControlToMuxer,
muxerToNetwork: *s.MuxerToNetwork,
networkToMuxer: s.NetworkToMuxer,
sessionManager: sessionManager,
workersManager: workersManager,
}
workersManager.StartWorker(ws.moveUpWorker)
workersManager.StartWorker(ws.moveDownWorker)
}

// workersState contains the reliabletransport workers state.
type workersState struct {
// logger is the logger to use
logger model.Logger

// hardReset is the channel posted to force a hard reset.
hardReset <-chan any

// notifyTLS is used to send notifications to the TLS service.
notifyTLS chan<- *model.Notification

// dataOrControlToMuxer is the channel for reading all the packets traveling down the stack.
dataOrControlToMuxer <-chan *model.Packet

// muxerToReliable is the channel for writing control packets going up the stack.
muxerToReliable chan<- *model.Packet

// muxerToData is the channel for writing data packets going up the stack.
muxerToData chan<- *model.Packet

// muxerToNetwork is the channel for writing raw packets going down the stack.
muxerToNetwork chan<- []byte

// networkToMuxer is the channel for reading raw packets going up the stack.
networkToMuxer <-chan []byte

// sessionManager manages the OpenVPN session.
sessionManager *session.Manager

// workersManager controls the workers lifecycle.
workersManager *workers.Manager
}

// moveUpWorker moves packets up the stack
func (ws *workersState) moveUpWorker() {
defer func() {
ws.workersManager.OnWorkerDone()
ws.workersManager.StartShutdown()
ws.logger.Debug("packetmuxer: moveUpWorker: done")
}()

ws.logger.Debug("packetmuxer: moveUpWorker: started")

for {
// POSSIBLY BLOCK awaiting for incoming raw packet
select {
case rawPacket := <-ws.networkToMuxer:
if err := ws.handleRawPacket(rawPacket); err != nil {
// error already printed
return
}

case <-ws.hardReset:
if err := ws.startHardReset(); err != nil {
// error already logged
return
}

case <-ws.workersManager.ShouldShutdown():
return
}
}
}

// moveDownWorker moves packets down the stack
func (ws *workersState) moveDownWorker() {
defer func() {
ws.workersManager.OnWorkerDone()
ws.workersManager.StartShutdown()
ws.logger.Debug("packetmuxer: moveDownWorker: done")
}()

ws.logger.Debug("packetmuxer: moveDownWorker: started")

for {
// POSSIBLY BLOCK on reading the packet moving down the stack
select {
case packet := <-ws.dataOrControlToMuxer:
// serialize the packet
rawPacket, err := packet.Bytes()
if err != nil {
ws.logger.Warnf("packetmuxer: cannot serialize packet: %s", err.Error())
continue
}

// While this channel send could possibly block, the [ARCHITECTURE] is
// such that (1) the channel is buffered and (2) the channel sender should
// avoid blocking when inserting data into the channel.
//
// [ARCHITECTURE]: https://github.com/ooni/minivpn/blob/main/ARCHITECTURE.md
select {
case ws.muxerToNetwork <- rawPacket:
default:
// drop the packet if the buffer is full as documented above
case <-ws.workersManager.ShouldShutdown():
return
}

case <-ws.workersManager.ShouldShutdown():
return
}
}
}

// startHardReset is invoked when we need to perform a HARD RESET.
func (ws *workersState) startHardReset() error {
// emit a CONTROL_HARD_RESET_CLIENT_V2 pkt
packet, err := ws.sessionManager.NewPacket(model.P_CONTROL_HARD_RESET_CLIENT_V2, nil)
if err != nil {
ws.logger.Warnf("packetmuxer: NewPacket: %s", err.Error())
return err
}
if err := ws.serializeAndEmit(packet); err != nil {
return err
}

// reset the state to become initial again
ws.sessionManager.SetNegotiationState(session.S_PRE_START)

// TODO: any other change to apply in this case?

return nil
}

// handleRawPacket is the code invoked to handle a raw packet.
func (ws *workersState) handleRawPacket(rawPacket []byte) error {
// make sense of the packet
packet, err := model.ParsePacket(rawPacket)
if err != nil {
ws.logger.Warnf("packetmuxer: moveUpWorker: ParsePacket: %s", err.Error())
return nil // keep running
}

// handle the case where we're performing a HARD_RESET
if ws.sessionManager.NegotiationState() == session.S_PRE_START &&
packet.Opcode == model.P_CONTROL_HARD_RESET_SERVER_V2 {
return ws.finishThreeWayHandshake(packet)
}

// TODO: introduce other sanity checks here

// multiplex the incoming packet POSSIBLY BLOCKING on delivering it
if packet.IsControl() || packet.Opcode == model.P_ACK_V1 {
select {
case ws.muxerToReliable <- packet:
case <-ws.workersManager.ShouldShutdown():
return workers.ErrShutdown
}
} else {
select {
case ws.muxerToData <- packet:
case <-ws.workersManager.ShouldShutdown():
return workers.ErrShutdown
}
}

return nil
}

// finishThreeWayHandshake responsds to the HARD_RESET_SERVER and finishes the handshake.
func (ws *workersState) finishThreeWayHandshake(packet *model.Packet) error {
// register the server's session (note: the PoV is the server's one)
ws.sessionManager.SetRemoteSessionID(packet.LocalSessionID)

// we need to manually ACK because the reliable layer is above us
ws.logger.Debugf(
"< %s localID=%x remoteID=%x [%d bytes]",
packet.Opcode,
packet.LocalSessionID,
packet.RemoteSessionID,
len(packet.Payload),
)

// create the ACK packet
ACK, err := ws.sessionManager.NewACKForPacket(packet)
if err != nil {
return err
}

// emit the packet
if err := ws.serializeAndEmit(ACK); err != nil {
return err
}

// advance the state
ws.sessionManager.SetNegotiationState(session.S_START)

// attempt to tell TLS we want to handshake
select {
case ws.notifyTLS <- &model.Notification{Flags: model.NotificationReset}:
// nothing

default:
// the architecture says this notification should be nonblocking

case <-ws.workersManager.ShouldShutdown():
return workers.ErrShutdown
}

return nil
}

// serializeAndEmit was written because Ain Ghazal was very insistent about it.
func (ws *workersState) serializeAndEmit(packet *model.Packet) error {
// serialize it
rawPacket, err := packet.Bytes()
if err != nil {
return err
}

// emit the packet
select {
case ws.muxerToNetwork <- rawPacket:
// nothing

case <-ws.workersManager.ShouldShutdown():
return workers.ErrShutdown
}

ws.logger.Debugf(
"> %s localID=%x remoteID=%x [%d bytes]",
packet.Opcode,
packet.LocalSessionID,
packet.RemoteSessionID,
len(packet.Payload),
)

return nil
}
Loading

0 comments on commit 4d0ca13

Please sign in to comment.