// Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. // Collecting a list of events relevant to whoever is using the Connection. use std::{cell::RefCell, collections::VecDeque, rc::Rc}; use neqo_common::event::Provider as EventProvider; use neqo_crypto::ResumptionToken; use crate::{ AppError, Stats, connection::State, quic_datagrams::DatagramTracking, stream_id::{StreamId, StreamType}, }; #[derive(Debug, PartialOrd, Ord, PartialEq, Eq)] pub enum OutgoingDatagramOutcome { DroppedTooBig, DroppedQueueFull, Lost, Acked, } #[derive(Debug, PartialOrd, Ord, PartialEq, Eq)] pub enum ConnectionEvent { /// Cert authentication needed AuthenticationNeeded, /// Encrypted client hello fallback occurred. The certificate for the /// public name needs to be authenticated. EchFallbackAuthenticationNeeded { public_name: String, }, /// A new uni (read) or bidi stream has been opened by the peer. NewStream { stream_id: StreamId, }, /// Space available in the buffer for an application write to succeed. SendStreamWritable { stream_id: StreamId, }, /// New bytes available for reading. RecvStreamReadable { stream_id: StreamId, }, /// Peer reset the stream. RecvStreamReset { stream_id: StreamId, app_error: AppError, }, /// Peer has sent `STOP_SENDING` SendStreamStopSending { stream_id: StreamId, app_error: AppError, }, /// Peer has acked everything sent on the stream. SendStreamComplete { stream_id: StreamId, }, /// Peer increased `MAX_STREAMS` SendStreamCreatable { stream_type: StreamType, }, /// Connection state change. StateChange(State), /// The server rejected 0-RTT. /// This event invalidates all state in streams that has been created. /// Any data written to streams needs to be written again. ZeroRttRejected, ResumptionToken(ResumptionToken), Datagram(Vec), OutgoingDatagramOutcome { id: u64, outcome: OutgoingDatagramOutcome, }, IncomingDatagramDropped, } #[derive(Debug, Default, Clone)] pub struct ConnectionEvents { events: Rc>>, } impl ConnectionEvents { pub fn authentication_needed(&self) { self.insert(ConnectionEvent::AuthenticationNeeded); } pub fn ech_fallback_authentication_needed(&self, public_name: String) { self.insert(ConnectionEvent::EchFallbackAuthenticationNeeded { public_name }); } pub fn new_stream(&self, stream_id: StreamId) { self.insert(ConnectionEvent::NewStream { stream_id }); } pub fn recv_stream_readable(&self, stream_id: StreamId) { self.insert(ConnectionEvent::RecvStreamReadable { stream_id }); } pub fn recv_stream_reset(&self, stream_id: StreamId, app_error: AppError) { // If reset, no longer readable. self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64())); self.insert(ConnectionEvent::RecvStreamReset { stream_id, app_error, }); } pub fn send_stream_writable(&self, stream_id: StreamId) { self.insert(ConnectionEvent::SendStreamWritable { stream_id }); } pub fn send_stream_stop_sending(&self, stream_id: StreamId, app_error: AppError) { // If stopped, no longer writable. self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } if *x == stream_id)); self.insert(ConnectionEvent::SendStreamStopSending { stream_id, app_error, }); } pub fn send_stream_complete(&self, stream_id: StreamId) { self.remove(|evt| { matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } | ConnectionEvent::SendStreamStopSending { stream_id: x, .. } if *x == stream_id) }); self.insert(ConnectionEvent::SendStreamComplete { stream_id }); } pub fn send_stream_creatable(&self, stream_type: StreamType) { self.insert(ConnectionEvent::SendStreamCreatable { stream_type }); } pub fn connection_state_change(&self, state: State) { // If closing, existing events no longer relevant. match state { State::Closing { .. } | State::Closed(_) => self.events.borrow_mut().clear(), _ => (), } self.insert(ConnectionEvent::StateChange(state)); } pub fn client_resumption_token(&self, token: ResumptionToken) { self.insert(ConnectionEvent::ResumptionToken(token)); } pub fn client_0rtt_rejected(&self) { // If 0rtt rejected, must start over and existing events are no longer // relevant. self.events.borrow_mut().clear(); self.insert(ConnectionEvent::ZeroRttRejected); } pub fn recv_stream_complete(&self, stream_id: StreamId) { // If stopped, no longer readable. self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64())); } // The number of datagrams in the events queue is limited to max_queued_datagrams. // This function ensure this and deletes the oldest datagrams (head-drop) if needed. fn check_datagram_queued(&self, max_queued_datagrams: usize, stats: &mut Stats) { let mut queue = self.events.borrow_mut(); let count = queue .iter() .filter(|evt| matches!(evt, ConnectionEvent::Datagram(_))) .count(); if count < max_queued_datagrams { // Below the limit. No action needed. return; } let first = queue .iter_mut() .find(|evt| matches!(evt, ConnectionEvent::Datagram(_))) .expect("Checked above"); // Remove the oldest (head-drop), replacing it with an // IncomingDatagramDropped placeholder. *first = ConnectionEvent::IncomingDatagramDropped; stats.incoming_datagram_dropped += 1; } pub fn add_datagram(&self, max_queued_datagrams: usize, data: &[u8], stats: &mut Stats) { self.check_datagram_queued(max_queued_datagrams, stats); self.events .borrow_mut() .push_back(ConnectionEvent::Datagram(data.to_vec())); } pub fn datagram_outcome( &self, dgram_tracker: &DatagramTracking, outcome: OutgoingDatagramOutcome, ) { if let DatagramTracking::Id(id) = dgram_tracker { self.events .borrow_mut() .push_back(ConnectionEvent::OutgoingDatagramOutcome { id: *id, outcome }); } } fn insert(&self, event: ConnectionEvent) { let mut q = self.events.borrow_mut(); // Special-case two enums that are not strictly PartialEq equal but that // we wish to avoid inserting duplicates. let already_present = match &event { ConnectionEvent::SendStreamStopSending { stream_id, .. } => q.iter().any(|evt| { matches!(evt, ConnectionEvent::SendStreamStopSending { stream_id: x, .. } if *x == *stream_id) }), ConnectionEvent::RecvStreamReset { stream_id, .. } => q.iter().any(|evt| { matches!(evt, ConnectionEvent::RecvStreamReset { stream_id: x, .. } if *x == *stream_id) }), _ => q.contains(&event), }; if !already_present { q.push_back(event); } } fn remove(&self, f: F) where F: Fn(&ConnectionEvent) -> bool, { self.events.borrow_mut().retain(|evt| !f(evt)); } } impl EventProvider for ConnectionEvents { type Event = ConnectionEvent; fn has_events(&self) -> bool { !self.events.borrow().is_empty() } fn next_event(&mut self) -> Option { self.events.borrow_mut().pop_front() } } #[cfg(test)] #[cfg_attr(coverage_nightly, coverage(off))] mod tests { use neqo_common::event::Provider as _; use crate::{CloseReason, ConnectionEvent, ConnectionEvents, Error, State, Stats, StreamId}; #[test] fn event_culling() { let mut evts = ConnectionEvents::default(); assert!(!evts.has_events()); evts.client_0rtt_rejected(); assert!(evts.has_events()); evts.client_0rtt_rejected(); assert_eq!(evts.events().count(), 1); assert_eq!(evts.events().count(), 0); evts.new_stream(4.into()); evts.new_stream(4.into()); assert_eq!(evts.events().count(), 1); evts.recv_stream_readable(6.into()); evts.recv_stream_reset(6.into(), 66); evts.recv_stream_reset(6.into(), 65); assert_eq!(evts.events().count(), 1); evts.send_stream_writable(8.into()); evts.send_stream_writable(8.into()); evts.send_stream_stop_sending(8.into(), 55); evts.send_stream_stop_sending(8.into(), 56); let events = evts.events().collect::>(); assert_eq!(events.len(), 1); assert_eq!( events[0], ConnectionEvent::SendStreamStopSending { stream_id: StreamId::new(8), app_error: 55 } ); evts.send_stream_writable(8.into()); evts.send_stream_writable(8.into()); evts.send_stream_stop_sending(8.into(), 55); evts.send_stream_stop_sending(8.into(), 56); evts.send_stream_complete(8.into()); assert_eq!(evts.events().count(), 1); evts.send_stream_writable(8.into()); evts.send_stream_writable(9.into()); evts.send_stream_stop_sending(10.into(), 55); evts.send_stream_stop_sending(11.into(), 56); evts.send_stream_complete(12.into()); assert_eq!(evts.events().count(), 5); evts.send_stream_writable(8.into()); evts.send_stream_writable(9.into()); evts.send_stream_stop_sending(10.into(), 55); evts.send_stream_stop_sending(11.into(), 56); evts.send_stream_complete(12.into()); evts.client_0rtt_rejected(); assert_eq!(evts.events().count(), 1); evts.send_stream_writable(9.into()); evts.send_stream_stop_sending(10.into(), 55); evts.connection_state_change(State::Closed(CloseReason::Transport(Error::StreamState))); assert_eq!(evts.events().count(), 1); } #[test] fn datagram_queue_drops_oldest() { const MAX_QUEUED: usize = 2; // Fill the queue to capacity, verify that and that there are no drops yet. let e = ConnectionEvents::default(); let mut stats = Stats::default(); e.add_datagram(MAX_QUEUED, &[1], &mut stats); e.add_datagram(MAX_QUEUED, &[2], &mut stats); assert_eq!(stats.incoming_datagram_dropped, 0); assert_eq!(e.events.borrow().len(), MAX_QUEUED); // Add one more datagram - this should drop the oldest ("1"). e.add_datagram(MAX_QUEUED, &[3], &mut stats); assert_eq!(stats.incoming_datagram_dropped, 1); // Should have one `IncomingDatagramDropped` event + `MAX_QUEUED` datagrams. assert_eq!( e.events.borrow().iter().collect::>(), [ &ConnectionEvent::IncomingDatagramDropped, &ConnectionEvent::Datagram(vec![2]), &ConnectionEvent::Datagram(vec![3]), ] ); } /// Previously `check_datagram_queued` had a bug that caused it to /// potentially drop an unrelated event. /// /// See for details. #[test] fn datagram_queue_drops_datagram_not_unrelated_event() { const MAX_QUEUED: usize = 2; let e = ConnectionEvents::default(); let mut stats = Stats::default(); // Add unrelated event. e.new_stream(4.into()); // Fill the queue with datagrams to capacity. e.add_datagram(MAX_QUEUED, &[1], &mut stats); e.add_datagram(MAX_QUEUED, &[2], &mut stats); assert_eq!(stats.incoming_datagram_dropped, 0); assert_eq!(e.events.borrow().len(), 1 + MAX_QUEUED); // Add one more datagram - this should drop the oldest ("1"), not the // unrelated event. e.add_datagram(MAX_QUEUED, &[3], &mut stats); assert_eq!(stats.incoming_datagram_dropped, 1); // Should have one `IncomingDatagramDropped` event + `MAX_QUEUED` datagrams. assert_eq!( e.events.borrow().iter().collect::>(), [ &ConnectionEvent::NewStream { stream_id: StreamId::new(4) }, &ConnectionEvent::IncomingDatagramDropped, &ConnectionEvent::Datagram(vec![2]), &ConnectionEvent::Datagram(vec![3]), ] ); } }