Skip to content

Commit

Permalink
use file-scoped namespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
fealebenpae committed Aug 15, 2023
1 parent 104da2c commit bf2af73
Show file tree
Hide file tree
Showing 4 changed files with 434 additions and 438 deletions.
155 changes: 77 additions & 78 deletions Realm/Realm/Native/SyncSocketProvider.EventLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,123 +23,122 @@
using System.Threading.Tasks;
using Realms.Logging;

namespace Realms.Native
namespace Realms.Native;

internal partial class SyncSocketProvider
{
internal partial class SyncSocketProvider
private class Timer
{
private class Timer
{
private readonly CancellationTokenSource _cts = new();

internal Timer(TimeSpan delay, IntPtr nativeCallback, ChannelWriter<IWork> workQueue)
{
Logger.LogDefault(LogLevel.Trace, $"Creating timer with delay {delay} and target {nativeCallback}.");
var cancellationToken = _cts.Token;
Task.Delay(delay, cancellationToken).ContinueWith(async _ =>
{
await workQueue.WriteAsync(new Work(nativeCallback, cancellationToken));
});
}
private readonly CancellationTokenSource _cts = new();

internal void Cancel()
{
Logger.LogDefault(LogLevel.Trace, $"Canceling timer.");
_cts.Cancel();
_cts.Dispose();
}

private class Work : IWork
internal Timer(TimeSpan delay, IntPtr nativeCallback, ChannelWriter<IWork> workQueue)
{
Logger.LogDefault(LogLevel.Trace, $"Creating timer with delay {delay} and target {nativeCallback}.");
var cancellationToken = _cts.Token;
Task.Delay(delay, cancellationToken).ContinueWith(async _ =>
{
private readonly IntPtr _nativeCallback;
private readonly CancellationToken _cancellationToken;

public Work(IntPtr nativeCallback, CancellationToken cancellationToken)
{
_nativeCallback = nativeCallback;
_cancellationToken = cancellationToken;
}

public void Execute()
{
var status = Status.OK;
if (_cancellationToken.IsCancellationRequested)
{
status = new(ErrorCode.OperationAborted, "Timer canceled");
}
await workQueue.WriteAsync(new Work(nativeCallback, cancellationToken));
});
}

RunCallback(_nativeCallback, status);
}
}
internal void Cancel()
{
Logger.LogDefault(LogLevel.Trace, $"Canceling timer.");
_cts.Cancel();
_cts.Dispose();
}

private class EventLoopWork : IWork
private class Work : IWork
{
private readonly IntPtr _nativeCallback;
private readonly Status _status;

// Belongs to SyncSocketProvider. When Native destroys the Provider we need to stop executing
// enqueued work, but we need to release all the callbacks we copied on the heap.
private readonly CancellationToken _cancellationToken;

public EventLoopWork(IntPtr nativeCallback, Status status, CancellationToken cancellationToken)
public Work(IntPtr nativeCallback, CancellationToken cancellationToken)
{
_nativeCallback = nativeCallback;
_status = status;
_cancellationToken = cancellationToken;
}

public void Execute()
{
var status = Status.OK;
if (_cancellationToken.IsCancellationRequested)
{
Logger.LogDefault(LogLevel.Trace, "Deleting EventLoopWork callback only because event loop was cancelled.");
NativeMethods.delete_callback(_nativeCallback);
return;
status = new(ErrorCode.OperationAborted, "Timer canceled");
}

RunCallback(_nativeCallback, _status);
RunCallback(_nativeCallback, status);
}
}
}

private static void RunCallback(IntPtr nativeCallback, Status status)
{
Logger.LogDefault(LogLevel.Trace, $"SyncSocketProvider running native callback {nativeCallback} with status {status.Code} \"{status.Reason}\".");
private class EventLoopWork : IWork
{
private readonly IntPtr _nativeCallback;
private readonly Status _status;

// Belongs to SyncSocketProvider. When Native destroys the Provider we need to stop executing
// enqueued work, but we need to release all the callbacks we copied on the heap.
private readonly CancellationToken _cancellationToken;

using var arena = new Arena();
NativeMethods.run_callback(nativeCallback, status.Code, StringValue.AllocateFrom(status.Reason, arena));
public EventLoopWork(IntPtr nativeCallback, Status status, CancellationToken cancellationToken)
{
_nativeCallback = nativeCallback;
_status = status;
_cancellationToken = cancellationToken;
}

private async Task PostWorkAsync(IntPtr nativeCallback)
public void Execute()
{
Logger.LogDefault(LogLevel.Trace, "Posting work to SyncSocketProvider event loop.");
await _workQueue.Writer.WriteAsync(new EventLoopWork(nativeCallback, Status.OK, _cts.Token));
if (_cancellationToken.IsCancellationRequested)
{
Logger.LogDefault(LogLevel.Trace, "Deleting EventLoopWork callback only because event loop was cancelled.");
NativeMethods.delete_callback(_nativeCallback);
return;
}

RunCallback(_nativeCallback, _status);
}
}

private static void RunCallback(IntPtr nativeCallback, Status status)
{
Logger.LogDefault(LogLevel.Trace, $"SyncSocketProvider running native callback {nativeCallback} with status {status.Code} \"{status.Reason}\".");

using var arena = new Arena();
NativeMethods.run_callback(nativeCallback, status.Code, StringValue.AllocateFrom(status.Reason, arena));
}

private async partial Task WorkThread()
private async Task PostWorkAsync(IntPtr nativeCallback)
{
Logger.LogDefault(LogLevel.Trace, "Posting work to SyncSocketProvider event loop.");
await _workQueue.Writer.WriteAsync(new EventLoopWork(nativeCallback, Status.OK, _cts.Token));
}

private async partial Task WorkThread()
{
Logger.LogDefault(LogLevel.Trace, "Starting SyncSocketProvider event loop.");
try
{
Logger.LogDefault(LogLevel.Trace, "Starting SyncSocketProvider event loop.");
try
while (await _workQueue.Reader.WaitToReadAsync())
{
while (await _workQueue.Reader.WaitToReadAsync())
while (_workQueue.Reader.TryRead(out var work))
{
while (_workQueue.Reader.TryRead(out var work))
{
work.Execute();
}
work.Execute();
}
}
catch (Exception e)
}
catch (Exception e)
{
Logger.LogDefault(LogLevel.Error, $"Error occurred in SyncSocketProvider event loop {e.GetType().FullName}: {e.Message}");
if (!string.IsNullOrEmpty(e.StackTrace))
{
Logger.LogDefault(LogLevel.Error, $"Error occurred in SyncSocketProvider event loop {e.GetType().FullName}: {e.Message}");
if (!string.IsNullOrEmpty(e.StackTrace))
{
Logger.LogDefault(LogLevel.Trace, e.StackTrace);
}

throw;
Logger.LogDefault(LogLevel.Trace, e.StackTrace);
}

Logger.LogDefault(LogLevel.Trace, "Exiting SyncSocketProvider event loop.");
throw;
}

Logger.LogDefault(LogLevel.Trace, "Exiting SyncSocketProvider event loop.");
}
}
107 changes: 53 additions & 54 deletions Realm/Realm/Native/SyncSocketProvider.Native.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,80 +20,79 @@
using System.Net.WebSockets;
using System.Runtime.InteropServices;

namespace Realms.Native
namespace Realms.Native;

internal partial class SyncSocketProvider
{
internal partial class SyncSocketProvider
{
// additional websocket close status codes that Sync understands
private const int RLM_ERR_WEBSOCKET_CONNECTION_FAILED = 4401;
private const int RLM_ERR_WEBSOCKET_READ_ERROR = 4402;
private const int RLM_ERR_WEBSOCKET_WRITE_ERROR = 4403;
// additional websocket close status codes that Sync understands
private const int RLM_ERR_WEBSOCKET_CONNECTION_FAILED = 4401;
private const int RLM_ERR_WEBSOCKET_READ_ERROR = 4402;
private const int RLM_ERR_WEBSOCKET_WRITE_ERROR = 4403;

// equivalent to ErrorCodes::Error in <realm/error_codes.hpp>
public enum ErrorCode : int
{
Ok = 0,
RuntimeError = 1000,
OperationAborted = 1027
}
// equivalent to ErrorCodes::Error in <realm/error_codes.hpp>
public enum ErrorCode : int
{
Ok = 0,
RuntimeError = 1000,
OperationAborted = 1027
}

[StructLayout(LayoutKind.Sequential)]
public readonly struct Endpoint
{
public readonly StringValue address;
[StructLayout(LayoutKind.Sequential)]
public readonly struct Endpoint
{
public readonly StringValue address;

public readonly ushort port;
public readonly ushort port;

public readonly StringValue path;
public readonly StringValue path;

public readonly MarshaledVector<StringValue> protocols;
public readonly MarshaledVector<StringValue> protocols;

public readonly NativeBool is_ssl;
}
public readonly NativeBool is_ssl;
}

private static class NativeMethods
{
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void post_work(IntPtr socket_provider, IntPtr native_callback);
private static class NativeMethods
{
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void post_work(IntPtr socket_provider, IntPtr native_callback);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void provider_dispose(IntPtr managed_provider);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void provider_dispose(IntPtr managed_provider);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate IntPtr create_timer(IntPtr socket_provider, UInt64 delay_miliseconds, IntPtr native_callback);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate IntPtr create_timer(IntPtr socket_provider, UInt64 delay_miliseconds, IntPtr native_callback);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void cancel_timer(IntPtr timer);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void cancel_timer(IntPtr timer);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate IntPtr websocket_connect(IntPtr socket_provider, IntPtr observer, Endpoint endpoint);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate IntPtr websocket_connect(IntPtr socket_provider, IntPtr observer, Endpoint endpoint);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void websocket_write(IntPtr managed_websocket, BinaryValue data, IntPtr native_callback);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void websocket_write(IntPtr managed_websocket, BinaryValue data, IntPtr native_callback);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void websocket_close(IntPtr managed_websocket);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void websocket_close(IntPtr managed_websocket);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_websocket_install_callbacks", CallingConvention = CallingConvention.Cdecl)]
public static extern void install_callbacks(post_work post, provider_dispose dispose, create_timer create_timer, cancel_timer cancel_timer, websocket_connect connect, websocket_write write, websocket_close close);
[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_websocket_install_callbacks", CallingConvention = CallingConvention.Cdecl)]
public static extern void install_callbacks(post_work post, provider_dispose dispose, create_timer create_timer, cancel_timer cancel_timer, websocket_connect connect, websocket_write write, websocket_close close);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_websocket_run_callback", CallingConvention = CallingConvention.Cdecl)]
public static extern void run_callback(IntPtr native_callback, ErrorCode result, StringValue reason);
[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_websocket_run_callback", CallingConvention = CallingConvention.Cdecl)]
public static extern void run_callback(IntPtr native_callback, ErrorCode result, StringValue reason);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_websocket_delete_callback", CallingConvention = CallingConvention.Cdecl)]
public static extern void delete_callback(IntPtr native_callback);
[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_websocket_delete_callback", CallingConvention = CallingConvention.Cdecl)]
public static extern void delete_callback(IntPtr native_callback);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_websocket_observer_connected_handler", CallingConvention = CallingConvention.Cdecl)]
public static extern void observer_connected_handler(IntPtr observer, StringValue protocol);
[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_websocket_observer_connected_handler", CallingConvention = CallingConvention.Cdecl)]
public static extern void observer_connected_handler(IntPtr observer, StringValue protocol);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_websocket_observer_error_handler", CallingConvention = CallingConvention.Cdecl)]
public static extern void observer_error_handler(IntPtr observer);
[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_websocket_observer_error_handler", CallingConvention = CallingConvention.Cdecl)]
public static extern void observer_error_handler(IntPtr observer);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_websocket_observer_binary_message_received", CallingConvention = CallingConvention.Cdecl)]
public static extern void observer_binary_message_received(IntPtr observer, BinaryValue data);
[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_websocket_observer_binary_message_received", CallingConvention = CallingConvention.Cdecl)]
public static extern void observer_binary_message_received(IntPtr observer, BinaryValue data);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_websocket_observer_closed_handler", CallingConvention = CallingConvention.Cdecl)]
public static extern void observer_closed_handler(IntPtr observer, NativeBool was_clean, WebSocketCloseStatus status, StringValue reason);
}
[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_websocket_observer_closed_handler", CallingConvention = CallingConvention.Cdecl)]
public static extern void observer_closed_handler(IntPtr observer, NativeBool was_clean, WebSocketCloseStatus status, StringValue reason);
}
}
Loading

0 comments on commit bf2af73

Please sign in to comment.