// Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. //! An HTTP 3 client implementation. use std::{ cell::RefCell, collections::VecDeque, fmt::Display, fs::File, io::{BufWriter, Write as _}, net::SocketAddr, num::NonZeroUsize, path::PathBuf, rc::Rc, time::Instant, }; use http::Uri as Url; use neqo_common::{Datagram, event::Provider, hex, qdebug, qerror, qinfo, qwarn}; use neqo_crypto::{AuthenticationStatus, ResumptionToken}; use neqo_http3::{Error, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Priority}; use neqo_transport::{ AppError, CloseReason, Connection, EmptyConnectionIdGenerator, Error as TransportError, OutputBatch, RandomConnectionIdGenerator, StreamId, }; use rustc_hash::FxHashMap as HashMap; use super::{Args, CloseState, Res, get_output_file, qlog_new}; use crate::{ STREAM_IO_BUFFER_SIZE, send_data::{SendData, SendResult}, }; pub struct Handler { #[expect(clippy::struct_field_names, reason = "This name is more descriptive.")] url_handler: UrlHandler, token: Option, output_read_data: bool, read_buffer: Vec, } impl Handler { pub(crate) fn new(url_queue: VecDeque, args: Args) -> Self { let output_read_data = args.output_read_data; let url_handler = UrlHandler { url_queue, handled_urls: Vec::new(), stream_handlers: HashMap::default(), all_paths: Vec::new(), args, }; Self { url_handler, token: None, output_read_data, read_buffer: vec![0; STREAM_IO_BUFFER_SIZE], } } } pub fn create_client( args: &Args, local_addr: SocketAddr, remote_addr: SocketAddr, hostname: &str, resumption_token: Option, ) -> Res { let cid_generator: Rc> = if args.cid_len == 0 { Rc::new(RefCell::new(EmptyConnectionIdGenerator::default())) } else { Rc::new(RefCell::new(RandomConnectionIdGenerator::new( args.cid_len.into(), ))) }; let mut transport = Connection::new_client( hostname, &[&args.shared.alpn], cid_generator, local_addr, remote_addr, args.shared.quic_parameters.get(args.shared.alpn.as_str()), Instant::now(), )?; let ciphers = args.get_ciphers(); if !ciphers.is_empty() { transport.set_ciphers(&ciphers)?; } let mut client = Http3Client::new_with_conn( transport, Http3Parameters::default() .max_table_size_encoder(args.shared.max_table_size_encoder) .max_table_size_decoder(args.shared.max_table_size_decoder) .max_blocked_streams(args.shared.max_blocked_streams) .max_concurrent_push_streams(args.max_concurrent_push_streams), ); let qlog = qlog_new(args, hostname, client.connection_id())?; client.set_qlog(qlog); if let Some(ech) = &args.ech { client.enable_ech(ech)?; } if let Some(token) = resumption_token { client.enable_resumption(Instant::now(), token)?; } Ok(client) } impl TryFrom for CloseState { type Error = CloseReason; fn try_from(value: Http3State) -> Result { let (state, error) = match value { Http3State::Closing(error) => (Self::Closing, error), Http3State::Closed(error) => (Self::Closed, error), _ => return Ok(Self::NotClosing), }; if error.is_error() { Err(error) } else { Ok(state) } } } impl super::Client for Http3Client { fn is_closed(&self) -> Result { self.state().try_into() } fn process_multiple_output( &mut self, now: Instant, max_datagrams: NonZeroUsize, ) -> OutputBatch { self.process_multiple_output(now, max_datagrams) } fn process_multiple_input<'a>( &mut self, dgrams: impl IntoIterator>, now: Instant, ) { self.process_multiple_input(dgrams, now); } fn close(&mut self, now: Instant, app_error: AppError, msg: S) where S: AsRef + Display, { self.close(now, app_error, msg); } fn stats(&self) -> neqo_transport::Stats { self.transport_stats() } fn has_events(&self) -> bool { Provider::has_events(self) } } impl super::Handler for Handler { type Client = Http3Client; fn handle(&mut self, client: &mut Http3Client) -> Res { while let Some(event) = client.next_event() { match event { Http3ClientEvent::AuthenticationNeeded => { client.authenticated(AuthenticationStatus::Ok, Instant::now()); } Http3ClientEvent::HeaderReady { stream_id, headers, fin, .. } => { if self.url_handler.stream_handler(stream_id).is_some() { qdebug!("READ HEADERS[{stream_id}]: fin={fin} {headers:?}"); } else { qwarn!("Data on unexpected stream: {stream_id}"); } if fin { self.url_handler.on_stream_fin(client, stream_id); } } Http3ClientEvent::DataReadable { stream_id } => { let mut stream_done = false; match self.url_handler.stream_handler(stream_id) { None => { qwarn!("Data on unexpected stream: {stream_id}"); } Some(handler) => loop { let (sz, fin) = client.read_data( Instant::now(), stream_id, &mut self.read_buffer, )?; handler.process_data_readable( stream_id, fin, &self.read_buffer[..sz], self.output_read_data, )?; if fin { stream_done = true; break; } if sz == 0 { break; } }, } if stream_done { self.url_handler.on_stream_fin(client, stream_id); } } Http3ClientEvent::DataWritable { stream_id } => { match self.url_handler.stream_handler(stream_id) { None => { qwarn!("Data on unexpected stream: {stream_id}"); } Some(handler) => { handler.process_data_writable(client, stream_id, Instant::now()); } } } Http3ClientEvent::StateChange(Http3State::Connected) | Http3ClientEvent::RequestsCreatable => { qinfo!("{event:?}"); self.url_handler.process_urls(client); } Http3ClientEvent::ZeroRttRejected => { qinfo!("{event:?}"); // All 0-RTT data was rejected. We need to retransmit it. self.url_handler.reinit(); self.url_handler.process_urls(client); } Http3ClientEvent::ResumptionToken(t) => self.token = Some(t), _ => { qwarn!("Unhandled event {event:?}"); } } } Ok(self.url_handler.done()) } fn take_token(&mut self) -> Option { self.token.take() } } trait StreamHandler { fn process_data_readable( &mut self, stream_id: StreamId, fin: bool, data: &[u8], output_read_data: bool, ) -> Res<()>; fn process_data_writable( &mut self, client: &mut Http3Client, stream_id: StreamId, now: Instant, ); } struct DownloadStreamHandler { out_file: Option>, } impl StreamHandler for DownloadStreamHandler { fn process_data_readable( &mut self, stream_id: StreamId, fin: bool, data: &[u8], output_read_data: bool, ) -> Res<()> { if let Some(out_file) = &mut self.out_file { if !data.is_empty() { out_file.write_all(data)?; } return Ok(()); } else if log::log_enabled!(log::Level::Debug) { if !output_read_data { qdebug!("READ[{stream_id}]: {} bytes", data.len()); } else if let Ok(txt) = std::str::from_utf8(data) { qdebug!("READ[{stream_id}]: {txt}"); } else { qdebug!("READ[{stream_id}]: 0x{}", hex(data)); } } if fin { self.out_file.take().map_or_else( || { qdebug!(""); Ok(()) }, |mut out_file| out_file.flush(), )?; } Ok(()) } fn process_data_writable( &mut self, _client: &mut Http3Client, _stream_id: StreamId, _now: Instant, ) { } } struct UploadStreamHandler { data: SendData, start: Instant, } impl StreamHandler for UploadStreamHandler { fn process_data_readable( &mut self, stream_id: StreamId, _fin: bool, data: &[u8], _output_read_data: bool, ) -> Res<()> { if let Ok(txt) = std::str::from_utf8(data) { let trimmed_txt = txt.trim_end_matches(char::from(0)); let parsed: usize = trimmed_txt.parse().map_err(|_| Error::InvalidInput)?; if parsed == self.data.len() { qinfo!( "Stream ID: {stream_id:?}, Upload time: {:?}", Instant::now().duration_since(self.start) ); } Ok(()) } else { qerror!("Unexpected data [{stream_id}]: 0x{}", hex(data)); Err(crate::client::Error::Http3(Error::InvalidInput)) } } fn process_data_writable( &mut self, client: &mut Http3Client, stream_id: StreamId, now: Instant, ) { match self .data .send(|chunk| client.send_data(stream_id, chunk, now)) { SendResult::StreamClosed => qwarn!("Stream {stream_id} is closed"), // Stream may be closed; ignore errors. SendResult::Done => _ = client.stream_close_send(stream_id, now), SendResult::MoreData => {} } } } struct UrlHandler { url_queue: VecDeque, handled_urls: Vec, stream_handlers: HashMap>, all_paths: Vec, args: Args, } impl UrlHandler { fn stream_handler(&mut self, stream_id: StreamId) -> Option<&mut Box> { self.stream_handlers.get_mut(&stream_id) } fn process_urls(&mut self, client: &mut Http3Client) { loop { if self.url_queue.is_empty() { break; } if self.stream_handlers.len() >= self.args.concurrency { break; } if !self.next_url(client) { break; } } } fn next_url(&mut self, client: &mut Http3Client) -> bool { let url = self .url_queue .pop_front() .expect("download_next called with empty queue"); let now = Instant::now(); match client.fetch( now, &self.args.method, &url, &self.args.headers, Priority::default(), ) { Ok(client_stream_id) => { qdebug!("Successfully created stream id {client_stream_id} for {url}"); let handler: Box = match self.args.method.as_str() { "GET" => { let out_file = get_output_file( &url, self.args.output_dir.as_ref(), &mut self.all_paths, ); _ = client.stream_close_send(client_stream_id, now); // Stream may be closed; ignore errors. Box::new(DownloadStreamHandler { out_file }) } "POST" => Box::new(UploadStreamHandler { data: SendData::zeroes(self.args.upload_size), start: now, }), _ => unimplemented!(), }; self.stream_handlers.insert(client_stream_id, handler); self.handled_urls.push(url); true } Err( Error::Transport(TransportError::StreamLimit) | Error::StreamLimit | Error::Unavailable, ) => { self.url_queue.push_front(url); false } Err(e) => { panic!("Can't create stream {e}"); } } } fn done(&self) -> bool { self.stream_handlers.is_empty() && self.url_queue.is_empty() } fn on_stream_fin(&mut self, client: &mut Http3Client, stream_id: StreamId) { self.stream_handlers.remove(&stream_id); self.process_urls(client); } fn reinit(&mut self) { for url in self.handled_urls.drain(..).rev() { self.url_queue.push_front(url); } self.stream_handlers.clear(); self.all_paths.clear(); } }