diff --git a/tests/test_peer/test_connection.py b/tests/test_peer/test_connection.py index b36ca0c..a058ed2 100644 --- a/tests/test_peer/test_connection.py +++ b/tests/test_peer/test_connection.py @@ -427,7 +427,7 @@ def _on_receive_sframe(frame): # Our I-frame handler should have been called assert len(iframes) == 1 assert isinstance(iframes[0], AX258BitInformationFrame) - assert iframes[0].pid == 0xf0 + assert iframes[0].pid == 0xF0 assert iframes[0].payload == b"Testing 1 2 3 4" # Our S-frame handler should NOT have been called @@ -482,7 +482,7 @@ def _on_receive_sframe(frame): # Our I-frame handler should have been called assert len(iframes) == 1 assert isinstance(iframes[0], AX2516BitInformationFrame) - assert iframes[0].pid == 0xf0 + assert iframes[0].pid == 0xF0 assert iframes[0].payload == b"Testing 1 2 3 4" # Our S-frame handler should NOT have been called @@ -595,6 +595,517 @@ def _on_receive_sframe(frame): assert iframes == [] +def test_recv_iframe_busy(): + """ + Test that an I-frame received while we're busy triggers RNR. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub idle time-out handling + peer._reset_idle_timeout = lambda: None + + # Stub _send_rnr_notification and _cancel_rr_notification + count = dict(send_rnr=0, cancel_rr=0) + + def _cancel_rr_notification(): + count["cancel_rr"] += 1 + + peer._cancel_rr_notification = _cancel_rr_notification + + def _send_rnr_notification(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr_notification + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._modulo = 8 + peer._local_busy = True + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\xd4\xf0Testing 1 2 3 4", + ) + ) + + # RR notification should be cancelled and there should be a RNR queued + assert count == dict(cancel_rr=1, send_rnr=1) + + +def test_recv_iframe_mismatched_seq(): + """ + Test that an I-frame with a mismatched sequence number is dropped. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub idle time-out handling + peer._reset_idle_timeout = lambda: None + + # Stub the functions called + count = dict(send_rnr=0, cancel_rr=0, send_next_iframe=0, schedule_rr=0) + isframes = [] + iframes = [] + state_updates = [] + + def _cancel_rr_notification(): + count["cancel_rr"] += 1 + + peer._cancel_rr_notification = _cancel_rr_notification + + def _schedule_rr_notification(): + count["schedule_rr"] += 1 + + peer._schedule_rr_notification = _schedule_rr_notification + + def _send_next_iframe(): + count["send_next_iframe"] += 1 + + peer._send_next_iframe = _send_next_iframe + + def _send_rnr_notification(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr_notification + + def _on_receive_isframe_nr_ns(frame): + isframes.append(frame) + + peer._on_receive_isframe_nr_ns = _on_receive_isframe_nr_ns + + def _update_state(prop, **kwargs): + kwargs["prop"] = prop + state_updates.append(kwargs) + + peer._update_state = _update_state + + # Hook received_information signal + + def _received_information(frame, payload, **kwargs): + assert kwargs == {} + assert payload == frame.payload + iframes.append(frame) + + peer.received_information.connect(_received_information) + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._modulo = 8 + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\xd4\xf0Testing 1 2 3 4", + ) + ) + + # RR notification should be cancelled, no other actions pending + assert count == dict( + cancel_rr=1, send_rnr=0, schedule_rr=0, send_next_iframe=0 + ) + + assert isframes == [] + assert iframes == [] + assert state_updates == [] + + +def test_recv_iframe_mismatched_seq(): + """ + Test that an I-frame with a mismatched sequence number is dropped. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub idle time-out handling + peer._reset_idle_timeout = lambda: None + + # Stub the functions called + count = dict(send_rnr=0, cancel_rr=0, send_next_iframe=0, schedule_rr=0) + isframes = [] + iframes = [] + state_updates = [] + + def _cancel_rr_notification(): + count["cancel_rr"] += 1 + + peer._cancel_rr_notification = _cancel_rr_notification + + def _schedule_rr_notification(): + count["schedule_rr"] += 1 + + peer._schedule_rr_notification = _schedule_rr_notification + + def _send_next_iframe(): + count["send_next_iframe"] += 1 + + peer._send_next_iframe = _send_next_iframe + + def _send_rnr_notification(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr_notification + + def _on_receive_isframe_nr_ns(frame): + isframes.append(frame) + + peer._on_receive_isframe_nr_ns = _on_receive_isframe_nr_ns + + def _update_state(prop, **kwargs): + kwargs["prop"] = prop + state_updates.append(kwargs) + + peer._update_state = _update_state + + # Hook received_information signal + + def _received_information(frame, payload, **kwargs): + assert kwargs == {} + assert payload == frame.payload + iframes.append(frame) + + peer.received_information.connect(_received_information) + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._modulo = 8 + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\xd4\xf0Testing 1 2 3 4", + ) + ) + + # RR notification should be cancelled, no other actions pending + assert count == dict( + cancel_rr=1, send_rnr=0, schedule_rr=0, send_next_iframe=0 + ) + + assert isframes == [] + assert iframes == [] + assert state_updates == [] + + +def test_recv_iframe_matched_seq_nopending(): + """ + Test that an I-frame with a matched sequence number is handled. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub idle time-out handling + peer._reset_idle_timeout = lambda: None + + # Stub the functions called + count = dict(send_rnr=0, cancel_rr=0, send_next_iframe=0, schedule_rr=0) + isframes = [] + iframes = [] + state_updates = [] + + def _cancel_rr_notification(): + count["cancel_rr"] += 1 + + peer._cancel_rr_notification = _cancel_rr_notification + + def _schedule_rr_notification(): + count["schedule_rr"] += 1 + + peer._schedule_rr_notification = _schedule_rr_notification + + def _send_next_iframe(): + count["send_next_iframe"] += 1 + + peer._send_next_iframe = _send_next_iframe + + def _send_rnr_notification(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr_notification + + def _on_receive_isframe_nr_ns(frame): + isframes.append(frame) + + peer._on_receive_isframe_nr_ns = _on_receive_isframe_nr_ns + + def _update_state(prop, **kwargs): + kwargs["prop"] = prop + state_updates.append(kwargs) + + peer._update_state = _update_state + + # Hook received_information signal + + def _received_information(frame, payload, **kwargs): + assert kwargs == {} + assert payload == frame.payload + iframes.append(frame) + + peer.received_information.connect(_received_information) + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._modulo = 8 + peer._recv_seq = 2 + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\xd4\xf0Testing 1 2 3 4", + ) + ) + + # RR notification should be re-scheduled, no I-frame transmissions + assert count == dict( + cancel_rr=1, send_rnr=0, schedule_rr=1, send_next_iframe=0 + ) + + assert len(isframes) == 1 + + frame = isframes.pop(0) + assert frame.pid == 0xF0 + assert frame.payload == b"Testing 1 2 3 4" + + assert iframes == [frame] + assert state_updates == [ + {"comment": "from I-frame N(S)", "prop": "_recv_state", "value": 3} + ] + + +def test_recv_iframe_matched_seq_lotspending(): + """ + Test that an I-frame with lots of pending I-frames sends RR instead. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub idle time-out handling + peer._reset_idle_timeout = lambda: None + + # Stub the functions called + count = dict(send_rnr=0, cancel_rr=0, send_next_iframe=0, schedule_rr=0) + isframes = [] + iframes = [] + state_updates = [] + + def _cancel_rr_notification(): + count["cancel_rr"] += 1 + + peer._cancel_rr_notification = _cancel_rr_notification + + def _schedule_rr_notification(): + count["schedule_rr"] += 1 + + peer._schedule_rr_notification = _schedule_rr_notification + + def _send_next_iframe(): + count["send_next_iframe"] += 1 + + peer._send_next_iframe = _send_next_iframe + + def _send_rnr_notification(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr_notification + + def _on_receive_isframe_nr_ns(frame): + isframes.append(frame) + + peer._on_receive_isframe_nr_ns = _on_receive_isframe_nr_ns + + def _update_state(prop, **kwargs): + kwargs["prop"] = prop + state_updates.append(kwargs) + + peer._update_state = _update_state + + # Hook received_information signal + + def _received_information(frame, payload, **kwargs): + assert kwargs == {} + assert payload == frame.payload + iframes.append(frame) + + peer.received_information.connect(_received_information) + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._modulo = 8 + peer._max_outstanding = 8 + peer._recv_seq = 2 + peer._pending_data = [(0xF0, b"Test outgoing")] + peer._pending_iframes = { + 0: (0xF0, b"Test outgoing 1"), + 1: (0xF0, b"Test outgoing 2"), + 2: (0xF0, b"Test outgoing 3"), + 3: (0xF0, b"Test outgoing 4"), + 4: (0xF0, b"Test outgoing 5"), + 5: (0xF0, b"Test outgoing 6"), + 6: (0xF0, b"Test outgoing 7"), + 7: (0xF0, b"Test outgoing 8"), + } + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\xd4\xf0Testing 1 2 3 4", + ) + ) + + # RR notification should be re-scheduled, no I-frame transmissions + assert count == dict( + cancel_rr=1, send_rnr=0, schedule_rr=1, send_next_iframe=0 + ) + + assert len(isframes) == 1 + + frame = isframes.pop(0) + assert frame.pid == 0xF0 + assert frame.payload == b"Testing 1 2 3 4" + + assert iframes == [frame] + assert state_updates == [ + {"comment": "from I-frame N(S)", "prop": "_recv_state", "value": 3} + ] + + +def test_recv_iframe_matched_seq_iframepending(): + """ + Test that an I-frame reception triggers I-frame transmission if data is + pending. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub idle time-out handling + peer._reset_idle_timeout = lambda: None + + # Stub the functions called + count = dict(send_rnr=0, cancel_rr=0, send_next_iframe=0, schedule_rr=0) + isframes = [] + iframes = [] + state_updates = [] + + def _cancel_rr_notification(): + count["cancel_rr"] += 1 + + peer._cancel_rr_notification = _cancel_rr_notification + + def _schedule_rr_notification(): + count["schedule_rr"] += 1 + + peer._schedule_rr_notification = _schedule_rr_notification + + def _send_next_iframe(): + count["send_next_iframe"] += 1 + + peer._send_next_iframe = _send_next_iframe + + def _send_rnr_notification(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr_notification + + def _on_receive_isframe_nr_ns(frame): + isframes.append(frame) + + peer._on_receive_isframe_nr_ns = _on_receive_isframe_nr_ns + + def _update_state(prop, **kwargs): + kwargs["prop"] = prop + state_updates.append(kwargs) + + peer._update_state = _update_state + + # Hook received_information signal + + def _received_information(frame, payload, **kwargs): + assert kwargs == {} + assert payload == frame.payload + iframes.append(frame) + + peer.received_information.connect(_received_information) + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._modulo = 8 + peer._max_outstanding = 8 + peer._recv_seq = 2 + peer._pending_data = [(0xF0, b"Test outgoing")] + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\xd4\xf0Testing 1 2 3 4", + ) + ) + + # RR notification should be cancelled, no I-frame transmissions + assert count == dict( + cancel_rr=1, send_rnr=0, schedule_rr=0, send_next_iframe=1 + ) + + assert len(isframes) == 1 + + frame = isframes.pop(0) + assert frame.pid == 0xF0 + assert frame.payload == b"Testing 1 2 3 4" + + assert iframes == [frame] + assert state_updates == [ + {"comment": "from I-frame N(S)", "prop": "_recv_state", "value": 3} + ] + + def test_recv_disc(): """ Test that DISC is handled.