// Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. use std::{ cell::RefCell, fmt::{self, Display}, num::NonZeroUsize, rc::Rc, slice, time::Instant, }; use neqo_common::{Datagram, Header, header::HeadersExt as _, hex, qdebug, qerror, qinfo}; use neqo_crypto::{AntiReplay, generate_ech_keys, random}; use neqo_http3::{ Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, StreamId, }; use neqo_transport::{ConnectionIdGenerator, OutputBatch, server::ValidateAddress}; use rustc_hash::FxHashMap as HashMap; use super::{Args, qns_read_response}; use crate::send_data::{SendData, SendResult}; pub struct HttpServer { server: Http3Server, /// Progress writing to each stream. remaining_data: HashMap, /// Tracks POST requests: (bytes received, optional response size from path) posts: HashMap)>, is_qns_test: bool, } impl HttpServer { /// Send a response on the given stream. If the stream was closed (e.g., by /// `STOP_SENDING`), this logs the error and returns gracefully per RFC 9000. fn send_response( &mut self, stream: &Http3OrWebTransportStream, mut response: SendData, now: Instant, ) { if stream .send_headers(&[ Header::new(":status", "200"), Header::new("content-length", response.len().to_string()), ]) .is_err() { qerror!("Stream {stream} closed by peer, not sending response"); _ = stream.stream_reset_send(neqo_http3::Error::HttpNone.code()); return; } if let Some(remaining) = Self::send_response_body(stream, &mut response, now) { self.remaining_data.insert(stream.stream_id(), remaining); } } /// Send response body data. Returns `None` if done or stream closed, /// `Some(data)` if more data remains to send. fn send_response_body( stream: &Http3OrWebTransportStream, response: &mut SendData, now: Instant, ) -> Option { match response.send(|chunk| stream.send_data(chunk, now)) { SendResult::StreamClosed => { qerror!("Stream {stream} closed"); _ = stream.stream_reset_send(neqo_http3::Error::HttpNone.code()); None } SendResult::Done => { _ = stream.stream_close_send(now); // Stream may be closed; ignore errors. None } SendResult::MoreData => Some(std::mem::take(response)), } } pub fn new( args: &Args, anti_replay: AntiReplay, cid_mgr: Rc>, ) -> Self { let mut server = Http3Server::new( args.now(), slice::from_ref(&args.key), slice::from_ref(&args.shared.alpn), anti_replay, cid_mgr, Http3Parameters::default() .connection_parameters(args.shared.quic_parameters.get(&args.shared.alpn)) .max_table_size_encoder(args.shared.max_table_size_encoder) .max_table_size_decoder(args.shared.max_table_size_decoder) .max_blocked_streams(args.shared.max_blocked_streams), None, ) .expect("We cannot make a server!"); server.set_ciphers(args.get_ciphers()); server.set_qlog_dir(args.shared.qlog_dir.clone()); if args.retry { server.set_validation(ValidateAddress::Always); } if args.ech { let (sk, pk) = generate_ech_keys().expect("should create ECH keys"); server .enable_ech(random::<1>()[0], "public.example", &sk, &pk) .unwrap(); qinfo!("ECHConfigList: {}", hex(server.ech_config())); } Self { server, remaining_data: HashMap::default(), posts: HashMap::default(), is_qns_test: args.shared.qns_test.is_some(), } } } impl Display for HttpServer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.server.fmt(f) } } impl super::HttpServer for HttpServer { fn process_multiple<'a, D: IntoIterator>>( &mut self, dgrams: D, now: Instant, max_datagrams: NonZeroUsize, ) -> OutputBatch { self.server.process_multiple(dgrams, now, max_datagrams) } fn process_events(&mut self, _now: Instant) { let now = Instant::now(); while let Some(event) = self.server.next_event() { match event { Http3ServerEvent::Headers { stream, headers, fin, } => { qdebug!("Headers (request={stream} fin={fin}): {headers:?}"); if headers.contains_header(":method", b"POST") { let response_size = headers.find_header(":path").and_then(|path| { path.value_utf8() .ok()? .trim_matches('/') .parse::() .ok() }); self.posts.insert(stream, (0, response_size)); continue; } let Some(path) = headers.find_header(":path") else { _ = stream.cancel_fetch(neqo_http3::Error::HttpRequestIncomplete.code()); // Stream may be closed; ignore errors. continue; }; let response = if self.is_qns_test { let path_str = path.value_utf8().unwrap_or("/"); match qns_read_response(path_str) { Ok(data) => SendData::from(data), Err(e) => { qerror!("Failed to read {path_str}: {e}"); // Stream may be closed; ignore errors. if stream .send_headers(&[Header::new(":status", "404")]) .is_ok() { _ = stream.stream_close_send(now); } else { _ = stream .stream_reset_send(neqo_http3::Error::HttpNone.code()); } continue; } } } else if let Ok(path_str) = path.value_utf8() { path_str .trim_matches(|p| p == '/') .parse::() .map_or_else(|_| SendData::from(path.value()), SendData::zeroes) } else { SendData::from(path.value()) }; self.send_response(&stream, response, now); } Http3ServerEvent::DataWritable { stream } => { if self.posts.get_mut(&stream).is_none() && let Some(mut remaining) = self.remaining_data.remove(&stream.stream_id()) && let Some(data) = Self::send_response_body(&stream, &mut remaining, now) { self.remaining_data.insert(stream.stream_id(), data); } } Http3ServerEvent::Data { stream, data, fin } => { if let Some((received, _)) = self.posts.get_mut(&stream) { *received += data.len(); } if fin && let Some((received, response_size)) = self.posts.remove(&stream) { let response = response_size.map_or_else( || SendData::from(received.to_string().into_bytes()), SendData::zeroes, ); self.send_response(&stream, response, now); } } _ => {} } } } fn has_events(&self) -> bool { self.server.has_events() } }