// Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. use std::{ borrow::Cow, cell::RefCell, fmt::{self, Display, Formatter}, num::NonZeroUsize, rc::Rc, slice, str, time::Instant, }; use neqo_common::{Datagram, event::Provider as _, hex, qdebug, qerror, qinfo, qwarn}; use neqo_crypto::{AllowZeroRtt, AntiReplay, generate_ech_keys, random}; use neqo_http3::Error; use neqo_transport::{ ConnectionEvent, ConnectionIdGenerator, OutputBatch, State, StreamId, server::{ConnectionRef, Server, ValidateAddress}, }; use rustc_hash::FxHashMap as HashMap; use super::{Args, qns_read_response}; use crate::{ STREAM_IO_BUFFER_SIZE, send_data::{SendData, SendResult}, }; #[derive(Default)] struct HttpStreamState { writable: bool, data_to_send: Option, } pub struct HttpServer { server: Server, write_state: HashMap, read_state: HashMap>, is_qns_test: bool, read_buffer: Vec, } impl HttpServer { pub fn new( args: &Args, anti_replay: AntiReplay, cid_manager: Rc>, ) -> Result { let mut server = Server::new( args.now(), slice::from_ref(&args.key), slice::from_ref(&args.shared.alpn), anti_replay, Box::new(AllowZeroRtt {}), cid_manager, args.shared.quic_parameters.get(&args.shared.alpn), )?; server.set_ciphers(args.get_ciphers()); server.set_qlog_dir(args.shared.qlog_dir.clone()); if args.retry { server.set_validation(ValidateAddress::Always); } if args.ech { let (sk, pk) = generate_ech_keys().map_err(|_| Error::Internal)?; server .enable_ech(random::<1>()[0], "public.example", &sk, &pk) .map_err(|_| Error::Internal)?; qinfo!("ECHConfigList: {}", hex(server.ech_config())); } Ok(Self { server, write_state: HashMap::default(), read_state: HashMap::default(), is_qns_test: args.shared.qns_test.is_some(), read_buffer: vec![0; STREAM_IO_BUFFER_SIZE], }) } fn save_partial(&mut self, stream_id: StreamId, partial: Vec, conn: &ConnectionRef) { if partial.len() < 4096 { qdebug!( "Saving partial URL: {}", String::from_utf8(partial.clone()) .unwrap_or_else(|_| format!("", hex(&partial))) ); self.read_state.insert(stream_id, partial); } else { qdebug!( "Giving up on partial URL {}", String::from_utf8(partial.clone()) .unwrap_or_else(|_| format!("", hex(&partial))) ); _ = conn.borrow_mut().stream_stop_sending(stream_id, 0); // Stream may be closed; ignore errors. } } fn stream_readable(&mut self, stream_id: StreamId, conn: &ConnectionRef) { if !stream_id.is_client_initiated() || !stream_id.is_bidi() { qdebug!("Stream {stream_id} not client-initiated bidi, ignoring"); return; } let (sz, fin) = conn .borrow_mut() .stream_recv(stream_id, &mut self.read_buffer) .expect("Read should succeed"); if sz == 0 { if !fin { qdebug!("size 0 but !fin"); } return; } let read_buffer = &self.read_buffer[..sz]; let buf = self.read_state.remove(&stream_id).map_or( Cow::Borrowed(read_buffer), |mut existing| { existing.extend_from_slice(read_buffer); Cow::Owned(existing) }, ); let Ok(msg) = str::from_utf8(&buf[..]) else { self.save_partial(stream_id, buf.to_vec(), conn); return; }; // Parse "GET /path\n" or "GET /path\r\n" let Some(path) = msg .strip_prefix("GET /") .and_then(|s| s.lines().next()) .filter(|p| { if self.is_qns_test { !p.chars().any(char::is_whitespace) } else { p.chars().all(|c| c.is_ascii_digit()) } }) else { self.save_partial(stream_id, buf.to_vec(), conn); return; }; let resp: SendData = { qdebug!("Path = '{path}'"); if self.is_qns_test { match qns_read_response(path) { Ok(data) => data.into(), Err(e) => { qerror!("Failed to read {path}: {e}"); b"404".to_vec().into() } } } else { let count = path.parse().unwrap(); SendData::zeroes(count) } }; if let Some(stream_state) = self.write_state.get_mut(&stream_id) { match stream_state.data_to_send { None => stream_state.data_to_send = Some(resp), Some(_) => { qdebug!("Data already set, doing nothing"); } } if stream_state.writable { self.stream_writable(stream_id, conn); } } else { self.write_state.insert( stream_id, HttpStreamState { writable: false, data_to_send: Some(resp), }, ); } } fn stream_writable(&mut self, stream_id: StreamId, conn: &ConnectionRef) { let Some(stream_state) = self.write_state.get_mut(&stream_id) else { qwarn!("Unknown stream {stream_id}, ignoring event"); return; }; stream_state.writable = true; if let Some(resp) = &mut stream_state.data_to_send { match resp.send(|chunk| conn.borrow_mut().stream_send(stream_id, chunk)) { SendResult::StreamClosed => { qwarn!("Stream {stream_id} closed by peer, stopping send"); self.write_state.remove(&stream_id); } SendResult::Done => { _ = conn.borrow_mut().stream_close_send(stream_id); // Stream may be closed; ignore errors. self.write_state.remove(&stream_id); } SendResult::MoreData => { stream_state.writable = false; } } } } } impl super::HttpServer for HttpServer { fn process_multiple<'a, D: IntoIterator>>( &mut self, dgrams: D, now: Instant, max_datagrams: NonZeroUsize, ) -> OutputBatch { self.server.process_multiple(dgrams, now, max_datagrams) } fn process_events(&mut self, now: Instant) { #[expect( clippy::mutable_key_type, reason = "ActiveConnectionRef::Hash doesn't access any of the interior mutable types" )] let active_conns = self.server.active_connections(); #[expect( clippy::iter_over_hash_type, reason = "OK to loop over active connections in an undefined order." )] for acr in active_conns { loop { let Some(event) = acr.borrow_mut().next_event() else { break; }; match event { ConnectionEvent::NewStream { stream_id } => { self.write_state .insert(stream_id, HttpStreamState::default()); } ConnectionEvent::RecvStreamReadable { stream_id } => { self.stream_readable(stream_id, &acr); } ConnectionEvent::SendStreamWritable { stream_id } => { self.stream_writable(stream_id, &acr); } ConnectionEvent::StateChange(State::Connected) => { acr.connection() .borrow_mut() .send_ticket(now, b"hi!") .unwrap(); } ConnectionEvent::StateChange(_) | ConnectionEvent::SendStreamCreatable { .. } | ConnectionEvent::SendStreamComplete { .. } => (), e => qwarn!("unhandled event {e:?}"), } } } } fn has_events(&self) -> bool { self.server.has_active_connections() } } impl Display for HttpServer { fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!(f, "Http 0.9 server ") } }