// Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. use std::{ collections::HashSet, fmt::{self, Display, Formatter}, mem, time::Instant, }; use neqo_common::{Bytes, Encoder, Role, qtrace}; use neqo_transport::{Connection, StreamId}; use crate::{ Error, Http3StreamInfo, Http3StreamType, RecvStream, Res, SendStream, features::extended_connect::{ CloseReason, ExtendedConnectEvents, ExtendedConnectType, session::{DgramContextIdError, Protocol, State}, }, frames::{FrameReader, StreamReaderRecvStreamWrapper, WebTransportFrame}, }; #[derive(Debug)] pub struct Session { frame_reader: FrameReader, id: StreamId, send_streams: HashSet, recv_streams: HashSet, role: Role, /// Remote initiated streams received before session confirmation. /// /// [`HashSet`] size limited by QUIC connection stream limit. pending_streams: HashSet, } impl Display for Session { fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!(f, "WebTransportSession") } } impl Session { #[must_use] pub(crate) fn new(session_id: StreamId, role: Role) -> Self { Self { id: session_id, frame_reader: FrameReader::new(), send_streams: HashSet::default(), recv_streams: HashSet::default(), role, pending_streams: HashSet::default(), } } } impl Protocol for Session { fn connect_type(&self) -> ExtendedConnectType { ExtendedConnectType::WebTransport } fn session_start(&mut self, events: &mut Box) -> Res<()> { // > WebTransport endpoints SHOULD buffer streams and // > datagrams until they can be associated with an // > established session. // // #[expect(clippy::iter_over_hash_type, reason = "no defined order necessary")] for stream_id in self.pending_streams.drain() { events.extended_connect_new_stream( Http3StreamInfo::new(stream_id, Http3StreamType::WebTransport(self.id)), // Explicitly emit a stream readable event. Such // event was previously suppressed as the // session was still negotiating. true, )?; } Ok(()) } fn close_frame(&self, error: u32, message: &str) -> Option> { let close_frame = WebTransportFrame::CloseSession { error, message: message.to_string(), }; let mut encoder = Encoder::default(); close_frame.encode(&mut encoder); Some(encoder.into()) } fn read_control_stream( &mut self, conn: &mut Connection, events: &mut Box, control_stream_recv: &mut Box, now: Instant, ) -> Res> { let (f, fin) = self .frame_reader .receive::( &mut StreamReaderRecvStreamWrapper::new(conn, control_stream_recv), now, ) .map_err(|_| Error::HttpGeneralProtocolStream)?; qtrace!("[{self}] Received frame: {f:?} fin={fin}"); if let Some(WebTransportFrame::CloseSession { error, message }) = f { events.session_end( ExtendedConnectType::WebTransport, self.id, CloseReason::Clean { error, message }, None, ); if fin { Ok(Some(State::Done)) } else { Ok(Some(State::FinPending)) } } else if fin { events.session_end( ExtendedConnectType::WebTransport, self.id, CloseReason::Clean { error: 0, message: String::new(), }, None, ); Ok(Some(State::Done)) } else { Ok(None) } } fn add_stream( &mut self, stream_id: StreamId, events: &mut Box, state: State, ) -> Res<()> { match state { State::Negotiating | State::Active => {} State::FinPending | State::Done => return Ok(()), } if stream_id.is_bidi() { self.send_streams.insert(stream_id); self.recv_streams.insert(stream_id); } else if stream_id.is_self_initiated(self.role) { self.send_streams.insert(stream_id); } else { self.recv_streams.insert(stream_id); } match state { State::FinPending | State::Done => { unreachable!("see match above"); } State::Negotiating => { // > a client may receive a server-initiated stream or a datagram // > before receiving the CONNECT response headers from the // > server. // > // > To handle this case, WebTransport endpoints SHOULD buffer // > streams and datagrams until they can be associated with an // > established session. // // self.pending_streams.insert(stream_id); } State::Active => { if !stream_id.is_self_initiated(self.role) { events.extended_connect_new_stream( Http3StreamInfo::new(stream_id, Http3StreamType::WebTransport(self.id)), // Don't emit an additional stream readable event. Given // that the session is already active, this event will // be emitted through the WebTransport stream itself. false, )?; } } } Ok(()) } fn remove_recv_stream(&mut self, stream_id: StreamId) { self.recv_streams.remove(&stream_id); } fn remove_send_stream(&mut self, stream_id: StreamId) { self.send_streams.remove(&stream_id); } fn take_sub_streams(&mut self) -> (HashSet, HashSet) { ( mem::take(&mut self.recv_streams), mem::take(&mut self.send_streams), ) } fn write_datagram_prefix(&self, _encoder: &mut Encoder) { // WebTransport does not add prefix (i.e. context ID). } fn dgram_context_id(&self, datagram: Bytes) -> Result { // WebTransport does not use a prefix (i.e. context ID). Ok(datagram) } fn datagram_capsule_support(&self) -> bool { // HTTP/3 WebTransport requires QUIC datagram support. In other words, // HTTP/3 WebTransport never falls back to HTTP datagram capsules. // // > WebTransport over HTTP/3 also requires support for QUIC datagrams. // > To indicate support, both the client and the server send a // > max_datagram_frame_size transport parameter with a value greater than // > 0 (see Section 3 of [QUIC-DATAGRAM]). // // false } fn write_datagram_capsule( &self, _control_stream_send: &mut Box, _conn: &mut Connection, _buf: &[u8], _now: Instant, ) -> Res<()> { debug_assert!( false, "[{self}] WebTransport does not support datagram capsules." ); Ok(()) } }