//! This is an implementation of [`Reader`] for reading from a [`AsyncBufRead`] //! as underlying byte stream. This reader fully implements async/await so reading //! can use non-blocking I/O. use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf}; use crate::errors::{Error, Result, SyntaxError}; use crate::events::Event; use crate::name::{QName, ResolveResult}; use crate::parser::{ElementParser, Parser, PiParser}; use crate::reader::buffered_reader::impl_buffered_source; use crate::reader::{BangType, BinaryStream, NsReader, ParseState, ReadTextResult, Reader, Span}; use crate::utils::is_whitespace; /// A struct for read XML asynchronously from an [`AsyncBufRead`]. /// /// Having own struct allows us to implement anything without risk of name conflicts /// and does not suffer from the impossibility of having `async` in traits. struct TokioAdapter<'a, R>(&'a mut R); impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> { impl_buffered_source!('b, 0, async, await); } //////////////////////////////////////////////////////////////////////////////////////////////////// impl<'r, R> AsyncRead for BinaryStream<'r, R> where R: AsyncRead + Unpin, { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { let start = buf.remaining(); let this = self.get_mut(); let poll = Pin::new(&mut *this.inner).poll_read(cx, buf); // If something was read, update offset if let Poll::Ready(Ok(_)) = poll { let amt = start - buf.remaining(); *this.offset += amt as u64; } poll } } impl<'r, R> AsyncBufRead for BinaryStream<'r, R> where R: AsyncBufRead + Unpin, { #[inline] fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx) } #[inline] fn consume(self: Pin<&mut Self>, amt: usize) { let this = self.get_mut(); this.inner.consume(amt); *this.offset += amt as u64; } } //////////////////////////////////////////////////////////////////////////////////////////////////// impl Reader { /// An asynchronous version of [`read_event_into()`]. Reads the next event into /// given buffer. /// /// This is the main entry point for reading XML `Event`s when using an async reader. /// /// See the documentation of [`read_event_into()`] for more information. /// /// # Examples /// /// ``` /// # tokio_test::block_on(async { /// # use pretty_assertions::assert_eq; /// use quick_xml::events::Event; /// use quick_xml::reader::Reader; /// /// // This explicitly uses `from_reader("...".as_bytes())` to use a buffered /// // reader instead of relying on the zero-copy optimizations for reading /// // from byte slices, which provides the sync interface anyway. /// let mut reader = Reader::from_reader(r#" /// /// Test /// Test 2 /// /// "#.as_bytes()); /// reader.config_mut().trim_text(true); /// /// let mut count = 0; /// let mut buf = Vec::new(); /// let mut txt = Vec::new(); /// loop { /// match reader.read_event_into_async(&mut buf).await { /// Ok(Event::Start(_)) => count += 1, /// Ok(Event::Text(e)) => txt.push(e.unescape().unwrap().into_owned()), /// Err(e) => panic!("Error at position {}: {:?}", reader.error_position(), e), /// Ok(Event::Eof) => break, /// _ => (), /// } /// buf.clear(); /// } /// assert_eq!(count, 3); /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]); /// # }) // tokio_test::block_on /// ``` /// /// [`read_event_into()`]: Reader::read_event_into pub async fn read_event_into_async<'b>( &mut self, mut buf: &'b mut Vec, ) -> Result> { read_event_impl!( self, buf, TokioAdapter(&mut self.reader), read_until_close_async, await ) } /// An asynchronous version of [`read_to_end_into()`]. /// Reads asynchronously until end element is found using provided buffer as /// intermediate storage for events content. This function is supposed to be /// called after you already read a [`Start`] event. /// /// See the documentation of [`read_to_end_into()`] for more information. /// /// # Examples /// /// This example shows, how you can skip XML content after you read the /// start event. /// /// ``` /// # tokio_test::block_on(async { /// # use pretty_assertions::assert_eq; /// use quick_xml::events::{BytesStart, Event}; /// use quick_xml::reader::Reader; /// /// let mut reader = Reader::from_reader(r#" /// /// /// /// /// /// /// /// /// "#.as_bytes()); /// reader.config_mut().trim_text(true); /// let mut buf = Vec::new(); /// /// let start = BytesStart::new("outer"); /// let end = start.to_end().into_owned(); /// /// // First, we read a start event... /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start)); /// /// // ...then, we could skip all events to the corresponding end event. /// // This call will correctly handle nested elements. /// // Note, however, that this method does not handle namespaces. /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap(); /// /// // At the end we should get an Eof event, because we ate the whole XML /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof); /// # }) // tokio_test::block_on /// ``` /// /// [`read_to_end_into()`]: Self::read_to_end_into /// [`Start`]: Event::Start pub async fn read_to_end_into_async<'n>( &mut self, // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033` end: QName<'n>, buf: &mut Vec, ) -> Result { Ok(read_to_end!( self, end, buf, read_event_into_async, { buf.clear(); }, await )) } /// Private function to read until `>` is found. This function expects that /// it was called just after encounter a `<` symbol. async fn read_until_close_async<'b>(&mut self, buf: &'b mut Vec) -> Result> { read_until_close!(self, buf, TokioAdapter(&mut self.reader), await) } } //////////////////////////////////////////////////////////////////////////////////////////////////// impl NsReader { /// An asynchronous version of [`read_event_into()`]. Reads the next event into /// given buffer. /// /// This method manages namespaces but doesn't resolve them automatically. /// You should call [`resolve_element()`] if you want to get a namespace. /// /// You also can use [`read_resolved_event_into_async()`] instead if you want /// to resolve namespace as soon as you get an event. /// /// # Examples /// /// ``` /// # tokio_test::block_on(async { /// # use pretty_assertions::assert_eq; /// use quick_xml::events::Event; /// use quick_xml::name::{Namespace, ResolveResult::*}; /// use quick_xml::reader::NsReader; /// /// let mut reader = NsReader::from_reader(r#" /// /// Test /// Test 2 /// /// "#.as_bytes()); /// reader.config_mut().trim_text(true); /// /// let mut count = 0; /// let mut buf = Vec::new(); /// let mut txt = Vec::new(); /// loop { /// match reader.read_event_into_async(&mut buf).await.unwrap() { /// Event::Start(e) => { /// count += 1; /// let (ns, local) = reader.resolve_element(e.name()); /// match local.as_ref() { /// b"tag1" => assert_eq!(ns, Bound(Namespace(b"www.xxxx"))), /// b"tag2" => assert_eq!(ns, Bound(Namespace(b"www.yyyy"))), /// _ => unreachable!(), /// } /// } /// Event::Text(e) => { /// txt.push(e.unescape().unwrap().into_owned()) /// } /// Event::Eof => break, /// _ => (), /// } /// buf.clear(); /// } /// assert_eq!(count, 3); /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]); /// # }) // tokio_test::block_on /// ``` /// /// [`read_event_into()`]: NsReader::read_event_into /// [`resolve_element()`]: Self::resolve_element /// [`read_resolved_event_into_async()`]: Self::read_resolved_event_into_async pub async fn read_event_into_async<'b>(&mut self, buf: &'b mut Vec) -> Result> { self.pop(); let event = self.reader.read_event_into_async(buf).await; self.process_event(event) } /// An asynchronous version of [`read_to_end_into()`]. /// Reads asynchronously until end element is found using provided buffer as /// intermediate storage for events content. This function is supposed to be /// called after you already read a [`Start`] event. /// /// See the documentation of [`read_to_end_into()`] for more information. /// /// # Examples /// /// This example shows, how you can skip XML content after you read the /// start event. /// /// ``` /// # tokio_test::block_on(async { /// # use pretty_assertions::assert_eq; /// use quick_xml::name::{Namespace, ResolveResult}; /// use quick_xml::events::{BytesStart, Event}; /// use quick_xml::reader::NsReader; /// /// let mut reader = NsReader::from_reader(r#" /// /// /// /// /// /// /// /// /// /// /// /// /// "#.as_bytes()); /// reader.config_mut().trim_text(true); /// let mut buf = Vec::new(); /// /// let ns = Namespace(b"namespace 1"); /// let start = BytesStart::from_content(r#"outer xmlns="namespace 1""#, 5); /// let end = start.to_end().into_owned(); /// /// // First, we read a start event... /// assert_eq!( /// reader.read_resolved_event_into_async(&mut buf).await.unwrap(), /// (ResolveResult::Bound(ns), Event::Start(start)) /// ); /// /// // ...then, we could skip all events to the corresponding end event. /// // This call will correctly handle nested elements. /// // Note, however, that this method does not handle namespaces. /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap(); /// /// // At the end we should get an Eof event, because we ate the whole XML /// assert_eq!( /// reader.read_resolved_event_into_async(&mut buf).await.unwrap(), /// (ResolveResult::Unbound, Event::Eof) /// ); /// # }) // tokio_test::block_on /// ``` /// /// [`read_to_end_into()`]: Self::read_to_end_into /// [`Start`]: Event::Start pub async fn read_to_end_into_async<'n>( &mut self, // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033` end: QName<'n>, buf: &mut Vec, ) -> Result { // According to the https://www.w3.org/TR/xml11/#dt-etag, end name should // match literally the start name. See `Config::check_end_names` documentation self.reader.read_to_end_into_async(end, buf).await } /// An asynchronous version of [`read_resolved_event_into()`]. Reads the next /// event into given buffer asynchronously and resolves its namespace (if applicable). /// /// Namespace is resolved only for [`Start`], [`Empty`] and [`End`] events. /// For all other events the concept of namespace is not defined, so /// a [`ResolveResult::Unbound`] is returned. /// /// If you are not interested in namespaces, you can use [`read_event_into_async()`] /// which will not automatically resolve namespaces for you. /// /// # Examples /// /// ``` /// # tokio_test::block_on(async { /// # use pretty_assertions::assert_eq; /// use quick_xml::events::Event; /// use quick_xml::name::{Namespace, QName, ResolveResult::*}; /// use quick_xml::reader::NsReader; /// /// let mut reader = NsReader::from_reader(r#" /// /// Test /// Test 2 /// /// "#.as_bytes()); /// reader.config_mut().trim_text(true); /// /// let mut count = 0; /// let mut buf = Vec::new(); /// let mut txt = Vec::new(); /// loop { /// match reader.read_resolved_event_into_async(&mut buf).await.unwrap() { /// (Bound(Namespace(b"www.xxxx")), Event::Start(e)) => { /// count += 1; /// assert_eq!(e.local_name(), QName(b"tag1").into()); /// } /// (Bound(Namespace(b"www.yyyy")), Event::Start(e)) => { /// count += 1; /// assert_eq!(e.local_name(), QName(b"tag2").into()); /// } /// (_, Event::Start(_)) => unreachable!(), /// /// (_, Event::Text(e)) => { /// txt.push(e.unescape().unwrap().into_owned()) /// } /// (_, Event::Eof) => break, /// _ => (), /// } /// buf.clear(); /// } /// assert_eq!(count, 3); /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]); /// # }) // tokio_test::block_on /// ``` /// /// [`read_resolved_event_into()`]: NsReader::read_resolved_event_into /// [`Start`]: Event::Start /// [`Empty`]: Event::Empty /// [`End`]: Event::End /// [`read_event_into_async()`]: Self::read_event_into_async pub async fn read_resolved_event_into_async<'ns, 'b>( // Name 'ns lifetime, because otherwise we get an error // "implicit elided lifetime not allowed here" on ResolveResult &'ns mut self, buf: &'b mut Vec, ) -> Result<(ResolveResult<'ns>, Event<'b>)> { let event = self.read_event_into_async(buf).await; self.resolve_event(event) } } #[cfg(test)] mod test { use super::TokioAdapter; use crate::reader::test::check; check!( #[tokio::test] read_event_into_async, read_until_close_async, TokioAdapter, &mut Vec::new(), async, await ); #[test] fn test_future_is_send() { // This test should just compile, no actual runtime checks are performed here. use super::*; use tokio::io::BufReader; fn check_send(_: T) {} let input = vec![]; let mut reading_buf = vec![]; let mut reader = Reader::from_reader(BufReader::new(input.as_slice())); check_send(reader.read_event_into_async(&mut reading_buf)); } }