//! Tests for AsyncRead on tokio::fs::File via io-uring. #![cfg(all( tokio_unstable, feature = "io-uring", feature = "rt", feature = "fs", target_os = "linux" ))] use std::io::Write; use std::path::PathBuf; use std::sync::mpsc; use std::time::Duration; use tempfile::NamedTempFile; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tokio::runtime::{Builder, Runtime}; use tokio_util::task::TaskTracker; fn multi_rt(n: usize) -> Box Runtime> { Box::new(move || { Builder::new_multi_thread() .worker_threads(n) .enable_all() .build() .unwrap() }) } fn current_rt() -> Box Runtime> { Box::new(|| Builder::new_current_thread().enable_all().build().unwrap()) } fn rt_combinations() -> Vec Runtime>> { vec![current_rt(), multi_rt(1), multi_rt(4), multi_rt(64)] } fn create_temp_file(data: &[u8]) -> (NamedTempFile, PathBuf) { let mut tmp = NamedTempFile::new().unwrap(); tmp.write_all(data).unwrap(); tmp.flush().unwrap(); let path = tmp.path().to_path_buf(); (tmp, path) } #[tokio::test] async fn test_file_read() { let data = b"hello io-uring"; let (_tmp, path) = create_temp_file(data); let mut file = File::open(&path).await.unwrap(); let mut buf = vec![0u8; data.len()]; let n = file.read(&mut buf).await.unwrap(); assert_eq!(n, data.len()); assert_eq!(&buf[..n], data); } #[tokio::test] async fn test_file_read_exact() { let data = b"exact read test data"; let (_tmp, path) = create_temp_file(data); let mut file = File::open(&path).await.unwrap(); let mut buf = vec![0u8; data.len()]; file.read_exact(&mut buf).await.unwrap(); assert_eq!(&buf, data); } #[tokio::test] async fn test_file_read_to_end() { // Empty file { let (_tmp, path) = create_temp_file(b""); let mut file = File::open(&path).await.unwrap(); let mut buf = Vec::new(); let n = file.read_to_end(&mut buf).await.unwrap(); assert_eq!(n, 0); assert!(buf.is_empty()); } // Small file { let data: Vec = (0..100u8).collect(); let (_tmp, path) = create_temp_file(&data); let mut file = File::open(&path).await.unwrap(); let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, data); } // File larger than typical buffer { let data: Vec = (0..100_000u32).map(|i| (i % 256) as u8).collect(); let (_tmp, path) = create_temp_file(&data); let mut file = File::open(&path).await.unwrap(); let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, data); } } #[tokio::test] async fn test_file_read_to_string() { let text = "hello, io-uring world! 🦀"; let (_tmp, path) = create_temp_file(text.as_bytes()); let mut file = File::open(&path).await.unwrap(); let mut s = String::new(); file.read_to_string(&mut s).await.unwrap(); assert_eq!(s, text); } #[tokio::test] async fn test_file_read_empty() { let (_tmp, path) = create_temp_file(b""); let mut file = File::open(&path).await.unwrap(); let mut buf = vec![0u8; 100]; let n = file.read(&mut buf).await.unwrap(); assert_eq!(n, 0); } #[tokio::test] async fn test_file_read_large() { let data: Vec = (0..3_000_000u32).map(|i| (i % 256) as u8).collect(); let (_tmp, path) = create_temp_file(&data); let mut file = File::open(&path).await.unwrap(); let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf.len(), data.len()); assert_eq!(buf, data); } #[tokio::test] async fn test_file_read_custom_buf_size() { let data: Vec = (0..1000u16).map(|i| (i % 256) as u8).collect(); let (_tmp, path) = create_temp_file(&data); let mut file = File::open(&path).await.unwrap(); file.set_max_buf_size(64); let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, data); } #[tokio::test] async fn test_file_read_partial_buf() { let data: Vec = (0..100u8).collect(); let (_tmp, path) = create_temp_file(&data); let mut file = File::open(&path).await.unwrap(); // Read only 5 bytes let mut buf = vec![0u8; 5]; file.read_exact(&mut buf).await.unwrap(); assert_eq!(&buf, &data[..5]); // Read the rest let mut rest = Vec::new(); file.read_to_end(&mut rest).await.unwrap(); assert_eq!(rest, data[5..]); } #[tokio::test] async fn test_file_read_seek() { let data = b"hello world"; let (_tmp, path) = create_temp_file(data); let mut file = File::open(&path).await.unwrap(); file.seek(std::io::SeekFrom::Start(6)).await.unwrap(); let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, b"world"); } #[tokio::test] async fn test_file_read_seek_back() { let data = b"hello world"; let (_tmp, path) = create_temp_file(data); let mut file = File::open(&path).await.unwrap(); // Read first 5 bytes let mut buf = vec![0u8; 5]; file.read_exact(&mut buf).await.unwrap(); assert_eq!(&buf, b"hello"); // Seek back to start file.seek(std::io::SeekFrom::Start(0)).await.unwrap(); // Read full content let mut full = Vec::new(); file.read_to_end(&mut full).await.unwrap(); assert_eq!(full, data); } #[tokio::test] async fn test_file_read_after_write() { let tmp = NamedTempFile::new().unwrap(); let path = tmp.path().to_path_buf(); let mut file = tokio::fs::OpenOptions::new() .read(true) .write(true) .open(&path) .await .unwrap(); file.write_all(b"hello uring").await.unwrap(); file.flush().await.unwrap(); file.seek(std::io::SeekFrom::Start(0)).await.unwrap(); let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, b"hello uring"); } #[tokio::test] async fn test_file_read_cancel() { let data: Vec = (0..10_000).map(|i| (i % 256) as u8).collect(); let (_tmp, path) = create_temp_file(&data); let path2 = path.clone(); let handle = tokio::spawn(async move { let mut file = File::open(&path2).await.unwrap(); let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); buf }); handle.abort(); let res = handle.await; assert!(res.unwrap_err().is_cancelled()); // Verify runtime still works let mut file = File::open(&path).await.unwrap(); let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, data); } #[tokio::test] async fn test_file_read_concurrent() { const NUM_FILES: usize = 100; let files: Vec<_> = (0..NUM_FILES) .map(|i| { let data: Vec = (0..1024).map(|j| ((i as u16 + j) % 256) as u8).collect(); create_temp_file(&data) }) .collect(); let tracker = TaskTracker::new(); for (i, (_tmp, path)) in files.iter().enumerate() { let path = path.clone(); let expected: Vec = (0..1024).map(|j| ((i as u16 + j) % 256) as u8).collect(); tracker.spawn(async move { let mut file = File::open(&path).await.unwrap(); let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, expected); }); } tracker.close(); tracker.wait().await; } #[test] fn test_file_read_multi_runtime() { for rt_factory in rt_combinations() { let rt = rt_factory(); let data: Vec = (0..10_000).map(|i| (i % 256) as u8).collect(); let (_tmp, path) = create_temp_file(&data); let result = rt.block_on(async { let mut file = File::open(&path).await.unwrap(); let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); buf }); assert_eq!(result, data); } } #[test] fn shutdown_runtime_with_pending_reads() { for rt_factory in rt_combinations() { let rt = rt_factory(); let (done_tx, done_rx) = mpsc::channel(); let data: Vec = (0..10_000).map(|i| (i % 256) as u8).collect(); let (_tmp, path) = create_temp_file(&data); for _ in 0..50 { let path = path.clone(); rt.spawn(async move { let mut file = File::open(&path).await.unwrap(); let mut buf = Vec::new(); let _ = file.read_to_end(&mut buf).await; }); } std::thread::spawn(move || { rt.shutdown_timeout(Duration::from_millis(500)); done_tx.send(()).unwrap(); }); done_rx.recv().unwrap(); } } #[test] fn test_file_read_from_std() { let mut tmp = NamedTempFile::new().unwrap(); let data = b"hello from_std io-uring"; tmp.write_all(data).unwrap(); tmp.flush().unwrap(); let path = tmp.path().to_owned(); let rt = Builder::new_multi_thread() .worker_threads(2) .enable_all() .build() .unwrap(); rt.block_on(async { let std_file = std::fs::File::open(&path).unwrap(); let mut file = File::from_std(std_file); let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, data); }); } /// Read with a buffer smaller than the file content. Verifies internal /// buffering serves subsequent small reads without issuing new underlying read /// operations. #[tokio::test] async fn test_file_read_with_smaller_buf() { let data: Vec = (0..1024u16).map(|i| (i % 256) as u8).collect(); let (_tmp, path) = create_temp_file(&data); let mut file = File::open(&path).await.unwrap(); // triggers an underlying read that fills the internal buffer with more data // than we consume here let mut buf = vec![0u8; 4]; let n = file.read(&mut buf).await.unwrap(); assert_eq!(n, 4); assert_eq!(&buf, &data[..4]); // Second read: still smaller than what's buffered internally let mut buf = vec![0u8; 32]; let n = file.read(&mut buf).await.unwrap(); assert!(n > 0); assert_eq!(&buf[..n], &data[4..4 + n]); // Read the rest let mut rest = Vec::new(); file.read_to_end(&mut rest).await.unwrap(); let total = 4 + n + rest.len(); assert_eq!(total, data.len()); } #[tokio::test] async fn test_file_read_with_bigger_buf() { let data = b"hello io-uring"; let (_tmp, path) = create_temp_file(data); let mut file = File::open(&path).await.unwrap(); // Read with buffer larger than file's contents let mut buf = vec![0u8; 1024]; let n = file.read(&mut buf).await.unwrap(); assert!(n > 0 && n <= data.len()); assert_eq!(&buf[..n], &data[..n]); if n < data.len() { let mut rest = vec![0u8; 1024]; let n2 = file.read(&mut rest).await.unwrap(); assert_eq!(&rest[..n2], &data[n..n + n2]); } } /// Read a file larger than DEFAULT_MAX_BUF_SIZE (2 MiB). Verifies that /// chunked reads across multiple underlying operations produce correct data. #[tokio::test] async fn test_file_read_buffer_larger_than_max() { // 4 MiB + 1000 bytes to cross multiple chunk boundaries. let size = (4 << 20) + 1000; let data: Vec = (0..size).map(|i| (i % 256) as u8).collect(); let (_tmp, path) = create_temp_file(&data); let mut file = File::open(&path).await.unwrap(); let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf.len(), data.len()); assert_eq!(buf, data); } /// Read some bytes from a file, then write, verifying the implicit seek-back /// works correctly. #[tokio::test] async fn test_file_read_then_write() { let original = b"hello world, io-uring!"; let (_tmp, path) = create_temp_file(original); let mut file = tokio::fs::OpenOptions::new() .read(true) .write(true) .open(&path) .await .unwrap(); let mut buf = vec![0u8; 5]; file.read_exact(&mut buf).await.unwrap(); assert_eq!(&buf, b"hello"); // Write at the current position file.write_all(b" REPLACED").await.unwrap(); file.flush().await.unwrap(); // Re-read the full file and verify the write landed correctly file.seek(std::io::SeekFrom::Start(0)).await.unwrap(); let mut result = Vec::new(); file.read_to_end(&mut result).await.unwrap(); assert_eq!(&result[..5], b"hello"); assert_eq!(&result[5..14], b" REPLACED"); } /// Partial read followed by write at a different position. Verifies the /// seek-back accounts for partially consumed internal buffer. #[tokio::test] async fn test_file_partial_read_then_write() { let data = b"abcdefghijklmnopqrstuvwxyz"; let (_tmp, path) = create_temp_file(data); let mut file = tokio::fs::OpenOptions::new() .read(true) .write(true) .open(&path) .await .unwrap(); // First read fills internal buffer. let mut buf = vec![0u8; 10]; file.read_exact(&mut buf).await.unwrap(); assert_eq!(&buf, b"abcdefghij"); // Second read served from internal buffer let mut buf2 = vec![0u8; 3]; file.read_exact(&mut buf2).await.unwrap(); assert_eq!(&buf2, b"klm"); // Write should seek back past unconsumed buffered bytes, then write file.write_all(b"NOP").await.unwrap(); file.flush().await.unwrap(); file.seek(std::io::SeekFrom::Start(0)).await.unwrap(); let mut result = Vec::new(); file.read_to_end(&mut result).await.unwrap(); assert_eq!(&result[..13], b"abcdefghijklm"); assert_eq!(&result[13..16], b"NOP"); assert_eq!(&result[16..], &data[16..]); }