// pipe: anonymous SPSC byte pipe.
//
// One page per Pipe: header at offset 0, byte ring fills the rest
// (PAGE_SIZE - sizeof(Pipe)). head/tail are monotone u64 byte
// counters indexed modulo RING_CAP, so full vs. empty is
// distinguishable without a reserved slot. Page lifetime is owned by
// Pipe.refs, not mm.*_pages; unref() is the only path back to the
// allocator. Single-producer / single-consumer per end.

const builtin = #import("builtin")
// Named module; see src/wait_queue.flash.
const layout = #import("task_layout")
const wq_mod = #import("wait_queue")

pub const WaitQueue = wq_mod.WaitQueue
pub const TaskStruct = layout.TaskStruct
pub const FD_TABLE_SIZE = layout.FD_TABLE_SIZE

pub const PAGE_SIZE u64 = 1 << 12

extern fn get_free_page() u64
extern fn free_page(p u64) void
extern fn preempt_disable() void
extern fn preempt_enable() void
extern fn schedule() void

// In the freestanding kernel build the page allocator hands out a
// physical address; the kernel reads/writes the page through its
// TTBR1 linear-map alias at `pa | LINEAR_MAP_BASE`. The host test
// build allocates from a static buffer (tests/host_stubs.zig) and
// returns a bare host VA — no alias, identity mapping. Branching at
// comptime keeps the kernel path zero-overhead.
const LINEAR_MAP_BASE u64 = 0xFFFF000000000000

// Translate an address returned by get_free_page() into the address
// this module dereferences: ORs in LINEAR_MAP_BASE on freestanding
// targets, returns `pa` unchanged everywhere else.
inline fn pageKva(pa u64) u64 {
    return if (builtin.target.os.tag == .freestanding) pa | LINEAR_MAP_BASE else pa
}

// Pipe header. Lives at offset 0 of the pipe's page; the ring bytes
// start HEADER_SIZE bytes after it (see ringBase()).
pub const Pipe = extern struct {
    // Reference count. alloc() leaves it at 0; ref()/unref() adjust it.
    // read()/write() treat `refs <= 1` as "the caller is the only holder".
    refs u32 = 0,
    // Explicit padding between the u32 count and the u64 counters.
    _pad u32 = 0,
    // Total bytes ever pushed; advanced only by write().
    head u64 = 0,
    // Total bytes ever drained; advanced only by read().
    tail u64 = 0,
    // read() parks here while the ring is empty; write() wakes one
    // entry after pushing.
    readers_wq WaitQueue = .{},
    // write() parks here while the ring is full; read() wakes one
    // entry after draining.
    writers_wq WaitQueue = .{},
    // Ring data follows in the same page; see ringBase().

    // Bytes currently buffered (head - tail, wrapping subtraction).
    pub fn count(self *Pipe) u64 {
        return self.head -% self.tail
    }

    // True when no byte is buffered.
    pub fn isEmpty(self *Pipe) bool {
        return self.head == self.tail
    }

    // True when exactly RING_CAP bytes are buffered.
    pub fn isFull(self *Pipe) bool {
        return self.count() == RING_CAP
    }
}

// Size of the header, and the number of ring bytes left in the page.
pub const HEADER_SIZE u64 = #sizeOf(Pipe)
pub const RING_CAP u64 = PAGE_SIZE - HEADER_SIZE

// Address of the first ring byte: HEADER_SIZE bytes past the header.
inline fn ringBase(p *mut Pipe) [*]mut u8 {
    const base u64 = #intFromPtr(p) + HEADER_SIZE
    return #ptrFromInt(base)
}

// Allocate a page and initialise the Pipe header in it. Returns null
// on allocator failure (get_free_page() == 0). Only the header is
// reset to its defaults; the ring bytes are left as the allocator
// returned them. refs starts at 0; the installer takes the first ref.
pub fn alloc() ?*mut Pipe {
    const pa = get_free_page()
    if pa == 0 { return null }
    const kva = pageKva(pa)
    const p *mut Pipe = #ptrFromInt(kva)
    p.* = .{}
    return p
}

// Take one reference. The increment runs with preemption disabled.
pub fn ref(p *mut Pipe) void {
    preempt_disable()
    p.refs += 1
    preempt_enable()
}

// Drop one ref.
//
// Non-final drop: wake both wait queues so a task that is already
// blocked in read()/write() re-runs its `refs <= 1` check and can
// return EOF / a short write. Nothing else in this file wakes a task
// that is parked when its peer closes, so without this wake it would
// sleep forever. Spurious wake-ups are harmless: both loops re-check
// their condition and go back to sleep if nothing changed.
//
// Final drop: wake both queues, then hand the page back to the
// allocator.
pub fn unref(p *mut Pipe) void {
    preempt_disable()
    p.refs -= 1
    const last = p.refs == 0
    if !last {
        // Wake before re-enabling preemption: once preemption is back
        // on, another holder may drop the final ref and free the page,
        // so `p` must not be touched after that point on this path.
        // NOTE(review): assumes wake_all() may be called with
        // preemption disabled — confirm against the wait_queue module.
        p.readers_wq.wake_all()
        p.writers_wq.wake_all()
    }
    preempt_enable()
    if !last { return }
    // Wake runs after the refs == 0 decision. No other ref exists, so
    // no concurrent reader or writer can race the free.
    p.readers_wq.wake_all()
    p.writers_wq.wake_all()
    // Undo pageKva(): strip the linear-map alias on freestanding
    // targets so free_page() gets the address get_free_page() returned.
    const kva u64 = #intFromPtr(p)
    const pa u64 = if (builtin.target.os.tag == .freestanding) kva & ~LINEAR_MAP_BASE else kva
    free_page(pa)
}

// Block until a byte is available, then drain up to len bytes.
// Returns 0 on EOF (refs <= 1 and empty: no writer can wake the
// reader). Negative is reserved for future short-read errors.
//
// At most one drain pass runs per call, so the result may be smaller
// than `len` even though more bytes arrive later. `len == 0` returns 0
// without waiting.
pub fn read(p *mut Pipe, buf [*]mut u8, len u64) i64 {
    var written u64 = 0
    while written < len {
        // Register as a waiter before testing the condition, so a
        // wake-up issued between the test and schedule() is not missed.
        p.readers_wq.prepare_to_wait()
        if p.isEmpty() {
            // Last-writer-closed EOF: caller's fd is the only ref.
            if p.refs <= 1 {
                p.readers_wq.finish_wait()
                break
            }
            schedule()
            continue
        }
        p.readers_wq.finish_wait()
        // Copy with preemption disabled; tail advances one byte at a
        // time and the ring index is the counter modulo RING_CAP.
        preempt_disable()
        const ring = ringBase(p)
        while written < len && !p.isEmpty() {
            buf[written] = ring[p.tail % RING_CAP]
            p.tail +%= 1
            written += 1
        }
        preempt_enable()
        // Space was freed: let one blocked writer retry.
        p.writers_wq.wake_one()
        // One drain per call: short read is POSIX-conformant for pipes.
        break
    }
    p.readers_wq.finish_wait()
    return #intCast(written)
}

// Push bytes until `len` are written. While the ring is full the
// caller sleeps on writers_wq; if the ring is full and the caller
// holds the only ref (refs <= 1) the call stops and returns the short
// count instead. The lost-reader check is made only when the ring is
// full, so bytes are still pushed into free space when no reader
// remains.
// Returns the number of bytes pushed; negative is reserved.
pub fn write(p *mut Pipe, buf [*]u8, len u64) i64 {
    var pushed u64 = 0
    while pushed < len {
        // Register as a waiter before testing the condition, so a
        // wake-up issued between the test and schedule() is not missed.
        p.writers_wq.prepare_to_wait()
        if p.isFull() {
            // Last reader closed. Short write of bytes pushed so far.
            // TODO: SIGPIPE / signal delivery not implemented.
            if p.refs <= 1 {
                p.writers_wq.finish_wait()
                break
            }
            schedule()
            continue
        }
        p.writers_wq.finish_wait()
        // Copy with preemption disabled; head advances one byte at a
        // time and the ring index is the counter modulo RING_CAP.
        preempt_disable()
        const ring = ringBase(p)
        while pushed < len && !p.isFull() {
            ring[p.head % RING_CAP] = buf[pushed]
            p.head +%= 1
            pushed += 1
        }
        preempt_enable()
        // Data is available: let one blocked reader retry.
        p.readers_wq.wake_one()
    }
    p.writers_wq.finish_wait()
    return #intCast(pushed)
}

// ---- Host tests ----

const std = #import("std")

test "empty pipe: isEmpty true, isFull false, count == 0" {
    const p = alloc() orelse return error.OutOfMemory
    p.refs = 1
    try std.testing.expect(p.isEmpty())
    try std.testing.expect(!p.isFull())
    try std.testing.expectEqual(#as(u64, 0), p.count())
    p.refs = 0
    // Not calling unref — host stubs leak; bump-allocator doesn't recycle.
}

test "write then read round-trips bytes" {
    const p = alloc() orelse return error.OutOfMemory
    p.refs = 2 // two fds installed
    const payload = "hello-pipe"
    const n_w = write(p, payload.ptr, payload.len)
    try std.testing.expectEqual(#as(i64, payload.len), n_w)
    try std.testing.expectEqual(#as(u64, payload.len), p.count())
    var buf [16]u8 = undefined
    const n_r = read(p, &buf, payload.len)
    try std.testing.expectEqual(#as(i64, payload.len), n_r)
    try std.testing.expectEqualSlices(u8, payload, buf[0..#intCast(n_r)])
    try std.testing.expect(p.isEmpty())
}

test "head/tail wraparound preserves byte order" {
    const p = alloc() orelse return error.OutOfMemory
    p.refs = 2
    // Seed head/tail near wrap so the next write+read straddles modulo.
    p.head = RING_CAP - 4
    p.tail = RING_CAP - 4
    const payload = "ABCDEFGH" // 8 bytes — last 4 wrap to ring[0..4]
    _ = write(p, payload.ptr, payload.len)
    try std.testing.expectEqual(#as(u64, 8), p.count())
    var buf [8]u8 = undefined
    _ = read(p, &buf, payload.len)
    try std.testing.expectEqualSlices(u8, payload, buf[0..])
}

test "EOF: empty pipe with refs == 1 returns 0 instead of blocking" {
    const p = alloc() orelse return error.OutOfMemory
    p.refs = 1 // caller holds only the read end
    var buf [4]u8 = undefined
    const n = read(p, &buf, buf.len)
    try std.testing.expectEqual(#as(i64, 0), n)
}

test "isFull vs isEmpty mutually exclusive at boundaries" {
    const p = alloc() orelse return error.OutOfMemory
    p.refs = 2
    // count == 0 → empty, not full.
    try std.testing.expect(p.isEmpty())
    try std.testing.expect(!p.isFull())
    // count == RING_CAP → full, not empty.
    p.head = RING_CAP
    p.tail = 0
    try std.testing.expect(p.isFull())
    try std.testing.expect(!p.isEmpty())
}