// Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. use std::{ cell::RefCell, fmt::{self, Debug, Display, Formatter}, mem, rc::Rc, time::Instant, }; use neqo_common::{ Bytes, Decoder, Header, MessageType, Role, qdebug, qerror, qinfo, qtrace, qwarn, }; use neqo_qpack as qpack; use neqo_transport::{ AppError, CloseReason, Connection, DatagramTracking, State, StreamId, StreamType, ZeroRttState, streams::SendOrder, }; use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet}; use strum::Display; use crate::{ CloseType, Error, Http3Parameters, Http3StreamType, HttpRecvStreamEvents, NewStreamType, Priority, PriorityHandler, ReceiveOutput, RecvStream, RecvStreamEvents, Res, SendStream, SendStreamEvents, client_events::Http3ClientEvents, control_stream_local::ControlStreamLocal, control_stream_remote::ControlStreamRemote, features::{ ConnectType, extended_connect::{ self, ExtendedConnectEvents, ExtendedConnectFeature, ExtendedConnectType, webtransport_streams::{WebTransportRecvStream, WebTransportSendStream}, }, }, frames::HFrame, push_controller::PushController, qpack_decoder_receiver::DecoderRecvStream, qpack_encoder_receiver::EncoderRecvStream, recv_message::{RecvMessage, RecvMessageInfo}, request_target::RequestTarget, send_message::SendMessage, settings::{HSettingType, HSettings, HttpZeroRttChecker}, stream_type_reader::NewStreamHeadReader, }; pub struct RequestDescription<'b, T: RequestTarget> { pub method: &'b str, pub connect_type: Option, pub target: T, pub headers: &'b [Header], pub priority: Priority, } /// Possible actions on an HTTP Extended CONNECT session request. #[derive(Display)] pub enum SessionAcceptAction { Accept, Reject(Vec
), } #[derive(Debug)] enum Http3RemoteSettingsState { NotReceived, Received(HSettings), ZeroRtt(HSettings), } /// States: /// - `Initializing`: this is the state during the QUIC handshake, /// - `ZeroRtt`: 0-RTT has been enabled and is active /// - Connected /// - GoingAway(StreamId): The connection has received a `GOAWAY` frame /// - Closing(CloseReason): The connection is closed. The closing has been initiated by this end of /// the connection, e.g., the `CONNECTION_CLOSE` frame has been sent. In this state, the /// connection waits a certain amount of time to retransmit the `CONNECTION_CLOSE` frame if /// needed. /// - Closed(CloseReason): This is the final close state: closing has been initialized by the peer /// and an ack for the `CONNECTION_CLOSE` frame has been sent or the closing has been initiated by /// this end of the connection and the ack for the `CONNECTION_CLOSE` has been received or the /// waiting time has passed. #[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Clone)] pub enum Http3State { Initializing, ZeroRtt, Connected, GoingAway(StreamId), Closing(CloseReason), Closed(CloseReason), } impl Http3State { #[must_use] pub const fn active(&self) -> bool { matches!(self, Self::Connected | Self::GoingAway(_) | Self::ZeroRtt) } } /// # HTTP/3 core implementation /// /// This is the core implementation of HTTP/3 protocol. It implements most of the /// features of the protocol. [`crate::Http3Client`] and /// [`crate::connection_server::Http3ServerHandler`] implement only client and /// server side behavior. /// /// ## Streams /// /// Each [`Http3Connection`] holds a list of stream handlers. Each send and receive-handler is /// registered in `send_streams` and `recv_streams`. Unidirectional streams are registered only on /// one of the lists and bidirectional streams are registered in both lists and the 2 handlers are /// independent, e.g. one can be closed and removed and second may still be active. /// /// The only streams that are not registered are the local control stream, local /// QPACK decoder stream, and local QPACK encoder stream. These streams are /// send-streams and sending data on this stream is handled a bit differently. This /// is done in the [`Http3Connection::process_sending`] function, i.e. the control data /// is sent first and QPACK data is sent after regular stream data is sent because /// this stream may have new data only after regular streams are handled (TODO we /// may improve this a bit to send QPACK commands before headers.) /// /// There are the following types of streams: /// - [`Http3StreamType::Control`]: there is only a receiver stream of this type and the handler is /// [`ControlStreamRemote`]. /// - [`Http3StreamType::Decoder`]: there is only a receiver stream of this type and the handler is /// [`DecoderRecvStream`]. /// - [`Http3StreamType::Encoder`]: there is only a receiver stream of this type and the handler is /// [`EncoderRecvStream`]. /// - [`Http3StreamType::NewStream`]: there is only a receiver stream of this type and the handler /// is [`NewStreamHeadReader`]. /// - [`Http3StreamType::Http`]: [`SendMessage`] and [`RecvMessage`] handlers are responsible for /// this type of streams. /// - [`Http3StreamType::Push`]: [`RecvMessage`] is responsible for this type of streams. /// - [`Http3StreamType::ExtendedConnect`]: [`extended_connect::session::Session`] is responsible /// sender and receiver handler. /// - [`Http3StreamType::WebTransport`]: [`WebTransportSendStream`] and [`WebTransportRecvStream`] /// are responsible sender and receiver handler. /// - [`Http3StreamType::Unknown`]: These are all other stream types that are not unknown to the /// current implementation and should be handled properly by the spec, e.g., in our implementation /// the streams are reset. /// /// The streams are registered in `send_streams` and `recv_streams` in following ways depending if /// they are local or remote: /// - local streams: /// - all local stream will be registered with the appropriate handler. /// - remote streams: /// - all new incoming streams are registered with [`NewStreamHeadReader`]. This is triggered by /// [`ConnectionEvent::NewStream`] and [`Http3Connection::add_new_stream`] is called. /// - reading from a [`NewStreamHeadReader`] stream, via the [`RecvStream::receive`] function, /// will decode a stream type. [`RecvStream::receive`] will return [`ReceiveOutput::NewStream`] /// when a stream type has been decoded. After this point the stream: /// - will be regegistered with the appropriate handler, /// - will be canceled if is an unknown stream type or /// - the connection will fail if it is unallowed stream type (receiving HTTP request on the /// client-side). /// /// The output is handled in [`Http3Connection::handle_new_stream`], for control, qpack streams and /// partially `WebTransport` streams, otherwise the output is handled by [`Http3Client`] and /// [`Http3ServerHandler`]. /// /// /// ### Receiving data /// /// Reading from a stream is triggered by [`ConnectionEvent::RecvStreamReadable`] events for the /// stream. The receive handler is retrieved from `recv_streams` and its [`RecvStream::receive`] /// function is called. /// /// Receiving data on [`Http3StreamType::Http`] streams is also triggered by the /// [`Http3Connection::read_data`] function. [`ConnectionEvent::RecvStreamReadable`] events will /// trigger reading `HEADERS` frame and frame headers for `DATA` frames which will produce /// [`Http3ClientEvent`] or [`Http3ServerEvent`] events. The content of `DATA` frames is read by the /// application using the `read_data` function. The `read_data` function may read frame headers for /// consecutive `DATA` frames. /// /// On a [`Http3StreamType::WebTransport`] stream data will be read only by the /// `Http3Connection::read_data` function. The [`RecvStream::receive`] function only produces an /// [`Http3ClientEvent`] or [`Http3ServerEvent`] event. /// /// The [`RecvStream::receive`] and [`Http3Connection::read_data`] functions may detect that the /// stream is done, e.g. FIN received. In this case, the stream will be removed from the /// `recv_stream` register, see [`Http3Connection::remove_recv_stream`]. /// /// ### Sending data /// /// All sender stream handlers have buffers. Data is first written into a buffer before being /// supplied to the QUIC layer. All data except the `DATA` frame and `WebTransport(_)`’s payload are /// written into the buffer. This includes stream type byte, e.g. `WEBTRANSPORT_STREAM` as well. In /// the case of `Http` and `WebTransport(_)` applications can write directly to the QUIC layer using /// the `send_data` function to avoid copying data. Sending data via the `send_data` function is /// only possible if there is no buffered data. /// /// If a stream has buffered data it will be registered in the `streams_with_pending_data` queue and /// actual sending will be performed in the [`Http3Connection::process_sending`] function call. /// (This is done in this way, i.e. data is buffered first and then sent, for 2 reasons: in this /// way, sending will happen in a single function, therefore error handling and clean up is easier /// and the QUIC layer may not be able to accept all data and being able to buffer data is required /// in any case.) /// /// The `send` and `send_data` functions may detect that the stream is closed and all outstanding /// data has been transferred to the QUIC layer. In this case, the stream will be removed from the /// `send_stream` register. /// /// ### [`ControlStreamRemote`] /// /// The [`ControlStreamRemote`] handler uses [`FrameReader`] to read and decode frames received on /// the control frame. The [`RecvStream::receive`] implementation returns /// [`ReceiveOutput::ControlFrames`] with a list of control frames read (the list may be empty). The /// control frames are handled by [`Http3Connection`] and/or by [`Http3Client`] and /// [`Http3ServerHandler`]. /// /// ### [`DecoderRecvStream`] and [`EncoderRecvStream`] /// /// The [`RecvStream::receive`] implementation of these handlers call corresponding /// [`RecvStream::receive`] functions of [`qpack::Encoder`] and [`qpack::Decoder`]. /// /// [`DecoderRecvStream`] returns [`ReceiveOutput::UnblockedStreams`] that may contain a list of /// stream ids that are unblocked by receiving qpack decoder commands. [`Http3Connection`] will /// handle this output by calling [`RecvStream::receive`] for the listed stream ids. /// /// [`EncoderRecvStream`] only returns [`ReceiveOutput::NoOutput`]. /// /// Both handlers may return an error that will close the connection. /// /// ### [`NewStreamHeadReader`] /// /// A new incoming receiver stream registers a [`NewStreamHeadReader`] handler. This handler reads /// the first bytes of a stream to detect a stream type. The [`RecvStream::receive`] function /// returns [`ReceiveOutput::NoOutput`] if a stream type is still not known by reading the available /// stream data or [`ReceiveOutput::NewStream`]. The handling of the output is explained above. /// /// ### [`SendMessage`] and [`RecvMessage`] /// /// [`RecvMessage::receive`] only returns [`ReceiveOutput::NoOutput`]. It also have an event /// listener of type [`HttpRecvStreamEvents`]. The listener is called when headers are ready, or /// data is ready, etc. /// /// For example for [`Http3StreamType::Http`] stream the listener will produce /// [`Http3ClientEvent::HeaderReady`] and [`Http3ClientEvent::DataReadable`] events. /// /// ### [`extended_connect::session::Session`] /// /// An [`extended_connect::session::Session`] is connected to a control stream /// that is in essence an HTTP transaction. Therefore, /// [`extended_connect::session::Session`] will internally use a [`SendMessage`] /// and [`RecvMessage`] handler to handle parsing and sending of HTTP part of /// the control stream. When HTTP headers are exchanged, /// [`extended_connect::session::Session`] will take over handling of stream /// data. [`extended_connect::session::Session`] sets a [`HttpRecvStreamEvents`] /// listener as the [`RecvMessage`] event listener. /// /// `neqo_http3` implements the WebTransport and MASQUE connect-udp HTTP /// Extended CONNECT protocol using [`extended_connect::session::Session`]. /// /// The WebTransport HTTP Extended CONNECT protocol supports streams. /// [`WebTransportSendStream`] and [`WebTransportRecvStream`] are associated /// with a [`extended_connect::session::Session`] and they will be canceled if /// the session is closed. To be able to do this /// [`extended_connect::session::Session`] holds a list of its active streams /// and clean up is done in `remove_extended_connect`. /// /// ### [`WebTransportSendStream`] and [`WebTransportRecvStream`] /// /// WebTransport streams are associated with a session. [`WebTransportSendStream`] and /// [`WebTransportRecvStream`] hold a reference to the session and are registered in the session /// upon creation by [`Http3Connection`]. The [`WebTransportSendStream`] and /// [`WebTransportRecvStream`] handlers will be unregistered from the session if they are closed, /// reset, or canceled. /// /// The call to function [`RecvStream::receive`] may produce [`Http3ClientEvent::DataReadable`]. /// Actual reading of data is done in the `read_data` function. /// /// [`Http3ServerEvent`]: crate::Http3ServerEvent /// [`Http3Server`]: crate::Http3Server /// [`FrameReader`]: crate::frames::FrameReader /// [`Http3ClientEvent`]: crate::Http3ClientEvent /// [`Http3ClientEvent::DataReadable`]: crate::Http3ClientEvent::DataReadable /// [`Http3ClientEvent::HeaderReady`]: crate::Http3ClientEvent::HeaderReady /// [`Http3Client`]: crate::connection_client::Http3Client /// [`Http3ServerEvent::DataReadable`]: crate::Http3ServerEvent /// [`Http3ServerHandler`]: crate::connection_server::Http3ServerHandler /// [`ConnectionEvent::RecvStreamReadable`]: neqo_transport::ConnectionEvent::RecvStreamReadable /// [`ConnectionEvent::NewStream`]: neqo_transport::ConnectionEvent::NewStream #[derive(Debug)] pub struct Http3Connection { role: Role, state: Http3State, local_params: Http3Parameters, control_stream_local: ControlStreamLocal, qpack_encoder: Rc>, qpack_decoder: Rc>, settings_state: Http3RemoteSettingsState, streams_with_pending_data: HashSet, send_streams: HashMap>, recv_streams: HashMap>, webtransport: ExtendedConnectFeature, connect_udp: ExtendedConnectFeature, } impl Display for Http3Connection { fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!(f, "Http3 connection") } } impl Http3Connection { /// Create a new connection. pub fn new(conn_params: Http3Parameters, role: Role) -> Self { Self { state: Http3State::Initializing, control_stream_local: ControlStreamLocal::default(), qpack_encoder: Rc::new(RefCell::new(qpack::Encoder::new( conn_params.get_qpack_settings(), true, ))), qpack_decoder: Rc::new(RefCell::new(qpack::Decoder::new( conn_params.get_qpack_settings(), ))), webtransport: ExtendedConnectFeature::new( ExtendedConnectType::WebTransport, conn_params.get_webtransport(), ), connect_udp: ExtendedConnectFeature::new( ExtendedConnectType::ConnectUdp, conn_params.get_connect(), ), local_params: conn_params, settings_state: Http3RemoteSettingsState::NotReceived, streams_with_pending_data: HashSet::default(), send_streams: HashMap::default(), recv_streams: HashMap::default(), role, } } /// Listener for non-default feature negotiation. No-op when feature is /// disabled. This is currently only used for the /// [`crate::features::extended_connect::webtransport_session`] and /// [`crate::features::extended_connect::connect_udp_session`] feature. The /// negotiation is done via the `SETTINGS` frame and when the peer's /// `SETTINGS` frame has been received the listener will be called. pub(crate) fn set_features_listener(&mut self, feature_listener: Http3ClientEvents) { self.webtransport.set_listener(feature_listener.clone()); self.connect_udp.set_listener(feature_listener); } /// This function creates and initializes, i.e. send stream type, the control and qpack /// streams. fn initialize_http3_connection(&mut self, conn: &mut Connection) -> Res<()> { qdebug!("[{self}] Initialize the http3 connection"); self.control_stream_local.create(conn)?; self.send_settings(); self.create_qpack_streams(conn)?; Ok(()) } fn send_settings(&mut self) { qdebug!("[{self}] Send settings"); self.control_stream_local.queue_frame(&HFrame::Settings { settings: HSettings::from(&self.local_params), }); self.control_stream_local.queue_frame(&HFrame::Grease); } /// Save settings for adding to the session ticket. pub(crate) fn save_settings(&self) -> Vec { HttpZeroRttChecker::save(&self.local_params) } fn create_qpack_streams(&self, conn: &mut Connection) -> Res<()> { qdebug!("[{self}] create_qpack_streams"); self.qpack_encoder .borrow_mut() .add_send_stream(conn.stream_create(StreamType::UniDi)?); self.qpack_decoder .borrow_mut() .add_send_stream(conn.stream_create(StreamType::UniDi)?); Ok(()) } /// Inform an [`Http3Connection`] that a stream has data to send and that /// [`SendStream::send`] should be called for the stream. pub(crate) fn stream_has_pending_data(&mut self, stream_id: StreamId) { self.streams_with_pending_data.insert(stream_id); } /// Return true if there is a stream that needs to send data. pub(crate) fn has_data_to_send(&self) -> bool { !self.streams_with_pending_data.is_empty() } /// This function calls the `send` function for all streams that have data to send. If a stream /// has data to send it will be added to the `streams_with_pending_data` list. /// /// Control and QPACK streams are handled differently and are never added to the list. fn send_non_control_streams(&mut self, conn: &mut Connection, now: Instant) -> Res<()> { let to_send = mem::take(&mut self.streams_with_pending_data); #[expect( clippy::iter_over_hash_type, reason = "OK to loop over active streams in an undefined order." )] for stream_id in to_send { let done = if let Some(s) = &mut self.send_streams.get_mut(&stream_id) { s.send(conn, now)?; if s.has_data_to_send() { self.streams_with_pending_data.insert(stream_id); } s.done() } else { false }; if done { self.remove_send_stream(stream_id, conn); } } Ok(()) } /// Call `send` for all streams that need to send data. See explanation for the main structure /// for more details. pub(crate) fn process_sending(&mut self, conn: &mut Connection, now: Instant) -> Res<()> { // check if control stream has data to send. self.control_stream_local .send(conn, &mut self.recv_streams, now)?; self.send_non_control_streams(conn, now)?; self.qpack_decoder.borrow_mut().send(conn)?; match self.qpack_encoder.borrow_mut().send_encoder_updates(conn) { Ok(()) | Err(neqo_qpack::Error::EncoderStreamBlocked | neqo_qpack::Error::DynamicTableFull) => { } Err(e) => return Err(Error::Qpack(e)), } Ok(()) } /// We have a resumption token which remembers previous settings. Update the setting. pub(crate) fn set_0rtt_settings( &mut self, conn: &mut Connection, settings: HSettings, ) -> Res<()> { self.initialize_http3_connection(conn)?; self.set_qpack_settings(&settings)?; self.settings_state = Http3RemoteSettingsState::ZeroRtt(settings); self.state = Http3State::ZeroRtt; Ok(()) } /// Returns the settings for a connection. This is used for creating a resumption token. pub(crate) fn get_settings(&self) -> Option { if let Http3RemoteSettingsState::Received(settings) = &self.settings_state { Some(settings.clone()) } else { None } } /// This is called when a [`neqo_transport::ConnectionEvent::NewStream`] /// event is received. This registers the stream with a /// [`NewStreamHeadReader`] handler. pub(crate) fn add_new_stream(&mut self, stream_id: StreamId) { qtrace!("[{self}] A new stream: {stream_id}"); self.recv_streams.insert( stream_id, Box::new(NewStreamHeadReader::new(stream_id, self.role)), ); } /// The function calls [`RecvStream::receive`] for a stream. It also deals /// with the outcome of a read by calling /// [`Http3Connection::handle_stream_manipulation_output`]. fn stream_receive( &mut self, conn: &mut Connection, stream_id: StreamId, now: Instant, ) -> Res { qtrace!("[{self}] Readable stream {stream_id}"); if let Some(recv_stream) = self.recv_streams.get_mut(&stream_id) { let res = recv_stream.receive(conn, now); return self .handle_stream_manipulation_output(res, stream_id, conn) .map(|(output, _)| output); } Ok(ReceiveOutput::NoOutput) } fn handle_unblocked_streams( &mut self, unblocked_streams: Vec, conn: &mut Connection, now: Instant, ) -> Res<()> { for stream_id in unblocked_streams { qdebug!("[{self}] Stream {stream_id} is unblocked"); if let Some(r) = self.recv_streams.get_mut(&stream_id) { let res = r .http_stream() .ok_or(Error::HttpInternal(10))? .header_unblocked(conn, now); let res = self.handle_stream_manipulation_output(res, stream_id, conn)?; debug_assert!(matches!(res, (ReceiveOutput::NoOutput, _))); } } Ok(()) } /// This function handles reading from all streams, i.e. control, qpack, request/response /// stream and unidi stream that still do not have a type. /// The function cannot handle: /// 1) a `Push(_)`, `Http` or `WebTransportStream(_)` stream /// 2) frames `MaxPushId`, `PriorityUpdateRequest`, `PriorityUpdateRequestPush` or `Goaway` must /// be handled by `Http3Client`/`Server`. /// /// The function returns `ReceiveOutput`. pub(crate) fn handle_stream_readable( &mut self, conn: &mut Connection, stream_id: StreamId, now: Instant, ) -> Res { let mut output = self.stream_receive(conn, stream_id, now)?; if let ReceiveOutput::NewStream(stream_type) = output { output = self.handle_new_stream(conn, stream_type, stream_id, now)?; } match output { ReceiveOutput::UnblockedStreams(unblocked_streams) => { self.handle_unblocked_streams(unblocked_streams, conn, now)?; Ok(ReceiveOutput::NoOutput) } ReceiveOutput::ControlFrames(control_frames) => { let mut rest = Vec::new(); for cf in control_frames { if let Some(not_handled) = self.handle_control_frame(cf)? { rest.push(not_handled); } } Ok(ReceiveOutput::ControlFrames(rest)) } ReceiveOutput::NewStream( NewStreamType::Push(_) | NewStreamType::Http(_) | NewStreamType::WebTransportStream(_), ) | ReceiveOutput::NoOutput => Ok(output), ReceiveOutput::NewStream(_) => { unreachable!("NewStream should have been handled already") } } } /// This is called when a RESET frame has been received. pub(crate) fn handle_stream_reset( &mut self, stream_id: StreamId, app_error: AppError, conn: &mut Connection, ) -> Res<()> { qinfo!("[{self}] Handle a stream reset stream_id={stream_id} app_err={app_error}"); self.close_recv(stream_id, CloseType::ResetRemote(app_error), conn) } pub(crate) fn handle_stream_stop_sending( &mut self, stream_id: StreamId, app_error: AppError, conn: &mut Connection, ) -> Res<()> { qinfo!("[{self}] Handle stream_stop_sending stream_id={stream_id} app_err={app_error}"); if self.send_stream_is_critical(stream_id) { return Err(Error::HttpClosedCriticalStream); } self.close_send(stream_id, CloseType::ResetRemote(app_error), conn); Ok(()) } /// This is called when `neqo_transport::Connection` state has been change to take proper /// actions in the HTTP3 layer. pub(crate) fn handle_state_change( &mut self, conn: &mut Connection, state: &State, ) -> Res { qdebug!("[{self}] Handle state change {state:?}"); match state { State::Handshaking => { if self.role == Role::Server && conn.zero_rtt_state() == ZeroRttState::AcceptedServer { self.state = Http3State::ZeroRtt; self.initialize_http3_connection(conn)?; Ok(true) } else { Ok(false) } } State::Connected => { debug_assert!(matches!( self.state, Http3State::Initializing | Http3State::ZeroRtt )); if self.state == Http3State::Initializing { self.initialize_http3_connection(conn)?; } self.state = Http3State::Connected; Ok(true) } State::Closing { error, .. } | State::Draining { error, .. } => { if matches!(self.state, Http3State::Closing(_) | Http3State::Closed(_)) { Ok(false) } else { self.state = Http3State::Closing(error.clone()); Ok(true) } } State::Closed(error) => { if matches!(self.state, Http3State::Closed(_)) { Ok(false) } else { self.state = Http3State::Closed(error.clone()); Ok(true) } } _ => Ok(false), } } /// This is called when 0RTT has been reset to clear `send_streams`, `recv_streams` and /// settings. pub(crate) fn handle_zero_rtt_rejected(&mut self) -> Res<()> { if self.state == Http3State::ZeroRtt { self.state = Http3State::Initializing; self.control_stream_local = ControlStreamLocal::default(); self.qpack_encoder = Rc::new(RefCell::new(qpack::Encoder::new( self.local_params.get_qpack_settings(), true, ))); self.qpack_decoder = Rc::new(RefCell::new(qpack::Decoder::new( self.local_params.get_qpack_settings(), ))); self.settings_state = Http3RemoteSettingsState::NotReceived; self.streams_with_pending_data.clear(); // TODO: investigate whether this code can automatically retry failed transactions. self.send_streams.clear(); self.recv_streams.clear(); Ok(()) } else { debug_assert!(false, "Zero rtt rejected in the wrong state"); Err(Error::HttpInternal(3)) } } pub(crate) fn handle_datagram(&mut self, datagram: Vec) { let mut decoder = Decoder::new(&datagram); let Some(id) = decoder.decode_varint() else { qdebug!("[{self}] handle_datagram: failed to decode session ID"); return; }; let varint_len = decoder.offset(); let Some(stream) = self .recv_streams .get_mut(&StreamId::from(id * 4)) .and_then(|s| s.extended_connect_session()) else { qdebug!("[{self}] handle_datagram for unknown extended connect session"); return; }; stream .borrow_mut() .datagram(Bytes::new(datagram, varint_len)); } fn check_stream_exists(&self, stream_type: Http3StreamType) -> Res<()> { if self .recv_streams .values() .any(|c| c.stream_type() == stream_type) { Err(Error::HttpStreamCreation) } else { Ok(()) } } /// If the new stream is a control or QPACK stream, this function creates a proper handler /// and perform a read. /// if the new stream is a `Push(_)`, `Http` or `WebTransportStream(_)` stream, the function /// returns `ReceiveOutput::NewStream(_)` and the caller will handle it. /// If the stream is of a unknown type the stream will be closed. fn handle_new_stream( &mut self, conn: &mut Connection, stream_type: NewStreamType, stream_id: StreamId, now: Instant, ) -> Res { match stream_type { NewStreamType::Control => { self.check_stream_exists(Http3StreamType::Control)?; self.recv_streams .insert(stream_id, Box::new(ControlStreamRemote::new(stream_id))); } NewStreamType::Push(push_id) => { qinfo!("[{self}] A new push stream {stream_id} push_id:{push_id}"); } NewStreamType::Decoder => { qdebug!("[{self}] A new remote qpack encoder stream {stream_id}"); self.check_stream_exists(Http3StreamType::Decoder)?; self.recv_streams.insert( stream_id, Box::new(DecoderRecvStream::new( stream_id, Rc::clone(&self.qpack_decoder), )), ); } NewStreamType::Encoder => { qdebug!("[{self}] A new remote qpack decoder stream {stream_id}"); self.check_stream_exists(Http3StreamType::Encoder)?; self.recv_streams.insert( stream_id, Box::new(EncoderRecvStream::new( stream_id, Rc::clone(&self.qpack_encoder), )), ); } NewStreamType::Http(_) => { qinfo!("[{self}] A new http stream {stream_id}"); } NewStreamType::WebTransportStream(session_id) => { let session_exists = self .send_streams .get(&StreamId::from(session_id)) .is_some_and(|s| s.stream_type() == Http3StreamType::ExtendedConnect); if !session_exists { conn.stream_stop_sending(stream_id, Error::HttpStreamCreation.code())?; return Ok(ReceiveOutput::NoOutput); } // Set incoming WebTransport streams to be fair (share bandwidth). // We may call this with an invalid stream ID, so ignore that error. match conn.stream_fairness(stream_id, true) { Ok(()) | Err(neqo_transport::Error::InvalidStreamId) => (), Err(e) => return Err(Error::from(e)), } qinfo!("[{self}] A new WebTransport stream {stream_id} for session {session_id}"); } NewStreamType::Unknown => { conn.stream_stop_sending(stream_id, Error::HttpStreamCreation.code())?; } } match stream_type { NewStreamType::Control | NewStreamType::Decoder | NewStreamType::Encoder => { self.stream_receive(conn, stream_id, now) } NewStreamType::Push(_) | NewStreamType::Http(_) | NewStreamType::WebTransportStream(_) => Ok(ReceiveOutput::NewStream(stream_type)), NewStreamType::Unknown => Ok(ReceiveOutput::NoOutput), } } /// This is called when an application closes the connection. pub fn close(&mut self, error: AppError) { qdebug!("[{self}] Close connection error {error:?}"); self.state = Http3State::Closing(CloseReason::Application(error)); if (!self.send_streams.is_empty() || !self.recv_streams.is_empty()) && (error == 0) { qwarn!("close(0) called when streams still active"); } self.send_streams.clear(); self.recv_streams.clear(); } /// This function will not handle the output of the function completely, but only /// handle the indication that a stream is closed. There are 2 cases: /// - an error occurred or /// - the stream is done, i.e. the second value in `output` tuple is true if the stream is done /// and can be removed from the `recv_streams` /// /// How it is handling `output`: /// - if the stream is done, it removes the stream from `recv_streams` /// - if the stream is not done and there is no error, return `output` and the caller will /// handle it. /// - in case of an error: /// - if it is only a stream error and the stream is not critical, send `STOP_SENDING` frame, /// remove the stream from `recv_streams` and inform the listener that the stream has been /// reset. /// - otherwise this is a connection error. In this case, propagate the error to the caller /// that will handle it properly. fn handle_stream_manipulation_output( &mut self, output: Res<(U, bool)>, stream_id: StreamId, conn: &mut Connection, ) -> Res<(U, bool)> where U: Default, { match &output { Ok((_, true)) => { self.remove_recv_stream(stream_id, conn); } Ok((_, false)) => {} Err(e) => { if e.stream_reset_error() && !self.recv_stream_is_critical(stream_id) { drop(conn.stream_stop_sending(stream_id, e.code())); self.close_recv(stream_id, CloseType::LocalError(e.code()), conn)?; return Ok((U::default(), false)); } } } output } fn create_request_headers(request: &RequestDescription) -> Res> where T: RequestTarget, { match request.connect_type { Some(_) if request.method != "CONNECT" => { qwarn!("Method CONNECT without CONNECT type"); return Err(Error::InvalidInput); } None if request.method == "CONNECT" => { qwarn!( "Method {} with CONNECT type {:?}", request.method, request.connect_type ); return Err(Error::InvalidInput); } _ => {} } let mut headers = match request.connect_type { None => { vec![ Header::new(":method", request.method), Header::new(":scheme", request.target.scheme()), Header::new(":authority", request.target.authority()), Header::new(":path", request.target.path()), ] } Some(ConnectType::Classic) => { // > The :scheme and :path pseudo-header fields are omitted // // vec![ Header::new(":method", request.method), Header::new(":authority", request.target.authority()), ] } Some(ConnectType::Extended(protocol)) => { let mut h = vec![ Header::new(":method", request.method), Header::new(":scheme", request.target.scheme()), Header::new(":authority", request.target.authority()), Header::new(":path", request.target.path()), Header::new(":protocol", protocol.to_string()), ]; if protocol == ExtendedConnectType::ConnectUdp { h.push(Header::new("capsule-protocol", "?1")); } h } }; headers.extend_from_slice(request.headers); Ok(headers) } pub fn request( &mut self, conn: &mut Connection, send_events: Box, recv_events: Box, push_handler: Option>>, request: &RequestDescription, now: Instant, ) -> Res where T: RequestTarget, { qinfo!( "[{self}] Request method={} target: {:?}", request.method, request.target, ); let id = self.create_bidi_transport_stream(conn)?; self.request_with_stream( id, conn, send_events, recv_events, push_handler, request, now, )?; Ok(id) } fn create_bidi_transport_stream(&self, conn: &mut Connection) -> Res { // Requests cannot be created when a connection is in states: Initializing, GoingAway, // Closing and Closed. match self.state() { Http3State::GoingAway(..) | Http3State::Closing(..) | Http3State::Closed(..) => { return Err(Error::AlreadyClosed); } Http3State::Initializing => return Err(Error::Unavailable), _ => {} } let id = conn .stream_create(StreamType::BiDi) .map_err(|e| Error::map_stream_create_errors(&e))?; conn.stream_keep_alive(id, true)?; Ok(id) } #[expect(clippy::too_many_arguments, reason = "Yes, but they are needed.")] fn request_with_stream( &mut self, stream_id: StreamId, conn: &mut Connection, send_events: Box, recv_events: Box, push_handler: Option>>, request: &RequestDescription, now: Instant, ) -> Res<()> where T: RequestTarget, { let final_headers = Self::create_request_headers(request)?; let stream_type = if request.connect_type.is_some() { Http3StreamType::ExtendedConnect } else { Http3StreamType::Http }; let mut send_message = SendMessage::new( MessageType::Request, stream_type, stream_id, Rc::clone(&self.qpack_encoder), send_events, ); send_message .http_stream() .ok_or(Error::Internal)? .send_headers(&final_headers, conn)?; self.add_streams( stream_id, Box::new(send_message), Box::new(RecvMessage::new( &RecvMessageInfo { message_type: MessageType::Response, stream_type, stream_id, first_frame_type: None, }, Rc::clone(&self.qpack_decoder), recv_events, push_handler, PriorityHandler::new(false, request.priority), )), ); // Call immediately send so that at least headers get sent. This will make Firefox faster, // since it can send request body immediately in most cases and does not need to do // a complete process loop. self.send_streams .get_mut(&stream_id) .ok_or(Error::InvalidStreamId)? .send(conn, now)?; Ok(()) } /// Stream data are read directly into a buffer supplied as a parameter of this function to /// avoid copying data. /// /// # Errors /// /// It returns an error if a stream does not exist or an error happens while reading a stream, /// e.g. early close, protocol error, etc. pub fn read_data( &mut self, conn: &mut Connection, stream_id: StreamId, buf: &mut [u8], now: Instant, ) -> Res<(usize, bool)> { qdebug!("[{self}] read_data from stream {stream_id}"); let res = self .recv_streams .get_mut(&stream_id) .ok_or(Error::InvalidStreamId)? .read_data(conn, buf, now); self.handle_stream_manipulation_output(res, stream_id, conn) } /// This is called when an application resets a stream. /// The application reset will close both sides. pub fn stream_reset_send( &mut self, conn: &mut Connection, stream_id: StreamId, error: AppError, ) -> Res<()> { qinfo!("[{self}] Reset sending side of stream {stream_id} error={error}"); if self.send_stream_is_critical(stream_id) { return Err(Error::InvalidStreamId); } self.close_send(stream_id, CloseType::ResetApp(error), conn); conn.stream_reset_send(stream_id, error)?; Ok(()) } pub fn stream_stop_sending( &mut self, conn: &mut Connection, stream_id: StreamId, error: AppError, ) -> Res<()> { qinfo!("[{self}] Send stop sending for stream {stream_id} error={error}"); if self.recv_stream_is_critical(stream_id) { return Err(Error::InvalidStreamId); } self.close_recv(stream_id, CloseType::ResetApp(error), conn)?; // Stream may be already be closed and we may get an error here, but we do not care. conn.stream_stop_sending(stream_id, error)?; Ok(()) } /// Set the stream `SendOrder`. /// /// # Errors /// /// Returns `InvalidStreamId` if the stream id doesn't exist pub fn stream_set_sendorder( conn: &mut Connection, stream_id: StreamId, sendorder: Option, ) -> Res<()> { conn.stream_sendorder(stream_id, sendorder) .map_err(|_| Error::InvalidStreamId) } /// Set the stream Fairness. Fair streams will share bandwidth with other /// streams of the same sendOrder group (or the unordered group). Unfair streams /// will give bandwidth preferentially to the lowest streamId with data to send. /// /// # Errors /// /// Returns `InvalidStreamId` if the stream id doesn't exist pub fn stream_set_fairness( conn: &mut Connection, stream_id: StreamId, fairness: bool, ) -> Res<()> { conn.stream_fairness(stream_id, fairness) .map_err(|_| Error::InvalidStreamId) } pub fn cancel_fetch( &mut self, stream_id: StreamId, error: AppError, conn: &mut Connection, ) -> Res<()> { qinfo!("[{self}] cancel_fetch {stream_id} error={error}"); let send_stream = self.send_streams.get(&stream_id); let recv_stream = self.recv_streams.get(&stream_id); match (send_stream, recv_stream) { (None, None) => return Err(Error::InvalidStreamId), (Some(s), None) => { if !matches!( s.stream_type(), Http3StreamType::Http | Http3StreamType::ExtendedConnect ) { return Err(Error::InvalidStreamId); } // Stream may be already be closed and we may get an error here, but we do not care. drop(self.stream_reset_send(conn, stream_id, error)); } (None, Some(s)) => { if !matches!( s.stream_type(), Http3StreamType::Http | Http3StreamType::Push | Http3StreamType::ExtendedConnect ) { return Err(Error::InvalidStreamId); } // Stream may be already be closed and we may get an error here, but we do not care. drop(self.stream_stop_sending(conn, stream_id, error)); } (Some(s), Some(r)) => { debug_assert_eq!(s.stream_type(), r.stream_type()); if !matches!( s.stream_type(), Http3StreamType::Http | Http3StreamType::ExtendedConnect ) { return Err(Error::InvalidStreamId); } // Stream may be already be closed and we may get an error here, but we do not care. drop(self.stream_reset_send(conn, stream_id, error)); // Stream may be already be closed and we may get an error here, but we do not care. drop(self.stream_stop_sending(conn, stream_id, error)); } } Ok(()) } /// This is called when an application wants to close the sending side of a stream. pub fn stream_close_send( &mut self, conn: &mut Connection, stream_id: StreamId, now: Instant, ) -> Res<()> { qdebug!("[{self}] Close the sending side for stream {stream_id}"); debug_assert!(self.state.active()); let send_stream = self .send_streams .get_mut(&stream_id) .ok_or(Error::InvalidStreamId)?; // The following function may return InvalidStreamId from the transport layer if the stream // has been closed already. It is ok to ignore it here. drop(send_stream.close(conn, now)); if send_stream.done() { self.remove_send_stream(stream_id, conn); } else if send_stream.has_data_to_send() { self.streams_with_pending_data.insert(stream_id); } Ok(()) } pub fn webtransport_create_session( &mut self, conn: &mut Connection, events: Box, target: T, headers: &[Header], ) -> Res where T: RequestTarget, { qinfo!("[{self}] Create WebTransport"); if !self.webtransport_enabled() { return Err(Error::Unavailable); } self.extended_connect_create_session( conn, events, target, headers, ExtendedConnectType::WebTransport, ) } pub fn connect_udp_create_session( &mut self, conn: &mut Connection, events: Box, target: T, headers: &[Header], ) -> Res where T: RequestTarget, { qinfo!("[{self}] Create ConnectUdp"); if !self.connect_udp_enabled() { return Err(Error::Unavailable); } self.extended_connect_create_session( conn, events, target, headers, ExtendedConnectType::ConnectUdp, ) } pub fn extended_connect_create_session( &mut self, conn: &mut Connection, events: Box, target: T, headers: &[Header], connect_type: ExtendedConnectType, ) -> Res where T: RequestTarget, { let id = self.create_bidi_transport_stream(conn)?; let extended_conn = Rc::new(RefCell::new(extended_connect::session::Session::new( id, events, self.role, Rc::clone(&self.qpack_encoder), Rc::clone(&self.qpack_decoder), connect_type, ))); self.add_streams( id, Box::new(Rc::clone(&extended_conn)), Box::new(Rc::clone(&extended_conn)), ); let final_headers = Self::create_request_headers(&RequestDescription { method: "CONNECT", target, headers, connect_type: Some(ConnectType::Extended(connect_type)), priority: Priority::default(), })?; extended_conn .borrow_mut() .send_request(&final_headers, conn)?; self.streams_with_pending_data.insert(id); Ok(id) } pub(crate) fn webtransport_session_accept( &mut self, conn: &mut Connection, stream_id: StreamId, events: Box, accept_res: &SessionAcceptAction, now: Instant, ) -> Res<()> { qtrace!("Respond to WebTransport session with accept={accept_res}"); if !self.webtransport_enabled() { return Err(Error::Unavailable); } self.extended_connect_session_accept( conn, stream_id, events, accept_res, ExtendedConnectType::WebTransport, now, ) } pub(crate) fn connect_udp_session_accept( &mut self, conn: &mut Connection, stream_id: StreamId, events: Box, accept_res: &SessionAcceptAction, now: Instant, ) -> Res<()> { qtrace!("Respond to ConnectUdp session with accept={accept_res}"); if !self.connect_udp_enabled() { return Err(Error::Unavailable); } self.extended_connect_session_accept( conn, stream_id, events, accept_res, ExtendedConnectType::ConnectUdp, now, ) } fn extended_connect_session_accept( &mut self, conn: &mut Connection, stream_id: StreamId, events: Box, accept_res: &SessionAcceptAction, connect_type: ExtendedConnectType, now: Instant, ) -> Res<()> { let mut recv_stream = self.recv_streams.get_mut(&stream_id); if let Some(r) = &mut recv_stream && !r .http_stream() .ok_or(Error::InvalidStreamId)? .extended_connect_wait_for_response() { return Err(Error::InvalidStreamId); } let send_stream = self.send_streams.get_mut(&stream_id); conn.stream_keep_alive(stream_id, true)?; match (send_stream, recv_stream, accept_res) { (None, None, _) => Err(Error::InvalidStreamId), (None, Some(_), _) | (Some(_), None, _) => { // Stream is in an inconsistent state (one direction exists, the other doesn't). self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?; Err(Error::InvalidState) } (Some(s), Some(_r), SessionAcceptAction::Reject(headers)) => { if s.http_stream() .ok_or(Error::InvalidStreamId)? .send_headers(headers, conn) .is_ok() { drop(self.stream_close_send(conn, stream_id, now)); // TODO issue 1294: add a timer to clean up the recv_stream if the peer does not // do that in a short time. self.streams_with_pending_data.insert(stream_id); } else { self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?; } Ok(()) } (Some(s), Some(_r), SessionAcceptAction::Accept) => { let mut response_headers = vec![Header::new(":status", "200")]; if connect_type == ExtendedConnectType::ConnectUdp { response_headers.push(Header::new("capsule-protocol", "?1")); } if s.http_stream() .ok_or(Error::InvalidStreamId)? .send_headers(&response_headers, conn) .is_ok() { let extended_conn = Rc::new(RefCell::new( extended_connect::session::Session::new_with_http_streams( stream_id, events, self.role, self.recv_streams .remove(&stream_id) .ok_or(Error::Internal)?, self.send_streams .remove(&stream_id) .ok_or(Error::Internal)?, connect_type, )?, )); self.add_streams( stream_id, Box::new(Rc::clone(&extended_conn)), Box::new(extended_conn), ); self.streams_with_pending_data.insert(stream_id); } else { self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?; return Err(Error::InvalidStreamId); } Ok(()) } } } pub(crate) fn webtransport_close_session( &mut self, conn: &mut Connection, session_id: StreamId, error: u32, message: &str, now: Instant, ) -> Res<()> { qtrace!("Close WebTransport session {session_id:?}"); self.extended_connect_close_session(conn, session_id, error, message, now) } pub(crate) fn connect_udp_close_session( &mut self, conn: &mut Connection, session_id: StreamId, error: u32, message: &str, now: Instant, ) -> Res<()> { qtrace!("Close ConnectUdp session {session_id:?}"); self.extended_connect_close_session(conn, session_id, error, message, now) } fn extended_connect_close_session( &mut self, conn: &mut Connection, session_id: StreamId, error: u32, message: &str, now: Instant, ) -> Res<()> { let send_stream = self .send_streams .get_mut(&session_id) .filter(|s| s.stream_type() == Http3StreamType::ExtendedConnect) .ok_or(Error::InvalidStreamId)?; send_stream.close_with_message(conn, error, message, now)?; if send_stream.done() { self.remove_send_stream(session_id, conn); } else if send_stream.has_data_to_send() { self.streams_with_pending_data.insert(session_id); } Ok(()) } pub(crate) fn webtransport_create_stream_local( &mut self, conn: &mut Connection, session_id: StreamId, stream_type: StreamType, send_events: Box, recv_events: Box, ) -> Res { qtrace!("Create new WebTransport stream session={session_id} type={stream_type:?}"); let wt = self .recv_streams .get(&session_id) .ok_or(Error::InvalidStreamId)? .extended_connect_session() .ok_or(Error::InvalidStreamId)?; if !wt.borrow().is_active() { return Err(Error::InvalidStreamId); } let stream_id = conn .stream_create(stream_type) .map_err(|e| Error::map_stream_create_errors(&e))?; // Set outgoing WebTransport streams to be fair (share bandwidth) conn.stream_fairness(stream_id, true)?; self.webtransport_create_stream_internal( wt, stream_id, session_id, send_events, recv_events, true, )?; Ok(stream_id) } pub(crate) fn webtransport_create_stream_remote( &mut self, session_id: StreamId, stream_id: StreamId, send_events: Box, recv_events: Box, ) -> Res<()> { qtrace!("Create new WebTransport stream session={session_id} stream_id={stream_id}"); let wt = self .recv_streams .get(&session_id) .ok_or(Error::InvalidStreamId)? .extended_connect_session() .ok_or(Error::InvalidStreamId)?; self.webtransport_create_stream_internal( wt, stream_id, session_id, send_events, recv_events, false, )?; Ok(()) } fn webtransport_create_stream_internal( &mut self, webtransport_session: Rc>, stream_id: StreamId, session_id: StreamId, send_events: Box, recv_events: Box, local: bool, ) -> Res<()> { webtransport_session.borrow_mut().add_stream(stream_id)?; if stream_id.stream_type() == StreamType::UniDi { if local { self.send_streams.insert( stream_id, Box::new(WebTransportSendStream::new( stream_id, session_id, send_events, webtransport_session, true, )), ); } else { self.recv_streams.insert( stream_id, Box::new(WebTransportRecvStream::new( stream_id, session_id, recv_events, webtransport_session, )), ); } } else { self.add_streams( stream_id, Box::new(WebTransportSendStream::new( stream_id, session_id, send_events, Rc::clone(&webtransport_session), local, )), Box::new(WebTransportRecvStream::new( stream_id, session_id, recv_events, webtransport_session, )), ); } Ok(()) } pub fn webtransport_send_datagram>( &mut self, session_id: StreamId, conn: &mut Connection, buf: &[u8], id: I, now: Instant, ) -> Res<()> { self.extended_connect_send_datagram(session_id, conn, buf, id, now) } pub fn connect_udp_send_datagram>( &mut self, session_id: StreamId, conn: &mut Connection, buf: &[u8], id: I, now: Instant, ) -> Res<()> { self.extended_connect_send_datagram(session_id, conn, buf, id, now) } fn extended_connect_send_datagram>( &mut self, session_id: StreamId, conn: &mut Connection, buf: &[u8], id: I, now: Instant, ) -> Res<()> { self.recv_streams .get_mut(&session_id) .ok_or(Error::InvalidStreamId)? .extended_connect_session() .ok_or(Error::InvalidStreamId)? .borrow_mut() .send_datagram(conn, buf, id, now) } /// If the control stream has received frames `MaxPushId`, `Goaway`, `PriorityUpdateRequest` or /// `PriorityUpdateRequestPush` which handling is specific to the client and server, we must /// give them to the specific client/server handler. fn handle_control_frame(&mut self, f: HFrame) -> Res> { qdebug!("[{self}] Handle a control frame {f:?}"); if !matches!(f, HFrame::Settings { .. }) && !matches!( self.settings_state, Http3RemoteSettingsState::Received { .. } ) { return Err(Error::HttpMissingSettings); } match f { HFrame::Settings { settings } => { self.handle_settings(settings)?; Ok(None) } HFrame::Goaway { .. } | HFrame::MaxPushId { .. } | HFrame::CancelPush { .. } | HFrame::PriorityUpdateRequest { .. } | HFrame::PriorityUpdatePush { .. } => Ok(Some(f)), _ => Err(Error::HttpFrameUnexpected), } } fn set_qpack_settings(&self, settings: &HSettings) -> Res<()> { let mut qpe = self.qpack_encoder.borrow_mut(); qpe.set_max_capacity(settings.get(HSettingType::MaxTableCapacity))?; qpe.set_max_blocked_streams(settings.get(HSettingType::BlockedStreams))?; Ok(()) } fn handle_settings(&mut self, new_settings: HSettings) -> Res<()> { qdebug!("[{self}] Handle SETTINGS frame"); match &self.settings_state { Http3RemoteSettingsState::NotReceived => { self.set_qpack_settings(&new_settings)?; self.webtransport.handle_settings(&new_settings); self.connect_udp.handle_settings(&new_settings); self.settings_state = Http3RemoteSettingsState::Received(new_settings); Ok(()) } Http3RemoteSettingsState::ZeroRtt(settings) => { self.webtransport.handle_settings(&new_settings); self.connect_udp.handle_settings(&new_settings); let mut qpack_changed = false; for st in &[ HSettingType::MaxHeaderListSize, HSettingType::MaxTableCapacity, HSettingType::BlockedStreams, ] { let zero_rtt_value = settings.get(*st); let new_value = new_settings.get(*st); if zero_rtt_value == new_value { continue; } if zero_rtt_value > new_value { qerror!( "[{self}] The new({new_value}) and the old value({zero_rtt_value}) of setting {st:?} do not match" ); return Err(Error::HttpSettings); } match st { HSettingType::MaxTableCapacity => { if zero_rtt_value != 0 { return Err(Error::Qpack(neqo_qpack::Error::DecoderStream)); } qpack_changed = true; } HSettingType::BlockedStreams => qpack_changed = true, HSettingType::MaxHeaderListSize | HSettingType::EnableWebTransport | HSettingType::EnableH3Datagram | HSettingType::EnableConnect => (), } } if qpack_changed { qdebug!("[{self}] Settings after zero rtt differ"); self.set_qpack_settings(&(new_settings))?; } self.settings_state = Http3RemoteSettingsState::Received(new_settings); Ok(()) } Http3RemoteSettingsState::Received { .. } => Err(Error::HttpFrameUnexpected), } } /// Adds a new send and receive stream. pub(crate) fn add_streams( &mut self, stream_id: StreamId, send_stream: Box, recv_stream: Box, ) { if send_stream.has_data_to_send() { self.streams_with_pending_data.insert(stream_id); } self.send_streams.insert(stream_id, send_stream); self.recv_streams.insert(stream_id, recv_stream); } /// Add a new recv stream. This is used for push streams. pub(crate) fn add_recv_stream( &mut self, stream_id: StreamId, recv_stream: Box, ) { self.recv_streams.insert(stream_id, recv_stream); } pub(crate) fn queue_control_frame(&mut self, frame: &HFrame) { self.control_stream_local.queue_frame(frame); } pub(crate) fn queue_update_priority( &mut self, stream_id: StreamId, priority: Priority, ) -> Res { let stream = self .recv_streams .get_mut(&stream_id) .ok_or(Error::InvalidStreamId)? .http_stream() .ok_or(Error::InvalidStreamId)?; if stream.maybe_update_priority(priority)? { self.control_stream_local.queue_update_priority(stream_id); Ok(true) } else { Ok(false) } } fn recv_stream_is_critical(&self, stream_id: StreamId) -> bool { self.recv_streams.get(&stream_id).is_some_and(|r| { matches!( r.stream_type(), Http3StreamType::Control | Http3StreamType::Encoder | Http3StreamType::Decoder ) }) } fn send_stream_is_critical(&self, stream_id: StreamId) -> bool { self.qpack_encoder .borrow() .local_stream_id() .iter() .chain(self.qpack_decoder.borrow().local_stream_id().iter()) .chain(self.control_stream_local.stream_id().iter()) .any(|id| stream_id == *id) } fn close_send(&mut self, stream_id: StreamId, close_type: CloseType, conn: &mut Connection) { if let Some(mut s) = self.remove_send_stream(stream_id, conn) { s.handle_stop_sending(close_type); } } fn close_recv( &mut self, stream_id: StreamId, close_type: CloseType, conn: &mut Connection, ) -> Res<()> { if let Some(mut s) = self.remove_recv_stream(stream_id, conn) { s.reset(close_type)?; } Ok(()) } fn remove_extended_connect( &mut self, wt: &Rc>, conn: &mut Connection, ) { let (recv, send) = wt.borrow_mut().take_sub_streams(); #[expect( clippy::iter_over_hash_type, reason = "OK to loop over active streams in an undefined order." )] for id in recv { qtrace!("Remove the extended connect sub receiver stream {id}"); // Use CloseType::ResetRemote so that an event will be sent. CloseType::LocalError would // have the same effect. if let Some(mut s) = self.recv_streams.remove(&id) { drop(s.reset(CloseType::ResetRemote(Error::HttpRequestCancelled.code()))); } drop(conn.stream_stop_sending(id, Error::HttpRequestCancelled.code())); } #[expect( clippy::iter_over_hash_type, reason = "OK to loop over active streams in an undefined order." )] for id in send { qtrace!("Remove the extended connect sub send stream {id}"); if let Some(mut s) = self.send_streams.remove(&id) { s.handle_stop_sending(CloseType::ResetRemote(Error::HttpRequestCancelled.code())); } drop(conn.stream_reset_send(id, Error::HttpRequestCancelled.code())); } } fn remove_recv_stream( &mut self, stream_id: StreamId, conn: &mut Connection, ) -> Option> { let stream = self.recv_streams.remove(&stream_id); if let Some(s) = &stream && s.stream_type() == Http3StreamType::ExtendedConnect { self.send_streams.remove(&stream_id)?; if let Some(wt) = s.extended_connect_session() { self.remove_extended_connect(&wt, conn); } } stream } fn remove_send_stream( &mut self, stream_id: StreamId, conn: &mut Connection, ) -> Option> { let stream = self.send_streams.remove(&stream_id); if let Some(s) = &stream && s.stream_type() == Http3StreamType::ExtendedConnect && let Some(wt) = self .recv_streams .remove(&stream_id)? .extended_connect_session() { self.remove_extended_connect(&wt, conn); } stream } pub const fn webtransport_enabled(&self) -> bool { self.webtransport.enabled() } pub const fn connect_udp_enabled(&self) -> bool { self.connect_udp.enabled() } #[must_use] pub const fn state(&self) -> &Http3State { &self.state } pub fn set_state(&mut self, state: Http3State) { self.state = state; } #[must_use] pub const fn state_mut(&mut self) -> &mut Http3State { &mut self.state } #[must_use] pub const fn qpack_encoder(&self) -> &Rc> { &self.qpack_encoder } #[must_use] pub const fn qpack_decoder(&self) -> &Rc> { &self.qpack_decoder } #[must_use] pub fn send_streams(&self) -> &HashMap> { &self.send_streams } #[must_use] pub fn send_streams_mut(&mut self) -> &mut HashMap> { &mut self.send_streams } #[must_use] pub fn recv_streams(&self) -> &HashMap> { &self.recv_streams } #[must_use] pub fn recv_streams_mut(&mut self) -> &mut HashMap> { &mut self.recv_streams } } #[cfg(test)] #[cfg_attr(coverage_nightly, coverage(off))] mod tests { use http::Uri; use crate::{ Error, Priority, connection::{Http3Connection, RequestDescription}, features::ConnectType, }; #[test] fn create_request_headers_connect_without_connect_type() { let request = RequestDescription { method: "CONNECT", target: &Uri::from_static("https://example.com"), headers: &[], connect_type: None, priority: Priority::default(), }; assert_eq!( Http3Connection::create_request_headers(&request), Err(Error::InvalidInput) ); } #[test] fn create_request_headers_connect_type_without_connect() { let request = RequestDescription { method: "GET", target: &Uri::from_static("https://example.com"), headers: &[], connect_type: Some(ConnectType::Classic), priority: Priority::default(), }; assert_eq!( Http3Connection::create_request_headers(&request), Err(Error::InvalidInput) ); } }