Skip to content

Commit

Permalink
Protobuf copiers/codecs for data types (#8359)
Browse files Browse the repository at this point in the history
* Add copiers for non-IMessage protobuf types

* Add tests

* Some code doc cleanup

* Add codecs as well

* Add codec tests
  • Loading branch information
jsteinich authored Apr 11, 2023
1 parent 20cf814 commit 9e85ac7
Show file tree
Hide file tree
Showing 7 changed files with 563 additions and 0 deletions.
42 changes: 42 additions & 0 deletions src/Serializers/Orleans.Serialization.Protobuf/ByteStringCodec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;
using Google.Protobuf;
using Orleans.Serialization.Buffers;
using Orleans.Serialization.Codecs;
using Orleans.Serialization.WireProtocol;

namespace Orleans.Serialization;

/// <summary>
/// Serializer for <see cref="ByteString"/>.
/// </summary>
[RegisterSerializer]
public sealed class ByteStringCodec : IFieldCodec<ByteString>
{
/// <inheritdoc/>
ByteString IFieldCodec<ByteString>.ReadValue<TInput>(ref Reader<TInput> reader, Field field)
{
if (field.WireType == WireType.Reference)
{
return ReferenceCodec.ReadReference<ByteString, TInput>(ref reader, field);
}

field.EnsureWireType(WireType.LengthPrefixed);
var length = reader.ReadVarUInt32();
var result = UnsafeByteOperations.UnsafeWrap(reader.ReadBytes(length));
ReferenceCodec.RecordObject(reader.Session, result);
return result;
}

/// <inheritdoc/>
void IFieldCodec<ByteString>.WriteField<TBufferWriter>(ref Writer<TBufferWriter> writer, uint fieldIdDelta, Type expectedType, ByteString value)
{
if (ReferenceCodec.TryWriteReferenceField(ref writer, fieldIdDelta, expectedType, value))
{
return;
}

writer.WriteFieldHeader(fieldIdDelta, expectedType, typeof(ByteString), WireType.LengthPrefixed);
writer.WriteVarUInt32((uint)value.Length);
writer.Write(value.Span);
}
}
24 changes: 24 additions & 0 deletions src/Serializers/Orleans.Serialization.Protobuf/ByteStringCopier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using Google.Protobuf;
using Orleans.Serialization.Cloning;

namespace Orleans.Serialization;

/// <summary>
/// Copier for <see cref="ByteString"/>.
/// </summary>
[RegisterCopier]
public sealed class ByteStringCopier : IDeepCopier<ByteString>
{
/// <inheritdoc/>
public ByteString DeepCopy(ByteString input, CopyContext context)
{
if (context.TryGetCopy<ByteString>(input, out var result))
{
return result;
}

result = ByteString.CopyFrom(input.Span);
context.RecordCopy(input, result);
return result;
}
}
135 changes: 135 additions & 0 deletions src/Serializers/Orleans.Serialization.Protobuf/MapFieldCodec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
using System;
using System.Buffers;
using Google.Protobuf.Collections;
using Orleans.Serialization.Buffers;
using Orleans.Serialization.Codecs;
using Orleans.Serialization.GeneratedCodeHelpers;
using Orleans.Serialization.Session;
using Orleans.Serialization.WireProtocol;

namespace Orleans.Serialization;

/// <summary>
/// Serializer for <see cref="MapField{TKey,TValue}"/>.
/// </summary>
/// <typeparam name="TKey">The key type.</typeparam>
/// <typeparam name="TValue">The value type.</typeparam>
[RegisterSerializer]
public sealed class MapFieldCodec<TKey, TValue> : IFieldCodec<MapField<TKey, TValue>>
{
private readonly Type _keyFieldType = typeof(TKey);
private readonly Type _valueFieldType = typeof(TValue);

private readonly IFieldCodec<TKey> _keyCodec;
private readonly IFieldCodec<TValue> _valueCodec;

/// <summary>
/// Initializes a new instance of the <see cref="MapFieldCodec{TKey, TValue}"/> class.
/// </summary>
/// <param name="keyCodec">The key codec.</param>
/// <param name="valueCodec">The value codec.</param>
public MapFieldCodec(
IFieldCodec<TKey> keyCodec,
IFieldCodec<TValue> valueCodec)
{
_keyCodec = OrleansGeneratedCodeHelper.UnwrapService(this, keyCodec);
_valueCodec = OrleansGeneratedCodeHelper.UnwrapService(this, valueCodec);
}

/// <inheritdoc/>
public void WriteField<TBufferWriter>(ref Writer<TBufferWriter> writer, uint fieldIdDelta, Type expectedType, MapField<TKey, TValue> value) where TBufferWriter : IBufferWriter<byte>
{
if (ReferenceCodec.TryWriteReferenceField(ref writer, fieldIdDelta, expectedType, value))
{
return;
}

writer.WriteFieldHeader(fieldIdDelta, expectedType, value.GetType(), WireType.TagDelimited);

if (value.Count > 0)
{
UInt32Codec.WriteField(ref writer, 0, (uint)value.Count);
uint innerFieldIdDelta = 1;
foreach (var element in value)
{
_keyCodec.WriteField(ref writer, innerFieldIdDelta, _keyFieldType, element.Key);
_valueCodec.WriteField(ref writer, 0, _valueFieldType, element.Value);
innerFieldIdDelta = 0;
}
}

writer.WriteEndObject();
}

/// <inheritdoc/>
public MapField<TKey, TValue> ReadValue<TInput>(ref Reader<TInput> reader, Field field)
{
if (field.WireType == WireType.Reference)
{
return ReferenceCodec.ReadReference<MapField<TKey, TValue>, TInput>(ref reader, field);
}

field.EnsureWireTypeTagDelimited();

var placeholderReferenceId = ReferenceCodec.CreateRecordPlaceholder(reader.Session);
TKey key = default;
var valueExpected = false;
MapField<TKey, TValue> result = null;
uint fieldId = 0;
while (true)
{
var header = reader.ReadFieldHeader();
if (header.IsEndBaseOrEndObject)
{
break;
}

fieldId += header.FieldIdDelta;
switch (fieldId)
{
case 0:
var length = (int)UInt32Codec.ReadValue(ref reader, header);
if (length > 10240 && length > reader.Length)
{
ThrowInvalidSizeException(length);
}

result = CreateInstance(reader.Session, placeholderReferenceId);
break;
case 1:
if (result is null)
ThrowLengthFieldMissing();

if (!valueExpected)
{
key = _keyCodec.ReadValue(ref reader, header);
valueExpected = true;
}
else
{
result.Add(key, _valueCodec.ReadValue(ref reader, header));
valueExpected = false;
}
break;
default:
reader.ConsumeUnknownField(header);
break;
}
}

result ??= CreateInstance(reader.Session, placeholderReferenceId);
return result;
}

private static MapField<TKey, TValue> CreateInstance(SerializerSession session, uint placeholderReferenceId)
{
var result = new MapField<TKey, TValue>();
ReferenceCodec.RecordObject(session, result, placeholderReferenceId);
return result;
}

private static void ThrowInvalidSizeException(int length) => throw new IndexOutOfRangeException(
$"Declared length of {typeof(MapField<TKey, TValue>)}, {length}, is greater than total length of input.");

private static void ThrowLengthFieldMissing() => throw new RequiredFieldMissingException("Serialized MapField is missing its length field.");
}
59 changes: 59 additions & 0 deletions src/Serializers/Orleans.Serialization.Protobuf/MapFieldCopier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using Google.Protobuf.Collections;
using Orleans.Serialization.Cloning;

namespace Orleans.Serialization;

/// <summary>
/// Copier for <see cref="MapField{TKey, TValue}"/>.
/// </summary>
/// <typeparam name="TKey">The type of the t key.</typeparam>
/// <typeparam name="TValue">The type of the t value.</typeparam>
[RegisterCopier]
public sealed class MapFieldCopier<TKey, TValue> : IDeepCopier<MapField<TKey, TValue>>, IBaseCopier<MapField<TKey, TValue>>
{
private readonly IDeepCopier<TKey> _keyCopier;
private readonly IDeepCopier<TValue> _valueCopier;

/// <summary>
/// Initializes a new instance of the <see cref="MapFieldCopier{TKey, TValue}"/> class.
/// </summary>
/// <param name="keyCopier">The key copier.</param>
/// <param name="valueCopier">The value copier.</param>
public MapFieldCopier(IDeepCopier<TKey> keyCopier, IDeepCopier<TValue> valueCopier)
{
_keyCopier = keyCopier;
_valueCopier = valueCopier;
}

/// <inheritdoc/>
public MapField<TKey, TValue> DeepCopy(MapField<TKey, TValue> input, CopyContext context)
{
if (context.TryGetCopy<MapField<TKey, TValue>>(input, out var result))
{
return result;
}

if (input.GetType() != typeof(MapField<TKey, TValue>))
{
return context.DeepCopy(input);
}

result = new MapField<TKey, TValue>();
context.RecordCopy(input, result);
foreach (var pair in input)
{
result[_keyCopier.DeepCopy(pair.Key, context)] = _valueCopier.DeepCopy(pair.Value, context);
}

return result;
}

/// <inheritdoc/>
public void DeepCopy(MapField<TKey, TValue> input, MapField<TKey, TValue> output, CopyContext context)
{
foreach (var pair in input)
{
output[_keyCopier.DeepCopy(pair.Key, context)] = _valueCopier.DeepCopy(pair.Value, context);
}
}
}
118 changes: 118 additions & 0 deletions src/Serializers/Orleans.Serialization.Protobuf/RepeatedFieldCodec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
using System;
using System.Buffers;
using System.Runtime.CompilerServices;
using Google.Protobuf.Collections;
using Orleans.Serialization.Buffers;
using Orleans.Serialization.Codecs;
using Orleans.Serialization.GeneratedCodeHelpers;
using Orleans.Serialization.WireProtocol;

namespace Orleans.Serialization;

/// <summary>
/// Serializer for <see cref="RepeatedField{T}"/>.
/// </summary>
/// <typeparam name="T">The element type.</typeparam>
[RegisterSerializer]
public sealed class RepeatedFieldCodec<T> : IFieldCodec<RepeatedField<T>>
{
private readonly Type CodecElementType = typeof(T);

private readonly IFieldCodec<T> _fieldCodec;

/// <summary>
/// Initializes a new instance of the <see cref="RepeatedFieldCodec{T}"/> class.
/// </summary>
/// <param name="fieldCodec">The field codec.</param>
public RepeatedFieldCodec(IFieldCodec<T> fieldCodec)
{
_fieldCodec = OrleansGeneratedCodeHelper.UnwrapService(this, fieldCodec);
}

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void WriteField<TBufferWriter>(ref Writer<TBufferWriter> writer, uint fieldIdDelta, Type expectedType, RepeatedField<T> value) where TBufferWriter : IBufferWriter<byte>
{
if (ReferenceCodec.TryWriteReferenceField(ref writer, fieldIdDelta, expectedType, value))
{
return;
}

writer.WriteFieldHeader(fieldIdDelta, expectedType, value.GetType(), WireType.TagDelimited);

if (value.Count > 0)
{
UInt32Codec.WriteField(ref writer, 0, (uint)value.Count);
uint innerFieldIdDelta = 1;
foreach (var element in value)
{
_fieldCodec.WriteField(ref writer, innerFieldIdDelta, CodecElementType, element);
innerFieldIdDelta = 0;
}
}

writer.WriteEndObject();
}

/// <inheritdoc/>
public RepeatedField<T> ReadValue<TInput>(ref Reader<TInput> reader, Field field)
{
if (field.WireType == WireType.Reference)
{
return ReferenceCodec.ReadReference<RepeatedField<T>, TInput>(ref reader, field);
}

field.EnsureWireTypeTagDelimited();

var placeholderReferenceId = ReferenceCodec.CreateRecordPlaceholder(reader.Session);
RepeatedField<T> result = null;
uint fieldId = 0;
while (true)
{
var header = reader.ReadFieldHeader();
if (header.IsEndBaseOrEndObject)
{
break;
}

fieldId += header.FieldIdDelta;
switch (fieldId)
{
case 0:
var length = (int)UInt32Codec.ReadValue(ref reader, header);
if (length > 10240 && length > reader.Length)
{
ThrowInvalidSizeException(length);
}

result = new RepeatedField<T>{ Capacity = length };
ReferenceCodec.RecordObject(reader.Session, result, placeholderReferenceId);
break;
case 1:
if (result is null)
{
ThrowLengthFieldMissing();
}

result.Add(_fieldCodec.ReadValue(ref reader, header));
break;
default:
reader.ConsumeUnknownField(header);
break;
}
}

if (result is null)
{
result = new();
ReferenceCodec.RecordObject(reader.Session, result, placeholderReferenceId);
}

return result;
}

private static void ThrowInvalidSizeException(int length) => throw new IndexOutOfRangeException(
$"Declared length of {typeof(RepeatedField<T>)}, {length}, is greater than total length of input.");

private static void ThrowLengthFieldMissing() => throw new RequiredFieldMissingException("Serialized RepeatedField is missing its length field.");
}
Loading

0 comments on commit 9e85ac7

Please sign in to comment.