Skip to content

Commit

Permalink
uringlator: handle raced completions gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
Cloudef committed Jan 19, 2025
1 parent e5dd487 commit 2aba39e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
25 changes: 22 additions & 3 deletions src/aio/Uringlator.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,15 @@ const DoubleBufferedFixedArrayList = @import("minilib").DoubleBufferedFixedArray
const log = std.log.scoped(.aio_uringlator);

pub const EventSource = @import("posix/posix.zig").EventSource;
const Result = struct { failure: Operation.Error, id: u16 };

const Result = struct {
failure: Operation.Error,
id: u16,

pub fn lessThan(_: void, a: @This(), b: @This()) bool {
return a.id < b.id;
}
};

ops: ItemPool(Operation.Union, u16),
prev_id: ?u16 = null, // for linking operations
Expand Down Expand Up @@ -199,11 +207,22 @@ pub fn complete(
ctx: Ctx,
completion_cb: fn (ctx: Ctx, id: u16, uop: *Operation.Union, failure: Operation.Error) void,
) aio.CompletionResult {
const finished = self.finished.swap();
var finished = self.finished.swap();
std.mem.sortUnstable(Result, finished[0..], {}, Result.lessThan);
var num_errors: u16 = 0;
for (finished) |res| {
completion: for (finished, 0..) |res, idx| {
// ignore raced completitions
if (idx > 0 and res.id == finished[idx - 1].id) continue;

var failure = res.failure;
if (self.ops.nodes[res.id].used == .link_timeout and failure != error.Canceled) {
for (finished) |res2| {
if (res2.id != res.id and self.next[res2.id] == res.id) {
// timeout raced with the linked operation
// the linked operation will finish this timeout instead
continue :completion;
}
}
var iter = self.ops.iterator();
const cres: enum { ok, not_found } = blk: {
while (iter.next()) |e| {
Expand Down
2 changes: 1 addition & 1 deletion src/minilib/fixed_array_list.zig
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub fn DoubleBufferedFixedArrayList(T: type, SZ: type) type {
self.safe.reset();
}

pub fn swap(self: *@This()) []const T {
pub fn swap(self: *@This()) []T {
self.mutex.lock();
defer self.mutex.unlock();
defer self.safe.reset();
Expand Down

0 comments on commit 2aba39e

Please sign in to comment.