Skip to content

Commit

Permalink
Callbacks mostly working
Browse files Browse the repository at this point in the history
Silently hanging for some reason
  • Loading branch information
Sushisource committed Nov 28, 2024
1 parent e1d94b8 commit 0a9de76
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 37 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ obj/
/tests/golangworker/golangworker
/.vs
/.vscode
/.idea
/.idea
/.zed
Temporalio.sln.DotSettings.user
30 changes: 25 additions & 5 deletions src/Temporalio/Bridge/CustomSlotSupplier.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;

namespace Temporalio.Bridge
Expand Down Expand Up @@ -28,15 +30,33 @@ internal unsafe CustomSlotSupplier(Temporalio.Worker.Tuning.ICustomSlotSupplier
PinCallbackHolder(interopCallbacks);
}

private void Reserve(Interop.SlotReserveCtx ctx)
private unsafe void Reserve(Interop.SlotReserveCtx ctx, void* sender)
{
// TODO: Need to call callback with result that will put it in a channel to await in Rust
var reserveTask = Task.Run(() => userSupplier.ReserveSlotAsync(new(ctx)));
SafeReserve(ctx, new IntPtr(sender));
}

private void TryReserve(Interop.SlotReserveCtx ctx)
private void SafeReserve(Interop.SlotReserveCtx ctx, IntPtr sender)
{
userSupplier.TryReserveSlot(new(ctx));
Console.WriteLine("Reserve called");
var reserveTask = Task.Run(async () =>
{
var permit = await userSupplier.ReserveSlotAsync(new(ctx)).ConfigureAwait(false);
Console.WriteLine("Reserve done user");
unsafe
{
Console.WriteLine("Calling async reserve??");
var handle = GCHandle.Alloc(permit, GCHandleType.Pinned);
Interop.Methods.complete_async_reserve(sender.ToPointer(), handle.AddrOfPinnedObject().ToPointer());
Console.WriteLine("Called async reserve??");
}
});
}

private unsafe void* TryReserve(Interop.SlotReserveCtx ctx)
{
var returned = userSupplier.TryReserveSlot(new(ctx));
var handle = GCHandle.Alloc(returned, GCHandleType.Pinned);
return handle.AddrOfPinnedObject().ToPointer();
}

private void MarkUsed(Interop.SlotMarkUsedCtx ctx)
Expand Down
18 changes: 11 additions & 7 deletions src/Temporalio/Bridge/Interop/Interop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -577,10 +577,11 @@ internal partial struct SlotReserveCtx
}

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
internal delegate void CustomReserveSlotCallback([NativeTypeName("struct SlotReserveCtx")] SlotReserveCtx ctx);
internal unsafe delegate void CustomReserveSlotCallback([NativeTypeName("struct SlotReserveCtx")] SlotReserveCtx ctx, void* sender);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
internal delegate void CustomTryReserveSlotCallback([NativeTypeName("struct SlotReserveCtx")] SlotReserveCtx ctx);
[return: NativeTypeName("const void *")]
internal unsafe delegate void* CustomTryReserveSlotCallback([NativeTypeName("struct SlotReserveCtx")] SlotReserveCtx ctx);

internal enum SlotInfo_Tag
{
Expand Down Expand Up @@ -614,7 +615,7 @@ internal unsafe partial struct SlotInfo
{
public SlotInfo_Tag tag;

[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L419_C3")]
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L422_C3")]
public _Anonymous_e__Union Anonymous;

internal ref WorkflowSlotInfo_Body workflow_slot_info
Expand Down Expand Up @@ -720,7 +721,7 @@ internal unsafe partial struct SlotSupplier
{
public SlotSupplier_Tag tag;

[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L465_C3")]
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L468_C3")]
public _Anonymous_e__Union Anonymous;

internal ref FixedSizeSlotSupplier fixed_size
Expand Down Expand Up @@ -760,15 +761,15 @@ internal ref CustomSlotSupplierCallbacksImpl custom
internal unsafe partial struct _Anonymous_e__Union
{
[FieldOffset(0)]
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L466_C5")]
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L469_C5")]
public _Anonymous1_e__Struct Anonymous1;

[FieldOffset(0)]
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L469_C5")]
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L472_C5")]
public _Anonymous2_e__Struct Anonymous2;

[FieldOffset(0)]
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L472_C5")]
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L475_C5")]
public _Anonymous3_e__Struct Anonymous3;

internal partial struct _Anonymous1_e__Struct
Expand Down Expand Up @@ -1057,5 +1058,8 @@ internal static unsafe partial class Methods
[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
[return: NativeTypeName("struct WorkerReplayPushResult")]
public static extern WorkerReplayPushResult worker_replay_push([NativeTypeName("struct Worker *")] Worker* worker, [NativeTypeName("struct WorkerReplayPusher *")] WorkerReplayPusher* worker_replay_pusher, [NativeTypeName("struct ByteArrayRef")] ByteArrayRef workflow_id, [NativeTypeName("struct ByteArrayRef")] ByteArrayRef history);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void complete_async_reserve(void* sender, [NativeTypeName("const void *")] void* permit);
}
}
9 changes: 7 additions & 2 deletions src/Temporalio/Bridge/include/temporal-sdk-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,12 @@ typedef struct SlotReserveCtx {
bool is_sticky;
} SlotReserveCtx;

typedef void (*CustomReserveSlotCallback)(struct SlotReserveCtx ctx);
typedef void (*CustomReserveSlotCallback)(struct SlotReserveCtx ctx, void *sender);

typedef void (*CustomTryReserveSlotCallback)(struct SlotReserveCtx ctx);
/**
* Must return pointer to a C# object inheriting from SlotPermit
*/
typedef const void *(*CustomTryReserveSlotCallback)(struct SlotReserveCtx ctx);

typedef enum SlotInfo_Tag {
WorkflowSlotInfo,
Expand Down Expand Up @@ -689,6 +692,8 @@ struct WorkerReplayPushResult worker_replay_push(struct Worker *worker,
struct ByteArrayRef workflow_id,
struct ByteArrayRef history);

void complete_async_reserve(void *sender, const void *permit);

#ifdef __cplusplus
} // extern "C"
#endif // __cplusplus
50 changes: 37 additions & 13 deletions src/Temporalio/Bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use temporal_sdk_core_protos::coresdk::ActivityHeartbeat;
use temporal_sdk_core_protos::coresdk::ActivityTaskCompletion;
use temporal_sdk_core_protos::temporal::api::history::v1::History;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::oneshot;
use tokio_stream::wrappers::ReceiverStream;

use std::collections::HashMap;
Expand Down Expand Up @@ -84,8 +85,20 @@ pub struct ResourceBasedSlotSupplier {
tuner_options: ResourceBasedTunerOptions,
}

type CustomReserveSlotCallback = unsafe extern "C" fn(ctx: SlotReserveCtx);
type CustomTryReserveSlotCallback = unsafe extern "C" fn(ctx: SlotReserveCtx);
#[repr(C)]
pub struct CustomSlotSupplier<SK> {
inner: CustomSlotSupplierCallbacksImpl,
_pd: std::marker::PhantomData<SK>,
}

unsafe impl<SK> Send for CustomSlotSupplier<SK> {}
unsafe impl<SK> Sync for CustomSlotSupplier<SK> {}

type CustomReserveSlotCallback =
unsafe extern "C" fn(ctx: SlotReserveCtx, sender: *mut libc::c_void);
/// Must return pointer to a C# object inheriting from SlotPermit
type CustomTryReserveSlotCallback =
unsafe extern "C" fn(ctx: SlotReserveCtx) -> *const libc::c_void;
type CustomMarkSlotUsedCallback = unsafe extern "C" fn(ctx: SlotMarkUsedCtx);
type CustomReleaseSlotCallback = unsafe extern "C" fn(ctx: SlotReleaseCtx);

Expand Down Expand Up @@ -113,15 +126,6 @@ impl CustomSlotSupplierCallbacksImpl {
}
}

#[repr(C)]
pub struct CustomSlotSupplier<SK> {
inner: CustomSlotSupplierCallbacksImpl,
_pd: std::marker::PhantomData<SK>,
}

unsafe impl<SK> Send for CustomSlotSupplier<SK> {}
unsafe impl<SK> Sync for CustomSlotSupplier<SK> {}

#[repr(C)]
pub enum SlotKindType {
WorkflowSlotKindType,
Expand Down Expand Up @@ -173,11 +177,13 @@ impl<SK: SlotKind + Send + Sync> temporal_sdk_core_api::worker::SlotSupplier
type SlotKind = SK;

async fn reserve_slot(&self, ctx: &dyn SlotReservationContext) -> SlotSupplierPermit {
let (tx, rx) = oneshot::channel();
let ctx = Self::convert_reserve_ctx(ctx);
let tx = Box::into_raw(Box::new(tx)) as *mut libc::c_void;
unsafe {
((*self.inner.0).reserve)(ctx);
((*self.inner.0).reserve)(ctx, tx);
}
unimplemented!()
rx.await.expect("reserve channel is not closed")
}

fn try_reserve_slot(&self, ctx: &dyn SlotReservationContext) -> Option<SlotSupplierPermit> {
Expand Down Expand Up @@ -727,6 +733,24 @@ pub extern "C" fn worker_replay_push(
}
}

#[no_mangle]
pub extern "C" fn complete_async_reserve(sender: *mut libc::c_void, permit: *const libc::c_void) {
dbg!("Trying to complete async reserve");
if !sender.is_null() {
dbg!("Completing async reserve");
unsafe {
let sender = Box::from_raw(sender as *mut Sender<SlotSupplierPermit>);
let permit =
SlotSupplierPermit::with_user_data(UserDataHandle(permit as *mut libc::c_void));
dbg!("sending");
let _ = sender.send(permit);
dbg!("Sent");
}
} else {
panic!("ReserveSlot sender must not be null!");
}
}

impl TryFrom<&WorkerOptions> for temporal_sdk_core::WorkerConfig {
type Error = anyhow::Error;

Expand Down
4 changes: 0 additions & 4 deletions src/Temporalio/Worker/Tuning/SlotPermit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ namespace Temporalio.Worker.Tuning
/// </remarks>
public class SlotPermit
{
private SlotPermit()
{
}

/// <summary>
/// Reconstruct a permit from a pointer.
/// </summary>
Expand Down
20 changes: 15 additions & 5 deletions tests/Temporalio.Tests/Worker/WorkerTuningTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,26 +112,36 @@ public async Task Cannot_Mix_MaxConcurrent_And_Tuner()
Assert.Contains("Cannot set both Tuner and any of", argumentException.Message);
}

private class MyPermit : SlotPermit
{
private readonly int dat;

public MyPermit(int v)
{
this.dat = v;
}
}

private class MySlotSupplier : ICustomSlotSupplier
{
public Task<SlotPermit> ReserveSlotAsync(SlotReserveContext ctx)
public async Task<SlotPermit> ReserveSlotAsync(SlotReserveContext ctx)
{
throw new NotImplementedException();
// Do something async to make sure that works
await Task.Delay(10);
return new MyPermit(1);
}

public SlotPermit? TryReserveSlot(SlotReserveContext ctx)
{
throw new NotImplementedException();
return new MyPermit(1);
}

public void MarkSlotUsed(SlotMarkUsedContext ctx)
{
throw new NotImplementedException();
}

public void ReleaseSlot(SlotReleaseContext ctx)
{
throw new NotImplementedException();
}
}

Expand Down

0 comments on commit 0a9de76

Please sign in to comment.