Skip to content

Commit

Permalink
Get bridge data passed through
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Nov 28, 2024
1 parent b992dd4 commit e1d94b8
Show file tree
Hide file tree
Showing 15 changed files with 466 additions and 411 deletions.
13 changes: 2 additions & 11 deletions src/Temporalio/Api/Activity/V1/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ static MessageReflection() {

}
#region Messages
[global::System.Diagnostics.DebuggerDisplayAttribute("{ToString(),nq}")]
public sealed partial class ActivityOptions : pb::IMessage<ActivityOptions>
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
, pb::IBufferMessage
Expand Down Expand Up @@ -396,11 +395,7 @@ public void MergeFrom(pb::CodedInputStream input) {
#else
uint tag;
while ((tag = input.ReadTag()) != 0) {
if ((tag & 7) == 4) {
// Abort on any end group tag.
return;
}
switch(tag) {
switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input);
break;
Expand Down Expand Up @@ -457,11 +452,7 @@ public void MergeFrom(pb::CodedInputStream input) {
void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) {
uint tag;
while ((tag = input.ReadTag()) != 0) {
if ((tag & 7) == 4) {
// Abort on any end group tag.
return;
}
switch(tag) {
switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input);
break;
Expand Down
28 changes: 14 additions & 14 deletions src/Temporalio/Bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 3 additions & 33 deletions src/Temporalio/Bridge/CustomMetricMeter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ namespace Temporalio.Bridge
/// <summary>
/// Core wrapper for a custom metric meter implementation.
/// </summary>
internal class CustomMetricMeter
internal class CustomMetricMeter : NativeInvokeableClass<Interop.CustomMetricMeter>
{
private readonly Temporalio.Runtime.ICustomMetricMeter meter;
private readonly Temporalio.Runtime.CustomMetricMeterOptions options;
private readonly List<GCHandle> handles = new();

/// <summary>
/// Initializes a new instance of the <see cref="CustomMetricMeter" /> class.
Expand All @@ -38,20 +37,9 @@ public unsafe CustomMetricMeter(
meter_free = FunctionPointer<Interop.CustomMetricMeterMeterFreeCallback>(Free),
};

// Pin the metric meter pointer and set it as the first handle
var interopMeterHandle = GCHandle.Alloc(interopMeter, GCHandleType.Pinned);
handles.Insert(0, interopMeterHandle);
Ptr = (Interop.CustomMetricMeter*)interopMeterHandle.AddrOfPinnedObject();

// Add handle for ourself
handles.Add(GCHandle.Alloc(this));
PinCallbackHolder(interopMeter);
}

/// <summary>
/// Gets the pointer to the native metric meter.
/// </summary>
internal unsafe Interop.CustomMetricMeter* Ptr { get; private init; }

private static unsafe string? GetStringOrNull(Interop.ByteArrayRef bytes) =>
(int)bytes.size == 0 ? null : GetString(bytes);

Expand Down Expand Up @@ -220,23 +208,5 @@ private unsafe void RecordMetricDuration(void* metric, ulong valueMs, void* attr
}

private unsafe void FreeAttributes(void* attributes) => GCHandle.FromIntPtr(new(attributes)).Free();

private unsafe void Free(Interop.CustomMetricMeter* meter)
{
// Free in order which frees function pointers first then meter handles
foreach (var handle in handles)
{
handle.Free();
}
}

// Similar to Scope.FunctionPointer
private IntPtr FunctionPointer<T>(T func)
where T : Delegate
{
var handle = GCHandle.Alloc(func);
handles.Add(handle);
return Marshal.GetFunctionPointerForDelegate(handle.Target!);
}
}
}
}
52 changes: 52 additions & 0 deletions src/Temporalio/Bridge/CustomSlotSupplier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System.Threading.Tasks;

namespace Temporalio.Bridge
{
/// <summary>
/// Core wrapper for a user-defined custom slot supplier.
/// </summary>
internal class CustomSlotSupplier : NativeInvokeableClass<Interop.CustomSlotSupplierCallbacks>
{
private readonly Temporalio.Worker.Tuning.ICustomSlotSupplier userSupplier;

/// <summary>
/// Initializes a new instance of the <see cref="CustomSlotSupplier" /> class.
/// </summary>
/// <param name="userSupplier">User's slot supplier implementation'.</param>
internal unsafe CustomSlotSupplier(Temporalio.Worker.Tuning.ICustomSlotSupplier userSupplier)
{
this.userSupplier = userSupplier;

var interopCallbacks = new Interop.CustomSlotSupplierCallbacks
{
reserve = FunctionPointer<Interop.CustomReserveSlotCallback>(Reserve),
try_reserve = FunctionPointer<Interop.CustomTryReserveSlotCallback>(TryReserve),
mark_used = FunctionPointer<Interop.CustomMarkSlotUsedCallback>(MarkUsed),
release = FunctionPointer<Interop.CustomReleaseSlotCallback>(Release),
};

PinCallbackHolder(interopCallbacks);
}

private void Reserve(Interop.SlotReserveCtx ctx)
{
// TODO: Need to call callback with result that will put it in a channel to await in Rust
var reserveTask = Task.Run(() => userSupplier.ReserveSlotAsync(new(ctx)));
}

private void TryReserve(Interop.SlotReserveCtx ctx)
{
userSupplier.TryReserveSlot(new(ctx));
}

private void MarkUsed(Interop.SlotMarkUsedCtx ctx)
{
userSupplier.MarkSlotUsed(new(ctx));
}

private void Release(Interop.SlotReleaseCtx ctx)
{
userSupplier.ReleaseSlot(new(ctx));
}
}
}
Loading

0 comments on commit e1d94b8

Please sign in to comment.