diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
new file mode 100644
index 00000000..bae0482a
--- /dev/null
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -0,0 +1,9 @@
+# Checklist
+
+* [ ] I have read the contribution guidelines
+* [ ] If you changed code related to services or inter-service communication, make sure you update the diagrams in `ARCHITECTURE.md`.
+* [ ] Reference issue for this pull request:
+
+# Description
+
+Please insert a more detailed description here.
diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md
new file mode 100644
index 00000000..b62ff4c8
--- /dev/null
+++ b/ARCHITECTURE.md
@@ -0,0 +1,216 @@
+# minivpn design
+
+`minivpn` (after the re-design landed in January 2024) follows a layered design that tries to closely mirror the OpenVPN protocol.
+
+* The bottom layer is [networkio](https://github.com/ooni/minivpn/tree/main/internal/networkio), which deals with network reads and writes. The module implements a [FramingConn](https://github.com/ainghazal/minivpn/blob/main/internal/networkio/framing.go#L10).
+* The [packetmuxer](https://github.com/ainghazal/minivpn/blob/main/internal/packetmuxer/service.go) routes both data and control packets over the underlying connection. Multiplexing is needed so that the TLS session sees a [reliable transport](https://community.openvpn.net/openvpn/wiki/SecurityOverview).
+* [reliabletransport](https://github.com/ainghazal/minivpn/blob/main/internal/reliabletransport/reliabletransport.go) implements reordering and acknowledgement for incoming packets, and retransmission for outgoing packets.
+* [controlchannel](https://github.com/ainghazal/minivpn/blob/main/internal/controlchannel/controlchannel.go) serializes and deserializes data according to the control channel format, and reacts to `SOFT_RESET_V1` packets.
+* [tlssession](https://github.com/ainghazal/minivpn/blob/main/internal/tlssession/tlssession.go) performs a TLS handshake and negotiates a key exchange over the established session. It moves TLS records up and down, from and towards the `controlchannel`.
+* The [datachannel](https://github.com/ainghazal/minivpn/tree/main/internal/datachannel) performs encryption and decryption of the tunneled IP packets.
+* [TUN](https://github.com/ainghazal/minivpn/blob/main/internal/tun/tun.go) is the user-facing interface. It can read and write `[]byte`.
+* Finally, the [workers.Manager](https://github.com/ainghazal/minivpn/blob/main/internal/workers/workers.go) component deals with coordination among all the other components.
+
+
+## Services
+
+* Each layer is implemented as a service that lives in its own package under the [internal](https://github.com/ainghazal/minivpn/blob/main/internal) path.
+* Each service initializes and starts a number of workers (typically two: one moving data up the stack, and another one moving data down). Some services implement a single worker, and some implement three.
+* The communication among the different components happens via channels.
+* Some channels are used for event notification; others move sequences of `[]byte` or `*model.Packet`.
+* The channels leaving and arriving at each module can be seen in the diagram below:
+
+```
+ startShtdwn
+ ┌───────────────────────────────────────────┬────────────►┌──────────────┐
+ │ │ shtdwn! │ │
+ │ TUN │◄────────────┤ workers │
+ │ │ Ready │ Manager │
+ └────▲───────┬──────────────────────────────┘◄────────────┤ │
+ │ │ │ │
+ │tunUp │tunDown │ │
+ ┌────┴───────▼──────────────────────────────┐ │ │
+ │ │ shtdwn! 
│ │
+ │ datachannel │◄────────────┤ │
+ │ │ │ │
+ └───▲────────┬────────────────────────▲─────┘ │ │
+ │ │ keyUp │ shtdwn! │ │
+ │ │ ┌────────────────┴─────┐◄────────────┤ │
+ │ │ │ │ │ │
+ │ │ │ tlssession ◄──┐ │ │
+ │ │ └───────▲──────────▲───┘ │ │ │
+ │ │ tlsRec │ │ notifyTLS │ │
+ muxerTo │ Down│ tlsRecUp notifyTLS │ │ │
+ Data │ │ │ │ │ │ │
+ │ │ ┌─▼─────┴──────────┴───┐ │ │ │
+ │ │ │ │ │ │ │
+ │ │ │ controlchannel │ │ │ │
+ │ │ └─┬─────▲──────────────┘ │ ◄────────┤ │
+ │ │ ctrl │ │ │ shtdwn!│ │
+ │ │ 2Rel │ rel2Ctrl │ │ │
+ │ │ ┌─▼────────────────────┐ │ │ │
+ │ │ │ │ │ ◄────────┤ │
+ │ │ │ reliabletransport │ │ shtdwn!│ │
+ │ │ └───────▲──────────────┘ │ │ │
+ │ dataOrCtrlToMuxer │ muxerToReliable │ │ │
+ │ │ │ │ │ │ │
+ ┌───┴────────▼─────────▼─────┴──────────────┐ │ │ │
+hardReset│ │ │ │ │
+ ────► packetmuxer & HRESET ├──┘ │ │
+ │ │ │ │
+ └───────────────────┬────────▲──────────────┘◄────────────┤ │
+ muxerToNetwork│ │networkToMuxer shtdwn! │ │
+ ┌───────────────────▼────────┴──────────────┐ │ │
+ │ │ │ │
+ │ network I/O │◄────────────┤ │
+ │ │ shtdwn! │ │
+ └───────────────────────────────────────────┘ └──────────────┘
+```
+
+# Implementation and liveness analysis
+
+In the layered architecture detailed above, there are 12 different goroutines
+tasked with moving data across the stack, in 6 services:
+
+1. **networkio**: 2 workers (up/down).
+2. **packetmuxer**: 2 workers (up/down).
+3. **reliabletransport**: 2 workers (up/down).
+4. **controlchannel**: 2 workers (up/down).
+5. **tlssession**: 1 worker.
+6. **datachannel**: 3 workers (up/down/key).
+
+The `TUN` abstraction reads and writes to the `tunUp` and `tunDown` channels; the TUN user is responsible for dialing the connection and passing a `networkio.FramingConn` to the `tun.StartTUN()` constructor. The TUN constructor owns the conn, and also starts an internal session.Manager and workers.Manager to deal with service coordination.
+
+By design, it is the responsibility of the upper, public layer (TUN) to account for timeouts (like the TLS handshake timeout), and to close the underlying connection and signal the teardown of all the workers.
+
+The channel communication between services is designed to be blocking, with unbuffered channels.
+
+To reason about **liveness** in the system, we make the following assumptions.
+
+## Assumptions
+
+* We call a "chain" a series of workers, connected via shared channels, between a source and a sink.
+* Any `Read()` or `Write()` from/to the underlying `conn` can block, and since all the workers in a single chain are connected via blocking reads/writes, this block can freely propagate across the stack. The chain moving up pauses while blocked on receiving from the network, and the chain going down pauses if the system is blocked on delivering to the network.
+* We assume this is fine because (a) what happens after the syscall boundary is beyond our control, and (b) we want to make these blocks visible to the system (i.e., we want to avoid hiding these events by using queues).
+* It is the responsibility of `minivpn`'s user to keep reading from the `TUN` interface so that incoming data packets can be processed.
+* Any channels that connect the up and down processing chains (like, for instance, the internal channel that connects `packetmuxer.moveUpWorker` with `packetmuxer.moveDownWorker` to process ACKs) need to be buffered and use non-blocking writes. 
+* The goroutine responsible for the `TLS` service handshake (meaning, the TLS handshake + the control message push and reply to exchange keys) is sequential, and therefore no reads and writes can happen concurrently. +* Guarding `tlsNotify` notifications to the TLS layer need special care (to avoid concurrent notifications while processing the handshake). + + + +```mermaid +stateDiagram-v2 + # you can edit this diagram in https://mermaid.live + + classDef tunrw font-style:italic,font-weight:bold,fill:yellow + + # workers + + state "TUN.Write()" as tundown + state "TUN.Read()" as tunup + + state "datach.moveDownWorker" as datadown + state "datach.moveUpWorker" as dataup + state "datach.keyWorker" as datakey + + state "muxer.moveDownWorker" as muxerdown + + state "muxer.moveUpWorker" as muxerup + + state "reliable.moveDownWorker" as reliabledown + state "reliable.moveUpWorker" as reliableup + + state "ctrlch.moveDownWorker" as controldown + state "ctrlch.moveUpWorker" as controlup + state "tlssession.worker" as tls + + state "networkio.moveDownWorker" as networkdown + state "networkio.moveUpWorker" as networkup + + # channels + + state tunDownCh <> + state tunUpCh <> + state muxerToNetwork <> + state networkToMuxer <> + + + state dataOrCtrlToMuxer <> + state tlsRecordUp <> + state tlsRecordDown <> + state newkey <> + + state reliableToControl <> + state packetInfo <> + state notifytls <> + + state internetout <> + state internetin <> + + # + # we begin our journey with writes to the TUN interface + # (going down) + # + + [*] --> tundown + tundown:::tunrw --> tunDownCh : []byte + tunDownCh --> datadown + + datadown --> dataOrCtrlToMuxer : *Packet + reliabledown --> dataOrCtrlToMuxer : *Packet + + dataOrCtrlToMuxer --> muxerdown: pkt <- dataOrCtrlToMuxer + muxerdown --> muxerToNetwork : []byte + muxerToNetwork --> networkdown + + controldown --> reliabledown + + networkdown --> internetout: conn.Write() + internetin --> networkup: conn.Read() + + tls --> tlsRecordDown: tlsDown + tlsRecordDown --> controldown: tlsDown + tls --> newkey: key + newkey --> datakey: key + + muxerup --> muxer.checkPacket() + + # an extra boundary to force a nicer layout + state muxer.checkPacket() { + direction LR + state if_data <> + if_data --> reliableup: isControl? + if_data --> dataup: isData? + + } + + # second part, we read from the network + # (and go up) + + networkup --> networkToMuxer : []byte + networkToMuxer --> muxerup + + muxerup --> notifytls: notifyTLS + controlup --> notifytls: notifyTLS + notifytls --> tls: notifyTLS + + reliableup --> reliableToControl : *Packet + reliableToControl --> controlup + + + reliableup --> packetInfo: packetInfo + packetInfo --> reliabledown: packetInfo + + controlup --> tlsRecordUp: tlsUp + tlsRecordUp --> tls: tlsUp + + + # data to tun + dataup --> tunUpCh + tunUpCh --> tunup + tunup:::tunrw --> [*] + + +``` diff --git a/internal/controlchannel/controlchannel.go b/internal/controlchannel/controlchannel.go index 1a88071c..22c1ab44 100644 --- a/internal/controlchannel/controlchannel.go +++ b/internal/controlchannel/controlchannel.go @@ -1,11 +1,17 @@ package controlchannel import ( + "fmt" + "github.com/ooni/minivpn/internal/model" "github.com/ooni/minivpn/internal/session" "github.com/ooni/minivpn/internal/workers" ) +var ( + serviceName = "controlchannel" +) + // Service is the controlchannel service. Make sure you initialize // the channels before invoking [Service.StartWorkers]. 
type Service struct { @@ -61,13 +67,14 @@ type workersState struct { } func (ws *workersState) moveUpWorker() { + workerName := fmt.Sprintf("%s: moveUpWorker", serviceName) + defer func() { - ws.workersManager.OnWorkerDone() + ws.workersManager.OnWorkerDone(workerName) ws.workersManager.StartShutdown() - ws.logger.Debug("controlchannel: moveUpWorker: done") }() - ws.logger.Debug("controlchannel: moveUpWorker: started") + ws.logger.Debugf("%s: started", workerName) for { // POSSIBLY BLOCK on reading the packet moving up the stack @@ -79,18 +86,20 @@ func (ws *workersState) moveUpWorker() { case model.P_CONTROL_SOFT_RESET_V1: // We cannot blindly accept SOFT_RESET requests. They only make sense // when we have generated keys. Note that a SOFT_RESET returns us to - // the INITIAL state, therefore, we cannot have concurrent resets in place. - - // TODO(ainghazal): revisit this assumption - // when we implement key rotation. OpenVPN has - // the concept of a "lame duck", i.e., the - // retiring key that needs to be expired a fixed time after the new - // one starts its lifetime. + // the INITIAL state, therefore, we will not have concurrent resets in place, + // even if after the first key generation we receive two SOFT_RESET requests + // back to back. if ws.sessionManager.NegotiationState() < session.S_GENERATED_KEYS { continue } ws.sessionManager.SetNegotiationState(session.S_INITIAL) + // TODO(ainghazal): revisit this step. + // when we implement key rotation. OpenVPN has + // the concept of a "lame duck", i.e., the + // retiring key that needs to be expired a fixed time after the new + // one starts its lifetime, and this might be a good place to try + // to retire the old key. // notify the TLS layer that it should initiate // a TLS handshake and, if successful, generate @@ -121,13 +130,14 @@ func (ws *workersState) moveUpWorker() { } func (ws *workersState) moveDownWorker() { + workerName := fmt.Sprintf("%s: moveDownWorker", serviceName) + defer func() { - ws.workersManager.OnWorkerDone() + ws.workersManager.OnWorkerDone(workerName) ws.workersManager.StartShutdown() - ws.logger.Debug("controlchannel: moveUpWorker: done") }() - ws.logger.Debug("controlchannel: moveUpWorker: started") + ws.logger.Debugf("%s: started", workerName) for { // POSSIBLY BLOCK on reading the TLS record moving down the stack @@ -136,7 +146,7 @@ func (ws *workersState) moveDownWorker() { // transform the record into a control message packet, err := ws.sessionManager.NewPacket(model.P_CONTROL_V1, record) if err != nil { - ws.logger.Warnf("controlchannel: NewPacket: %s", err.Error()) + ws.logger.Warnf("%s: NewPacket: %s", workerName, err.Error()) return } diff --git a/internal/datachannel/service.go b/internal/datachannel/service.go index 7e263291..00837a9b 100644 --- a/internal/datachannel/service.go +++ b/internal/datachannel/service.go @@ -14,6 +14,10 @@ import ( "github.com/ooni/minivpn/internal/workers" ) +var ( + serviceName = "datachannel" +) + // Service is the datachannel service. Make sure you initialize // the channels before invoking [Service.StartWorkers]. type Service struct { @@ -41,9 +45,9 @@ type Service struct { // eventually BLOCKS on tunUp to deliver it; // // 2. moveDownWorker BLOCKS on tunDown to read a packet and -// eventually BLOCKS on packetDown to deliver it; +// eventually BLOCKS on dataOrControlToMuxer to deliver it; // -// 3. keyWorker BLOCKS on keyUp to read an dataChannelKey and +// 3. 
keyWorker BLOCKS on keyUp to read a dataChannelKey and // initializes the internal state with the resulting key; func (s *Service) StartWorkers( logger model.Logger, @@ -57,15 +61,15 @@ func (s *Service) StartWorkers( return } ws := &workersState{ - logger: logger, - muxerToData: s.MuxerToData, + dataChannel: dc, dataOrControlToMuxer: *s.DataOrControlToMuxer, - tunToData: s.TUNToData, dataToTUN: s.DataToTUN, keyReady: s.KeyReady, - dataChannel: dc, - workersManager: workersManager, + logger: logger, + muxerToData: s.MuxerToData, sessionManager: sessionManager, + tunToData: s.TUNToData, + workersManager: workersManager, } firstKeyReady := make(chan any) @@ -77,24 +81,27 @@ func (s *Service) StartWorkers( // workersState contains the data channel state. type workersState struct { - logger model.Logger - workersManager *workers.Manager - sessionManager *session.Manager - keyReady <-chan *session.DataChannelKey - muxerToData <-chan *model.Packet + dataChannel *DataChannel dataOrControlToMuxer chan<- *model.Packet dataToTUN chan<- []byte + keyReady <-chan *session.DataChannelKey + logger model.Logger + muxerToData <-chan *model.Packet + sessionManager *session.Manager tunToData <-chan []byte - dataChannel *DataChannel + workersManager *workers.Manager } // moveDownWorker moves packets down the stack. It will BLOCK on PacketDown func (ws *workersState) moveDownWorker(firstKeyReady <-chan any) { + workerName := serviceName + ":moveDownWorker" defer func() { - ws.workersManager.OnWorkerDone() + ws.workersManager.OnWorkerDone(workerName) ws.workersManager.StartShutdown() - ws.logger.Debug("datachannel: moveDownWorker: done") }() + + ws.logger.Debugf("%s: started", workerName) + select { // wait for the first key to be ready case <-firstKeyReady: @@ -111,8 +118,6 @@ func (ws *workersState) moveDownWorker(firstKeyReady <-chan any) { select { case ws.dataOrControlToMuxer <- packet: - default: - // drop the packet if the buffer is full case <-ws.workersManager.ShouldShutdown(): return } @@ -128,11 +133,15 @@ func (ws *workersState) moveDownWorker(firstKeyReady <-chan any) { // moveUpWorker moves packets up the stack func (ws *workersState) moveUpWorker() { + workerName := fmt.Sprintf("%s: moveUpWorker", serviceName) + defer func() { - ws.workersManager.OnWorkerDone() + + ws.workersManager.OnWorkerDone(workerName) ws.workersManager.StartShutdown() - ws.logger.Debug("datachannel: moveUpWorker: done") }() + ws.logger.Debugf("%s: started", workerName) + for { select { // TODO: opportunistically try to kill lame duck @@ -152,6 +161,7 @@ func (ws *workersState) moveUpWorker() { continue } + // POSSIBLY BLOCK writing up towards TUN ws.dataToTUN <- decrypted case <-ws.workersManager.ShouldShutdown(): return @@ -161,13 +171,14 @@ func (ws *workersState) moveUpWorker() { // keyWorker receives notifications from key ready func (ws *workersState) keyWorker(firstKeyReady chan<- any) { + workerName := fmt.Sprintf("%s: keyWorker", serviceName) + defer func() { - ws.workersManager.OnWorkerDone() + ws.workersManager.OnWorkerDone(workerName) ws.workersManager.StartShutdown() - ws.logger.Debug("datachannel: worker: done") }() - ws.logger.Debug("datachannel: worker: started") + ws.logger.Debugf("%s: started", workerName) once := &sync.Once{} for { diff --git a/internal/networkio/service.go b/internal/networkio/service.go index 77e25675..f10d5bf4 100644 --- a/internal/networkio/service.go +++ b/internal/networkio/service.go @@ -1,10 +1,16 @@ package networkio import ( + "fmt" + "github.com/ooni/minivpn/internal/model" 
"github.com/ooni/minivpn/internal/workers" ) +var ( + serviceName = "networkio" +) + // Service is the network I/O service. Make sure you initialize // the channels before invoking [Service.StartWorkers]. type Service struct { @@ -58,15 +64,14 @@ type workersState struct { // moveUpWorker moves packets up the stack. func (ws *workersState) moveUpWorker() { + workerName := fmt.Sprintf("%s: moveUpWorker", serviceName) + defer func() { // make sure the manager knows we're done - ws.manager.OnWorkerDone() + ws.manager.OnWorkerDone(workerName) // tear down everything else because a workers exited ws.manager.StartShutdown() - - // emit useful debug message - ws.logger.Debug("networkio: moveUpWorker: done") }() ws.logger.Debug("networkio: moveUpWorker: started") @@ -75,7 +80,7 @@ func (ws *workersState) moveUpWorker() { // POSSIBLY BLOCK on the connection to read a new packet pkt, err := ws.conn.ReadRawPacket() if err != nil { - ws.logger.Debugf("networkio: moveUpWorker: ReadRawPacket: %s", err.Error()) + ws.logger.Debugf("%s: ReadRawPacket: %s", workerName, err.Error()) return } @@ -90,13 +95,17 @@ func (ws *workersState) moveUpWorker() { // moveDownWorker moves packets down the stack func (ws *workersState) moveDownWorker() { + workerName := fmt.Sprintf("%s: moveDownWorker", serviceName) + defer func() { + // make sure the manager knows we're done + ws.manager.OnWorkerDone(workerName) + + // tear down everything else because a worker exited ws.manager.StartShutdown() - ws.manager.OnWorkerDone() - ws.logger.Debug("networkio: moveDownWorker: done") }() - ws.logger.Debug("networkio: moveDownWorker: started") + ws.logger.Debugf("%s: started", workerName) for { // POSSIBLY BLOCK when receiving from channel. @@ -104,7 +113,7 @@ func (ws *workersState) moveDownWorker() { case pkt := <-ws.muxerToNetwork: // POSSIBLY BLOCK on the connection to write the packet if err := ws.conn.WriteRawPacket(pkt); err != nil { - ws.logger.Infof("networkio: moveDownWorker: WriteRawPacket: %s", err.Error()) + ws.logger.Infof("%s: WriteRawPacket: %s", workerName, err.Error()) return } diff --git a/internal/packetmuxer/service.go b/internal/packetmuxer/service.go index d64dffc4..88f3bfd9 100644 --- a/internal/packetmuxer/service.go +++ b/internal/packetmuxer/service.go @@ -2,11 +2,17 @@ package packetmuxer import ( + "fmt" + "github.com/ooni/minivpn/internal/model" "github.com/ooni/minivpn/internal/session" "github.com/ooni/minivpn/internal/workers" ) +var ( + serviceName = "packetmuxer" +) + // Service is the packetmuxer service. Make sure you initialize // the channels before invoking [Service.StartWorkers]. 
type Service struct { @@ -92,13 +98,14 @@ type workersState struct { // moveUpWorker moves packets up the stack func (ws *workersState) moveUpWorker() { + workerName := fmt.Sprintf("%s: moveUpWorker", serviceName) + defer func() { - ws.workersManager.OnWorkerDone() + ws.workersManager.OnWorkerDone(workerName) ws.workersManager.StartShutdown() - ws.logger.Debug("packetmuxer: moveUpWorker: done") }() - ws.logger.Debug("packetmuxer: moveUpWorker: started") + ws.logger.Debugf("%s: started", workerName) for { // POSSIBLY BLOCK awaiting for incoming raw packet @@ -123,13 +130,14 @@ func (ws *workersState) moveUpWorker() { // moveDownWorker moves packets down the stack func (ws *workersState) moveDownWorker() { + workerName := fmt.Sprintf("%s: moveDownWorker", serviceName) + defer func() { - ws.workersManager.OnWorkerDone() + ws.workersManager.OnWorkerDone(workerName) ws.workersManager.StartShutdown() - ws.logger.Debug("packetmuxer: moveDownWorker: done") }() - ws.logger.Debug("packetmuxer: moveDownWorker: started") + ws.logger.Debugf("%s: started", workerName) for { // POSSIBLY BLOCK on reading the packet moving down the stack @@ -138,19 +146,16 @@ func (ws *workersState) moveDownWorker() { // serialize the packet rawPacket, err := packet.Bytes() if err != nil { - ws.logger.Warnf("packetmuxer: cannot serialize packet: %s", err.Error()) + ws.logger.Warnf("%s: cannot serialize packet: %s", workerName, err.Error()) continue } - // While this channel send could possibly block, the [ARCHITECTURE] is - // such that (1) the channel is buffered and (2) the channel sender should - // avoid blocking when inserting data into the channel. - // + // POSSIBLY BLOCK on writing the packet to the networkio layer. // [ARCHITECTURE]: https://github.com/ooni/minivpn/blob/main/ARCHITECTURE.md + select { case ws.muxerToNetwork <- rawPacket: - default: - // drop the packet if the buffer is full as documented above + // nothing case <-ws.workersManager.ShouldShutdown(): return } @@ -216,7 +221,7 @@ func (ws *workersState) handleRawPacket(rawPacket []byte) error { return nil } -// finishThreeWayHandshake responsds to the HARD_RESET_SERVER and finishes the handshake. +// finishThreeWayHandshake responds to the HARD_RESET_SERVER and finishes the handshake. func (ws *workersState) finishThreeWayHandshake(packet *model.Packet) error { // register the server's session (note: the PoV is the server's one) ws.sessionManager.SetRemoteSessionID(packet.LocalSessionID) @@ -244,14 +249,13 @@ func (ws *workersState) finishThreeWayHandshake(packet *model.Packet) error { // advance the state ws.sessionManager.SetNegotiationState(session.S_START) - // attempt to tell TLS we want to handshake + // attempt to tell TLS we want to handshake. + // This WILL BLOCK if the notifyTLS channel + // is Full, but we make sure we control that we don't pass spurious soft-reset packets while we're + // doing a handshake. select { case ws.notifyTLS <- &model.Notification{Flags: model.NotificationReset}: // nothing - - default: - // the architecture says this notification should be nonblocking - case <-ws.workersManager.ShouldShutdown(): return workers.ErrShutdown } @@ -259,7 +263,7 @@ func (ws *workersState) finishThreeWayHandshake(packet *model.Packet) error { return nil } -// serializeAndEmit was written because Ain Ghazal was very insistent about it. +// serializeAndEmit will write a serialized packet on the channel going down to the networkio layer. 
func (ws *workersState) serializeAndEmit(packet *model.Packet) error { // serialize it rawPacket, err := packet.Bytes() @@ -267,7 +271,7 @@ func (ws *workersState) serializeAndEmit(packet *model.Packet) error { return err } - // emit the packet + // emit the packet. Possibly BLOCK writing to the networkio layer. select { case ws.muxerToNetwork <- rawPacket: // nothing diff --git a/internal/reliabletransport/reliabletransport.go b/internal/reliabletransport/reliabletransport.go index 161754f4..faf4c9e4 100644 --- a/internal/reliabletransport/reliabletransport.go +++ b/internal/reliabletransport/reliabletransport.go @@ -3,12 +3,17 @@ package reliabletransport import ( "bytes" + "fmt" "github.com/ooni/minivpn/internal/model" "github.com/ooni/minivpn/internal/session" "github.com/ooni/minivpn/internal/workers" ) +var ( + serviceName = "reliabletransport" +) + // Service is the reliable service. Make sure you initialize // the channels before invoking [Service.StartWorkers]. type Service struct { @@ -73,13 +78,14 @@ type workersState struct { // moveUpWorker moves packets up the stack func (ws *workersState) moveUpWorker() { + workerName := fmt.Sprintf("%s: moveUpWorker", serviceName) + defer func() { - ws.workersManager.OnWorkerDone() + ws.workersManager.OnWorkerDone(workerName) ws.workersManager.StartShutdown() - ws.logger.Debug("reliable: moveUpWorker: done") }() - ws.logger.Debug("reliable: moveUpWorker: started") + ws.logger.Debugf("%s: started", workerName) // TODO: do we need to have notifications from the control channel // to reset state or can we do this implicitly? @@ -100,7 +106,8 @@ func (ws *workersState) moveUpWorker() { // drop a packet that is not for our session if !bytes.Equal(packet.LocalSessionID[:], ws.sessionManager.RemoteSessionID()) { ws.logger.Warnf( - "reliable: moveUpWorker: packet with invalid RemoteSessionID: expected %x; got %x", + "%s: packet with invalid RemoteSessionID: expected %x; got %x", + workerName, ws.sessionManager.LocalSessionID(), packet.RemoteSessionID, ) @@ -109,7 +116,7 @@ func (ws *workersState) moveUpWorker() { // possibly ACK the incoming packet if err := ws.maybeACK(packet); err != nil { - ws.logger.Warnf("reliable: moveUpWorker: cannot ACK packet: %s", err.Error()) + ws.logger.Warnf("%s: cannot ACK packet: %s", workerName, err.Error()) continue } @@ -130,13 +137,14 @@ func (ws *workersState) moveUpWorker() { // moveDownWorker moves packets down the stack func (ws *workersState) moveDownWorker() { + workerName := fmt.Sprintf("%s: moveDownWorker", serviceName) + defer func() { - ws.workersManager.OnWorkerDone() + ws.workersManager.OnWorkerDone(workerName) ws.workersManager.StartShutdown() - ws.logger.Debug("reliable: moveDownWorker: done") }() - ws.logger.Debug("reliable: moveDownWorker: started") + ws.logger.Debugf("%s: started", workerName) // TODO: we should have timer for retransmission for { @@ -177,7 +185,7 @@ func (ws *workersState) maybeACK(packet *model.Packet) error { return err } - // move the packet down + // move the packet down. CAN BLOCK writing to the shared channel to muxer. 
select { case ws.dataOrControlToMuxer <- ACK: return nil diff --git a/internal/tlssession/tlssession.go b/internal/tlssession/tlssession.go index 12c6e4ae..a227ecd8 100644 --- a/internal/tlssession/tlssession.go +++ b/internal/tlssession/tlssession.go @@ -1,9 +1,8 @@ package tlssession import ( - "context" + "fmt" "net" - "time" "github.com/ooni/minivpn/internal/model" "github.com/ooni/minivpn/internal/session" @@ -11,6 +10,10 @@ import ( tls "github.com/refraction-networking/utls" ) +var ( + serviceName = "tlssession" +) + // Service is the tlssession service. Make sure you initialize // the channels before invoking [Service.StartWorkers]. type Service struct { @@ -71,19 +74,20 @@ type workersState struct { // worker is the main loop of the tlssession func (ws *workersState) worker() { + workerName := fmt.Sprintf("%s: worker", serviceName) + defer func() { - ws.workersManager.OnWorkerDone() + ws.workersManager.OnWorkerDone(workerName) ws.workersManager.StartShutdown() - ws.logger.Debug("tlssession: worker: done") }() - ws.logger.Debug("tlssession: worker: started") + ws.logger.Debugf("%s: started", workerName) for { select { case notif := <-ws.notifyTLS: if (notif.Flags & model.NotificationReset) != 0 { if err := ws.tlsAuth(); err != nil { - ws.logger.Warnf("tlssession: tlsAuth: %s", err.Error()) + ws.logger.Warnf("%s: %s", workerName, err.Error()) // TODO: is it worth checking the return value and stopping? } } @@ -116,17 +120,10 @@ func (ws *workersState) tlsAuth() error { errorch := make(chan error) go ws.doTLSAuth(conn, tlsConf, errorch) - // make sure we timeout after 60 seconds anyway - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - select { case err := <-errorch: return err - case <-ctx.Done(): - return ctx.Err() - case <-ws.workersManager.ShouldShutdown(): return workers.ErrShutdown } @@ -144,7 +141,9 @@ func (ws *workersState) doTLSAuth(conn net.Conn, config *tls.Config, errorch cha errorch <- err return } - //defer tlsConn.Close() // <- we don't care since the underlying conn is a tlsBio + // In case you're wondering why we don't need to close the conn: + // we don't care since the underlying conn is a tlsBio + // defer tlsConn.Close() // we need the active key to create the first control message activeKey, err := ws.sessionManager.ActiveKey() diff --git a/internal/tun/setup.go b/internal/tun/setup.go index b261ae42..9d79499a 100644 --- a/internal/tun/setup.go +++ b/internal/tun/setup.go @@ -28,22 +28,22 @@ func connectChannel[T any](signal chan T, slot **chan T) { func startWorkers(logger model.Logger, sessionManager *session.Manager, tunDevice *TUN, conn networkio.FramingConn, options *model.Options) *workers.Manager { // create a workers manager - workersManager := workers.NewManager() + workersManager := workers.NewManager(logger) // create the networkio service. nio := &networkio.Service{ - MuxerToNetwork: make(chan []byte, 1<<5), - NetworkToMuxer: nil, // ok + MuxerToNetwork: make(chan []byte), + NetworkToMuxer: nil, } // create the packetmuxer service. muxer := &packetmuxer.Service{ - MuxerToReliable: nil, // ok - MuxerToData: nil, // ok + MuxerToReliable: nil, + MuxerToData: nil, NotifyTLS: nil, HardReset: make(chan any, 1), DataOrControlToMuxer: make(chan *model.Packet), - MuxerToNetwork: nil, // ok + MuxerToNetwork: nil, NetworkToMuxer: make(chan []byte), } @@ -54,7 +54,7 @@ func startWorkers(logger model.Logger, sessionManager *session.Manager, // create the datachannel service. 
datach := &datachannel.Service{ MuxerToData: make(chan *model.Packet), - DataOrControlToMuxer: nil, // ok + DataOrControlToMuxer: nil, KeyReady: make(chan *session.DataChannelKey, 1), TUNToData: tunDevice.tunDown, DataToTUN: tunDevice.tunUp, @@ -66,10 +66,10 @@ func startWorkers(logger model.Logger, sessionManager *session.Manager, // create the reliabletransport service. rel := &reliabletransport.Service{ - DataOrControlToMuxer: nil, // ok + DataOrControlToMuxer: nil, ControlToReliable: make(chan *model.Packet), MuxerToReliable: make(chan *model.Packet), - ReliableToControl: nil, // ok + ReliableToControl: nil, } // connect reliable service and packetmuxer. @@ -78,11 +78,11 @@ func startWorkers(logger model.Logger, sessionManager *session.Manager, // create the controlchannel service. ctrl := &controlchannel.Service{ - NotifyTLS: nil, // ok - ControlToReliable: nil, // ok + NotifyTLS: nil, + ControlToReliable: nil, ReliableToControl: make(chan *model.Packet), TLSRecordToControl: make(chan []byte), - TLSRecordFromControl: nil, // ok + TLSRecordFromControl: nil, } // connect the reliable service and the controlchannel service diff --git a/internal/tun/tun.go b/internal/tun/tun.go index dad3c022..51448c1e 100644 --- a/internal/tun/tun.go +++ b/internal/tun/tun.go @@ -3,6 +3,7 @@ package tun import ( "bytes" "context" + "errors" "net" "os" "sync" @@ -14,6 +15,11 @@ import ( "github.com/ooni/minivpn/internal/session" ) +var ( + // default TLS handshake timeout, in seconds. + tlsHandshakeTimeoutSeconds = 60 +) + // StartTUN initializes and starts the TUN device over the vpn. // If the passed context expires before the TUN device is ready, func StartTUN(ctx context.Context, conn networkio.FramingConn, options *model.Options) (*TUN, error) { @@ -33,15 +39,26 @@ func StartTUN(ctx context.Context, conn networkio.FramingConn, options *model.Op workers.WaitWorkersShutdown() }) + tlsTimeout := time.NewTimer(time.Duration(tlsHandshakeTimeoutSeconds) * time.Second) + // Await for the signal from the session manager to tell us we're ready to start accepting data. // In practice, this means that we already have a valid TunnelInfo at this point // (i.e., three way handshake has completed, and we have valid keys). select { - case <-ctx.Done(): - return nil, ctx.Err() case <-sessionManager.Ready: return tunnel, nil + case <-tlsTimeout.C: + defer func() { + log.Log.Info("tls timeout") + tunnel.Close() + }() + return nil, errors.New("tls timeout") + case <-ctx.Done(): + defer func() { + tunnel.Close() + }() + return nil, ctx.Err() } } @@ -98,7 +115,7 @@ func newTUN(logger model.Logger, conn networkio.FramingConn, session *session.Ma readDeadline: makeTUNDeadline(), session: session, tunDown: make(chan []byte), - tunUp: make(chan []byte, 10), + tunUp: make(chan []byte), // this function is explicitely set empty so that we can safely use a callback even if not set. whenDoneFn: func() {}, writeDeadline: makeTUNDeadline(), diff --git a/internal/workers/workers.go b/internal/workers/workers.go index 52fc4033..f047ae21 100644 --- a/internal/workers/workers.go +++ b/internal/workers/workers.go @@ -7,6 +7,8 @@ package workers import ( "errors" "sync" + + "github.com/ooni/minivpn/internal/model" ) // ErrShutdown is the error returned by a worker that is shutting down. @@ -15,6 +17,9 @@ var ErrShutdown = errors.New("worker is shutting down") // Manager coordinates the lifeycles of the workers implementing the OpenVPN // protocol. The zero value is invalid; use [NewManager]. 
type Manager struct { + // logger logs events + logger model.Logger + // shouldShutdown is closed to signal all workers to shut down. shouldShutdown chan any @@ -26,8 +31,9 @@ type Manager struct { } // NewManager creates a new [*Manager]. -func NewManager() *Manager { +func NewManager(logger model.Logger) *Manager { return &Manager{ + logger: logger, shouldShutdown: make(chan any), shutdownOnce: sync.Once{}, wg: &sync.WaitGroup{}, @@ -41,7 +47,8 @@ func (m *Manager) StartWorker(fx func()) { } // OnWorkerDone MUST be called when a worker goroutine terminates. -func (m *Manager) OnWorkerDone() { +func (m *Manager) OnWorkerDone(name string) { + m.logger.Debugf("%s: worker done", name) m.wg.Done() }
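
---

For readers skimming this patch, below is a minimal, self-contained sketch of the worker/channel lifecycle pattern that the changes above standardize: unbuffered channels between layers, workers that defer `OnWorkerDone` and `StartShutdown`, and a `select` on the shutdown channel in every blocking loop. This is **not** minivpn code: the `manager` type only mirrors the shape of `internal/workers.Manager` as it appears in this diff, and `echoService` is a purely hypothetical one-worker stand-in for a real service such as `packetmuxer`.

```go
// Illustrative sketch only; not part of minivpn.
package main

import (
	"fmt"
	"log"
	"sync"
)

// manager coordinates worker lifecycles: a WaitGroup tracks running workers
// and a channel, closed exactly once, broadcasts the shutdown signal.
type manager struct {
	shouldShutdown chan any
	shutdownOnce   sync.Once
	wg             sync.WaitGroup
}

func newManager() *manager {
	return &manager{shouldShutdown: make(chan any)}
}

// startWorker registers and spawns a worker goroutine.
func (m *manager) startWorker(fx func()) {
	m.wg.Add(1)
	go fx()
}

// onWorkerDone must be called when a worker terminates; the name is only logged.
func (m *manager) onWorkerDone(name string) {
	log.Printf("%s: worker done", name)
	m.wg.Done()
}

// startShutdown closes the broadcast channel exactly once.
func (m *manager) startShutdown() {
	m.shutdownOnce.Do(func() { close(m.shouldShutdown) })
}

// shouldShutdownCh is selected on by every blocking worker loop.
func (m *manager) shouldShutdownCh() <-chan any { return m.shouldShutdown }

// waitWorkersShutdown blocks until every worker has called onWorkerDone.
func (m *manager) waitWorkersShutdown() { m.wg.Wait() }

// echoService stands in for one layer of the stack: a single worker that
// blocks reading from `in`, blocks writing to `out`, and honors shutdown.
type echoService struct {
	in  chan []byte
	out chan []byte
}

func (s *echoService) startWorkers(m *manager) {
	m.startWorker(func() {
		defer func() {
			m.onWorkerDone("echo: moveUpWorker")
			m.startShutdown() // one worker exiting tears down everything else
		}()
		for {
			select {
			case pkt := <-s.in:
				// POSSIBLY BLOCK delivering to the next layer up.
				select {
				case s.out <- pkt:
				case <-m.shouldShutdownCh():
					return
				}
			case <-m.shouldShutdownCh():
				return
			}
		}
	})
}

func main() {
	m := newManager()
	svc := &echoService{in: make(chan []byte), out: make(chan []byte)}
	svc.startWorkers(m)

	svc.in <- []byte("hello") // blocks until the worker picks it up
	fmt.Printf("%s\n", <-svc.out)

	// Simulate the public layer (TUN) deciding to tear everything down.
	m.startShutdown()
	m.waitWorkersShutdown()
}
```

The deliberate design choice, as the ARCHITECTURE.md assumptions explain, is that blocking on unbuffered channels makes back-pressure visible end to end instead of hiding stalls behind queues.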