diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs index 20c8fe7f5fda..6a76abec2097 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs @@ -64,6 +64,8 @@ public void Initialize(ExtensionConfigContext context) .AddConverter(ConvertEventDataToString) .AddConverter(ConvertBytes2EventData) .AddConverter(ConvertEventDataToBytes) + .AddConverter(ConvertBinaryDataToEventData) + .AddConverter(ConvertEventDataToBinaryData) .AddOpenConverter(ConvertPocoToEventData); // register our trigger binding provider @@ -114,5 +116,11 @@ private static Task ConvertPocoToEventData(object arg, Attribute attrRes { return Task.FromResult(ConvertStringToEventData(JsonConvert.SerializeObject(arg))); } + + private static EventData ConvertBinaryDataToEventData(BinaryData input) + => new EventData(input); + + private static BinaryData ConvertEventDataToBinaryData(EventData input) + => input.EventBody; } } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs index 392ef1607e79..ceb2b14a822c 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs @@ -24,7 +24,7 @@ namespace Microsoft.Azure.WebJobs.Host.EndToEndTests { [NonParallelizable] [LiveOnly] - public class EventHubEndToEndTests: WebJobsEventHubTestBase + public class EventHubEndToEndTests : WebJobsEventHubTestBase { private static EventWaitHandle _eventWait; private static List _results; @@ -48,7 +48,7 @@ public async Task EventHub_PocoBinding() using (jobHost) { var method = typeof(EventHubTestBindToPocoJobs).GetMethod(nameof(EventHubTestBindToPocoJobs.SendEvent_TestHub), BindingFlags.Static | BindingFlags.Public); - await jobHost.CallAsync(method, new { input = "{ Name: 'foo', Value: '" + _testId +"' }" }); + await jobHost.CallAsync(method, new { input = "{ Name: 'foo', Value: '" + _testId + "' }" }); bool result = _eventWait.WaitOne(Timeout); Assert.True(result); @@ -90,6 +90,26 @@ public async Task EventHub_SingleDispatch() Assert.True(result); } + AssertSingleDispatchLogs(host); + } + + [Test] + public async Task EventHub_SingleDispatch_BinaryData() + { + var (jobHost, host) = BuildHost(); + using (jobHost) + { + await jobHost.CallAsync(nameof(EventHubTestSingleDispatchJobsBinaryData.SendEvent_TestHub), new { input = _testId }); + + bool result = _eventWait.WaitOne(Timeout); + Assert.True(result); + } + + AssertSingleDispatchLogs(host); + } + + private static void AssertSingleDispatchLogs(IHost host) + { IEnumerable logMessages = host.GetTestLoggerProvider() .GetAllLogMessages(); @@ -165,7 +185,7 @@ public async Task EventHub_MultipleDispatch() var (jobHost, host) = BuildHost(); using (jobHost) { - var method = typeof(EventHubTestMultipleDispatchJobs).GetMethod("SendEvents_TestHub", BindingFlags.Static | BindingFlags.Public); + var method = typeof(EventHubTestMultipleDispatchJobs).GetMethod(nameof(EventHubTestMultipleDispatchJobs.SendEvents_TestHub), BindingFlags.Static | BindingFlags.Public); int numEvents = 5; await jobHost.CallAsync(method, new { numEvents = numEvents, input = _testId }); @@ -173,6 +193,27 @@ public async Task EventHub_MultipleDispatch() Assert.True(result); } + AssertMultipleDispatchLogs(host); + } + + [Test] + public async Task EventHub_MultipleDispatch_BinaryData() + { + var (jobHost, host) = BuildHost(); + using (jobHost) + { + int numEvents = 5; + await jobHost.CallAsync(nameof(EventHubTestMultipleDispatchJobsBinaryData.SendEvents_TestHub), new { numEvents = numEvents, input = _testId }); + + bool result = _eventWait.WaitOne(Timeout); + Assert.True(result); + } + + AssertMultipleDispatchLogs(host); + } + + private static void AssertMultipleDispatchLogs(IHost host) + { IEnumerable logMessages = host.GetTestLoggerProvider() .GetAllLogMessages(); @@ -235,6 +276,26 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] string evt, } } + public class EventHubTestSingleDispatchJobsBinaryData + { + public static void SendEvent_TestHub(string input, [EventHub(TestHubName)] out BinaryData evt) + { + evt = new BinaryData(input); + } + + public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] BinaryData evt, + string partitionKey, DateTime enqueuedTimeUtc, IDictionary properties, + IDictionary systemProperties) + { + // filter for the ID the current test is using + if (evt.ToString() == _testId) + { + Assert.True((DateTime.Now - enqueuedTimeUtc).TotalSeconds < 30); + _eventWait.Set(); + } + } + } + public class EventHubTestBindToPocoJobs { public static void SendEvent_TestHub(string input, [EventHub(TestHubName)] out EventData evt) @@ -312,6 +373,39 @@ public static void ProcessMultipleEvents([EventHubTrigger(TestHubName)] string[] } } + public class EventHubTestMultipleDispatchJobsBinaryData + { + private static int s_eventCount; + private static int s_processedEventCount; + public static void SendEvents_TestHub(int numEvents, string input, [EventHub(TestHubName)] out BinaryData[] events) + { + s_eventCount = numEvents; + events = new BinaryData[numEvents]; + for (int i = 0; i < numEvents; i++) + { + events[i] = new BinaryData(input); + } + } + + public static void ProcessMultipleEventsBinaryData([EventHubTrigger(TestHubName)] BinaryData[] events, + string[] partitionKeyArray, DateTime[] enqueuedTimeUtcArray, IDictionary[] propertiesArray, + IDictionary[] systemPropertiesArray) + { + Assert.AreEqual(events.Length, partitionKeyArray.Length); + Assert.AreEqual(events.Length, enqueuedTimeUtcArray.Length); + Assert.AreEqual(events.Length, propertiesArray.Length); + Assert.AreEqual(events.Length, systemPropertiesArray.Length); + + s_processedEventCount += events.Length; + + // filter for the ID the current test is using + if (events[0].ToString() == _testId && s_processedEventCount == s_eventCount) + { + _eventWait.Set(); + } + } + } + public class EventHubPartitionKeyTestJobs { // send more events per partition than the EventHubsOptions.MaxBatchSize