// Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. use std::{cell::RefCell, collections::VecDeque, rc::Rc}; use neqo_common::{header::HeadersExt as _, Header}; use neqo_transport::{AppError, StreamId}; use crate::{ connection::Http3State, features::extended_connect::{self, ExtendedConnectEvents, ExtendedConnectType}, CloseType, Http3StreamInfo, HttpRecvStreamEvents, Priority, RecvStreamEvents, Res, SendStreamEvents, }; /// Server events for a single connection. #[derive(Debug, PartialEq, Eq, Clone)] pub enum Http3ServerConnEvent { /// Headers are ready. Headers { stream_info: Http3StreamInfo, headers: Vec
, fin: bool, }, PriorityUpdate { stream_id: StreamId, priority: Priority, }, /// Request data is ready. DataReadable { stream_info: Http3StreamInfo, }, DataWritable { stream_info: Http3StreamInfo, }, StreamReset { stream_info: Http3StreamInfo, error: AppError, }, StreamStopSending { stream_info: Http3StreamInfo, error: AppError, }, /// Connection state change. StateChange(Http3State), WebTransport(WebTransportEvent), ConnectUdp(ConnectUdpEvent), } #[derive(Debug, PartialEq, Eq, Clone)] pub enum WebTransportEvent { Session { stream_id: StreamId, headers: Vec
, }, SessionClosed { stream_id: StreamId, reason: extended_connect::session::CloseReason, headers: Option>, }, NewStream(Http3StreamInfo), Datagram { session_id: StreamId, datagram: Vec, }, } #[derive(Debug, PartialEq, Eq, Clone)] pub enum ConnectUdpEvent { Session { stream_id: StreamId, headers: Vec
, }, SessionClosed { stream_id: StreamId, reason: extended_connect::session::CloseReason, headers: Option>, }, Datagram { session_id: StreamId, datagram: Vec, }, } #[derive(Debug, Default, Clone)] pub struct Http3ServerConnEvents { events: Rc>>, } impl SendStreamEvents for Http3ServerConnEvents { fn send_closed(&self, stream_info: &Http3StreamInfo, close_type: CloseType) { if close_type != CloseType::Done { if let Some(error) = close_type.error() { self.insert(Http3ServerConnEvent::StreamStopSending { stream_info: *stream_info, error, }); } } } fn data_writable(&self, stream_info: &Http3StreamInfo) { self.insert(Http3ServerConnEvent::DataWritable { stream_info: *stream_info, }); } } impl RecvStreamEvents for Http3ServerConnEvents { /// Add a new `DataReadable` event fn data_readable(&self, stream_info: &Http3StreamInfo) { self.insert(Http3ServerConnEvent::DataReadable { stream_info: *stream_info, }); } fn recv_closed(&self, stream_info: &Http3StreamInfo, close_type: CloseType) { if close_type != CloseType::Done { self.remove_events_for_stream_id(stream_info); if let Some(error) = close_type.error() { self.insert(Http3ServerConnEvent::StreamReset { stream_info: *stream_info, error, }); } } } } impl HttpRecvStreamEvents for Http3ServerConnEvents { /// Add a new `HeaderReady` event. fn header_ready( &self, stream_info: &Http3StreamInfo, headers: Vec
, _interim: bool, fin: bool, ) { self.insert(Http3ServerConnEvent::Headers { stream_info: *stream_info, headers, fin, }); } fn extended_connect_new_session(&self, stream_id: StreamId, headers: Vec
) { match headers.find_header(":protocol").map(Header::value) { Some("webtransport") => { self.insert(Http3ServerConnEvent::WebTransport( WebTransportEvent::Session { stream_id, headers }, )); } Some("connect-udp") => { self.insert(Http3ServerConnEvent::ConnectUdp(ConnectUdpEvent::Session { stream_id, headers, })); } Some(_) => { unimplemented!("Extended connect other than webtransport or connect-udp") } None => { unimplemented!("connect without :protocol header"); } } } } impl ExtendedConnectEvents for Http3ServerConnEvents { fn session_start( &self, _connect_type: ExtendedConnectType, _stream_id: StreamId, _status: u16, _headers: Vec
, ) { } fn session_end( &self, connect_type: ExtendedConnectType, stream_id: StreamId, reason: extended_connect::session::CloseReason, headers: Option>, ) { let event = match connect_type { ExtendedConnectType::WebTransport => { Http3ServerConnEvent::WebTransport(WebTransportEvent::SessionClosed { stream_id, reason, headers, }) } ExtendedConnectType::ConnectUdp => { Http3ServerConnEvent::ConnectUdp(ConnectUdpEvent::SessionClosed { stream_id, reason, headers, }) } }; self.insert(event); } fn extended_connect_new_stream( &self, stream_info: Http3StreamInfo, emit_readable: bool, ) -> Res<()> { debug_assert!(!emit_readable, "only set by client"); self.insert(Http3ServerConnEvent::WebTransport( WebTransportEvent::NewStream(stream_info), )); Ok(()) } fn new_datagram( &self, session_id: StreamId, datagram: Vec, connect_type: ExtendedConnectType, ) { let event = match connect_type { ExtendedConnectType::WebTransport => { Http3ServerConnEvent::WebTransport(WebTransportEvent::Datagram { session_id, datagram, }) } ExtendedConnectType::ConnectUdp => { Http3ServerConnEvent::ConnectUdp(ConnectUdpEvent::Datagram { session_id, datagram, }) } }; self.insert(event); } } impl Http3ServerConnEvents { fn insert(&self, event: Http3ServerConnEvent) { self.events.borrow_mut().push_back(event); } fn remove(&self, f: F) where F: Fn(&Http3ServerConnEvent) -> bool, { self.events.borrow_mut().retain(|evt| !f(evt)); } pub fn has_events(&self) -> bool { !self.events.borrow().is_empty() } pub fn next_event(&self) -> Option { self.events.borrow_mut().pop_front() } pub fn connection_state_change(&self, state: Http3State) { self.insert(Http3ServerConnEvent::StateChange(state)); } pub fn priority_update(&self, stream_id: StreamId, priority: Priority) { self.insert(Http3ServerConnEvent::PriorityUpdate { stream_id, priority, }); } fn remove_events_for_stream_id(&self, stream_info: &Http3StreamInfo) { self.remove(|evt| { matches!(evt, Http3ServerConnEvent::Headers { stream_info: x, .. } | Http3ServerConnEvent::DataReadable { stream_info: x, .. } if x == stream_info) }); } }