From c0230054d98253676402c53bca1db6a91dd020d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Feri=C4=87?= Date: Tue, 16 May 2023 13:20:46 +0200 Subject: [PATCH] Initial commit --- .gitignore | 258 ++++++++++++++++++ .../KafkaCloudEventsConsumer.csproj | 19 ++ KafkaCloudEventsConsumer/Program.cs | 169 ++++++++++++ .../KafkaCloudEventsProducer.csproj | 19 ++ KafkaCloudEventsProducer/Program.cs | 102 +++++++ KafkaDtos/KafkaDtos.csproj | 9 + KafkaDtos/User.cs | 7 + .../KafkaJsonEventsConsumer.csproj | 19 ++ KafkaJsonEventsConsumer/Program.cs | 162 +++++++++++ .../KafkaJsonEventsProducer.csproj | 19 ++ KafkaJsonEventsProducer/Program.cs | 85 ++++++ KafkaPlayground.sln | 49 ++++ docker-compose.yml | 47 ++++ 13 files changed, 964 insertions(+) create mode 100644 .gitignore create mode 100644 KafkaCloudEventsConsumer/KafkaCloudEventsConsumer.csproj create mode 100644 KafkaCloudEventsConsumer/Program.cs create mode 100644 KafkaCloudEventsProducer/KafkaCloudEventsProducer.csproj create mode 100644 KafkaCloudEventsProducer/Program.cs create mode 100644 KafkaDtos/KafkaDtos.csproj create mode 100644 KafkaDtos/User.cs create mode 100644 KafkaJsonEventsConsumer/KafkaJsonEventsConsumer.csproj create mode 100644 KafkaJsonEventsConsumer/Program.cs create mode 100644 KafkaJsonEventsProducer/KafkaJsonEventsProducer.csproj create mode 100644 KafkaJsonEventsProducer/Program.cs create mode 100644 KafkaPlayground.sln create mode 100644 docker-compose.yml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5ff6257 --- /dev/null +++ b/.gitignore @@ -0,0 +1,258 @@ +## Ignore Visual Studio temporary files, build results, and +## files generated by popular Visual Studio add-ons. + +# User-specific files +*.suo +*.user +*.userosscache +*.sln.docstates + +# User-specific files (MonoDevelop/Xamarin Studio) +*.userprefs + +# Build results +[Dd]ebug/ +[Dd]ebugPublic/ +[Rr]elease/ +[Rr]eleases/ +x64/ +x86/ +build/ +bld/ +[Bb]in/ +[Oo]bj/ + +# Visual Studio 2015 cache/options directory +.vs/ +# Uncomment if you have tasks that create the project's static files in wwwroot +#wwwroot/ + +# Rider directory +.idea + +# MSTest test Results +[Tt]est[Rr]esult*/ +[Bb]uild[Ll]og.* + +# NUNIT +*.VisualState.xml +TestResult.xml + +# Build Results of an ATL Project +[Dd]ebugPS/ +[Rr]eleasePS/ +dlldata.c + +# DNX +project.lock.json +artifacts/ + +*_i.c +*_p.c +*_i.h +*.ilk +*.meta +*.obj +*.pch +*.pdb +*.pgc +*.pgd +*.rsp +*.sbr +*.tlb +*.tli +*.tlh +*.tmp +*.tmp_proj +*.log +*.vspscc +*.vssscc +.builds +*.pidb +*.svclog +*.scc + +# Chutzpah Test files +_Chutzpah* + +# Visual C++ cache files +ipch/ +*.aps +*.ncb +*.opendb +*.opensdf +*.sdf +*.cachefile + +# Visual Studio macOS +..\\packages + +# Visual Studio profiler +*.psess +*.vsp +*.vspx +*.sap + +# TFS 2012 Local Workspace +$tf/ + +# Guidance Automation Toolkit +*.gpState + +# ReSharper is a .NET coding add-in +_ReSharper*/ +*.[Rr]e[Ss]harper +*.DotSettings.user + +# JustCode is a .NET coding add-in +.JustCode + +# TeamCity is a build add-in +_TeamCity* + +# DotCover is a Code Coverage Tool +*.dotCover + +# NCrunch +_NCrunch_* +.*crunch*.local.xml +nCrunchTemp_* +*.ncrunchsolution.user + +# MightyMoose +*.mm.* +AutoTest.Net/ + +# Web workbench (sass) +.sass-cache/ + +# Installshield output folder +[Ee]xpress/ + +# DocProject is a documentation generator add-in +DocProject/buildhelp/ +DocProject/Help/*.HxT +DocProject/Help/*.HxC +DocProject/Help/*.hhc +DocProject/Help/*.hhk +DocProject/Help/*.hhp +DocProject/Help/Html2 +DocProject/Help/html + +# Click-Once 
directory
+publish/
+
+# Publish Web Output
+*.[Pp]ublish.xml
+*.azurePubxml
+# TODO: Comment the next line if you want to checkin your web deploy settings
+# but database connection strings (with potential passwords) will be unencrypted
+*.pubxml
+*.publishproj
+
+# NuGet Packages
+*.nupkg
+# The packages folder can be ignored because of Package Restore
+**/packages/*
+# except build/, which is used as an MSBuild target.
+!**/packages/build/
+# Uncomment if necessary however generally it will be regenerated when needed
+#!**/packages/repositories.config
+
+# Windows Azure Build Output
+csx/
+*.build.csdef
+
+# Windows Azure Emulator
+ecf/
+rcf/
+
+# Windows Store app package directory
+AppPackages/
+BundleArtifacts/
+
+# Visual Studio cache files
+# files ending in .cache can be ignored
+*.[Cc]ache
+# but keep track of directories ending in .cache
+!*.[Cc]ache/
+
+# Others
+ClientBin/
+[Ss]tyle[Cc]op.*
+~$*
+*~
+*.dbmdl
+*.dbproj.schemaview
+*.pfx
+*.publishsettings
+node_modules/
+orleans.codegen.cs
+
+# RIA/Silverlight projects
+Generated_Code/
+
+# Backup & report files from converting an old project file
+# to a newer Visual Studio version. Backup files are not needed,
+# because we have git ;-)
+_UpgradeReport_Files/
+Backup*/
+UpgradeLog*.XML
+UpgradeLog*.htm
+
+# SQL Server files
+*.mdf
+*.ldf
+
+# Business Intelligence projects
+*.rdl.data
+*.bim.layout
+*.bim_*.settings
+
+# Microsoft Fakes
+FakesAssemblies/
+
+# GhostDoc plugin setting file
+*.GhostDoc.xml
+
+# Node.js Tools for Visual Studio
+.ntvs_analysis.dat
+
+# Visual Studio 6 build log
+*.plg
+
+# Visual Studio 6 workspace options file
+*.opt
+
+# Visual Studio LightSwitch build output
+**/*.HTMLClient/GeneratedArtifacts
+**/*.DesktopClient/GeneratedArtifacts
+**/*.DesktopClient/ModelManifest.xml
+**/*.Server/GeneratedArtifacts
+**/*.Server/ModelManifest.xml
+_Pvt_Extensions
+
+# Paket dependency manager
+.paket/paket.exe
+
+# FAKE - F# Make
+.fake/
+/Orions.Sync/Orions.Sync.args.json
+/Fantasy.Sync/Fantasy.Sync.args.json
+/Desi.Sync/Desi.Sync.args.json
+/Basketball.Sync/Basketball.Sync.args.json
+/Baseball.Sync/Baseball.Sync.args.json
+/WebPublish
+/Output
+
+# Allow test .pfx file
+!synergy-app-dev.pfx
+/Synergy.Auth/web.config
+/Synergy.Api.Editor.External/web.config
+
+# Healthcheck files
+**/unhealthy
+
+# Nuke
+.output/
+.DS_Store
diff --git a/KafkaCloudEventsConsumer/KafkaCloudEventsConsumer.csproj b/KafkaCloudEventsConsumer/KafkaCloudEventsConsumer.csproj
new file mode 100644
index 0000000..99c4753
--- /dev/null
+++ b/KafkaCloudEventsConsumer/KafkaCloudEventsConsumer.csproj
@@ -0,0 +1,19 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net6.0</TargetFramework>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="CloudNative.CloudEvents.Kafka" />
+    <PackageReference Include="CloudNative.CloudEvents.SystemTextJson" />
+    <PackageReference Include="Confluent.Kafka" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\KafkaDtos\KafkaDtos.csproj" />
+  </ItemGroup>
+
+</Project>
diff --git a/KafkaCloudEventsConsumer/Program.cs b/KafkaCloudEventsConsumer/Program.cs
new file mode 100644
index 0000000..dd782e1
--- /dev/null
+++ b/KafkaCloudEventsConsumer/Program.cs
@@ -0,0 +1,169 @@
+using CloudNative.CloudEvents.Kafka;
+using CloudNative.CloudEvents.SystemTextJson;
+using Confluent.Kafka;
+using Confluent.Kafka.Admin;
+using KafkaDtos;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace KafkaCloudEventsConsumer;
+
+public class Program
+{
+    private const string Topic = "users";
+    private const string BootstrapServers = "localhost:9092,localhost:9093";
+
+    public static async Task Main()
+    {
+        await CreateKafkaTopic();
+
+        var cts = new CancellationTokenSource();
+        var formatter = new JsonEventFormatter<User>(SerializationOptions, new JsonDocumentOptions());
+
+        var consumer = Task.Run(() => StartConsumer(formatter, cts.Token));
+
+        Console.ReadKey();
+        cts.Cancel();
+        await consumer;
+    }
+
+    private static void StartConsumer(JsonEventFormatter<User> formatter, CancellationToken ct)
+    {
+        var consumerConfig = new ConsumerConfig
+        {
+            BootstrapServers = BootstrapServers,
+            PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
+            GroupId = "cgid",
+            // Offsets are committed in the background, but only the ones explicitly
+            // stored via StoreOffset after a message has been processed.
+            EnableAutoCommit = true,
+            EnableAutoOffsetStore = false
+        };
+        List<ConsumeResult<string, byte[]>> results = new();
+        List<TopicPartition> lostPartitions = new();
+
+        using var consumer = new ConsumerBuilder<string, byte[]>(consumerConfig)
+            .SetPartitionsRevokedHandler((c, topicPartitions) =>
+            {
+                Console.WriteLine($"Partitions revoked! Consumer {c.Name}: {string.Join(", ", c.Assignment.Select(a => $"({a.Topic}, {a.Partition})"))}\r\nTopicPartitions: {string.Join(", ", topicPartitions.Select(a => $"({a.Topic}, {a.Partition})"))}");
+
+                try
+                {
+                    // Store the latest processed offset for every partition that is about
+                    // to be revoked, so the commit triggered by the rebalance can flush it
+                    // before the partition is handed to another consumer.
+                    foreach (var topicPartition in c.Assignment)
+                    {
+                        if (topicPartitions.Any(tp => topicPartition.Topic == tp.Topic && topicPartition.Partition == tp.Partition))
+                        {
+                            lostPartitions.Add(topicPartition);
+                            Console.WriteLine($"Committing ({topicPartition.Topic}, {topicPartition.Partition})");
+                            var result = results.LastOrDefault(r => r.Topic == topicPartition.Topic && r.Partition == topicPartition.Partition);
+                            if (result != null)
+                            {
+                                c.StoreOffset(result);
+                                Console.WriteLine("Committed");
+                            }
+                            else
+                            {
+                                Console.WriteLine("Nothing to commit");
+                            }
+                        }
+                    }
+                }
+                catch (Exception ex)
+                {
+                    Console.WriteLine($"Exception when committing:\n{ex}");
+                }
+            })
+            .Build();
+        consumer.Subscribe(new[] { Topic });
+
+        int batchNumber = 0;
+
+        while (!ct.IsCancellationRequested)
+        {
+            try
+            {
+                var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(100));
+                if (consumeResult is null) continue;
+
+                // batchNumber == 0 marks the start of a new batch of ten messages.
+                if (batchNumber == 0)
+                {
+                    results = new() { consumeResult };
+                    lostPartitions = new();
+                }
+                else
+                {
+                    results.Add(consumeResult);
+                }
+
+                batchNumber = (batchNumber + 1) % 10;
+
+                var consumedMessage = consumeResult.Message;
+                var cloudEventMessage = consumedMessage.ToCloudEvent(formatter);
+                var data = (User)cloudEventMessage.Data;
+
+                var partition = consumeResult.Partition;
+                var key = consumeResult.Message.Key;
+                var offset = consumeResult.Offset;
+                var dataJson = JsonSerializer.Serialize(data, SerializationOptions);
+
+                Console.WriteLine($"Partition: {partition} Key: {key} Offset: {offset} Data: {dataJson}");
+
+                // At the end of each batch, store the highest consumed offset per
+                // partition, skipping partitions that were revoked mid-batch (their
+                // offsets were already stored by the revocation handler).
+                if (batchNumber == 0)
+                {
+                    var filteredResults = results.Where(r => !lostPartitions.Any(tp => r.Topic == tp.Topic && r.Partition == tp.Partition)).ToList();
+
+                    var resultsWithLatestOffsetPerPartition = filteredResults.GroupBy(x => x.Partition.Value)
+                        .Select(group => group.MaxBy(y => y.Offset.Value))
+                        .Where(x => x != null);
+                    foreach (var result in resultsWithLatestOffsetPerPartition)
+                    {
+                        Console.WriteLine($"Storing offset for topic {result.Topic}, partition {result.Partition}");
+                        consumer.StoreOffset(result);
+                    }
+                }
+            }
+            catch (Exception ex)
+            {
+                Console.WriteLine($"Exception when consuming:\n{ex}");
+            }
+        }
+    }
+
+    private static JsonSerializerOptions SerializationOptions => new()
+    {
+        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
+    };
+
+    private static async Task CreateKafkaTopic()
+    {
+        var config = new AdminClientConfig
+        {
+            BootstrapServers = BootstrapServers
+        };
+
+        var builder = new AdminClientBuilder(config);
+        var client = builder.Build();
+        try
+        {
+            await client.CreateTopicsAsync(new List<TopicSpecification>
+            {
+                new()
+                {
+                    Name = Topic,
+                    ReplicationFactor = 1,
+                    NumPartitions = 2
+                }
+            });
+        }
+        catch (CreateTopicsException e) when (e.Results.All(r => r.Error.Code == ErrorCode.TopicAlreadyExists))
+        {
+            // The topic already exists, which is fine here.
+        }
+        finally
+        {
+            client.Dispose();
+        }
+    }
+}
\ No newline at end of file
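
Note on the offset handling above: EnableAutoCommit=true combined with EnableAutoOffsetStore=false means the client's background commit only ever flushes offsets that the code explicitly stored after processing, which is what makes the per-batch StoreOffset calls at-least-once. Stripped of the batching and rebalance bookkeeping, the pattern reduces to the following (a minimal sketch, not part of the commit; assumes the Confluent.Kafka package and a broker on localhost:9092):

    using Confluent.Kafka;
    using System;

    var config = new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "cgid",
        EnableAutoCommit = true,       // background commit of *stored* offsets
        EnableAutoOffsetStore = false  // nothing is stored until we say so
    };

    using var consumer = new ConsumerBuilder<string, byte[]>(config).Build();
    consumer.Subscribe("users");

    while (true)
    {
        var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
        if (result is null) continue;

        Console.WriteLine(result.TopicPartitionOffset); // process first...
        consumer.StoreOffset(result);                   // ...then mark as committable
    }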
diff --git a/KafkaCloudEventsProducer/KafkaCloudEventsProducer.csproj b/KafkaCloudEventsProducer/KafkaCloudEventsProducer.csproj
new file mode 100644
index 0000000..29e4981
--- /dev/null
+++ b/KafkaCloudEventsProducer/KafkaCloudEventsProducer.csproj
@@ -0,0 +1,19 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net6.0</TargetFramework>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="CloudNative.CloudEvents.Kafka" />
+    <PackageReference Include="CloudNative.CloudEvents.SystemTextJson" />
+    <PackageReference Include="Confluent.Kafka" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\KafkaDtos\KafkaDtos.csproj" />
+  </ItemGroup>
+
+</Project>
diff --git a/KafkaCloudEventsProducer/Program.cs b/KafkaCloudEventsProducer/Program.cs
new file mode 100644
index 0000000..9c6dcfb
--- /dev/null
+++ b/KafkaCloudEventsProducer/Program.cs
@@ -0,0 +1,102 @@
+using CloudNative.CloudEvents;
+using CloudNative.CloudEvents.Extensions;
+using CloudNative.CloudEvents.Kafka;
+using CloudNative.CloudEvents.SystemTextJson;
+using Confluent.Kafka;
+using Confluent.Kafka.Admin;
+using KafkaDtos;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace KafkaCloudEventsProducer;
+
+public class Program
+{
+    private const string Topic = "users";
+    private const string BootstrapServers = "localhost:9092,localhost:9093";
+
+    public static async Task Main()
+    {
+        await CreateKafkaTopic();
+
+        var cts = new CancellationTokenSource();
+        var formatter = new JsonEventFormatter<User>(SerializationOptions, new JsonDocumentOptions());
+
+        var producer = Task.Run(() => StartProducer(formatter, cts.Token));
+
+        Console.ReadKey();
+        cts.Cancel();
+        await producer;
+    }
+
+    private static async Task StartProducer(JsonEventFormatter<User> formatter, CancellationToken ct)
+    {
+        var producerConfig = new ProducerConfig { BootstrapServers = BootstrapServers };
+        using var producer = new ProducerBuilder<string, byte[]>(producerConfig).Build();
+
+        var i = 1;
+        while (!ct.IsCancellationRequested)
+        {
+            var userId = $"UserId_{i}";
+            var cloudEvent = new CloudEvent
+            {
+                Id = Guid.NewGuid().ToString(),
+                Type = "event-type",
+                Source = new Uri("https://cloudevents.io/"),
+                Time = DateTimeOffset.UtcNow,
+                // Describes the payload in Data; the structured-mode envelope itself
+                // is labelled application/cloudevents+json by the formatter.
+                DataContentType = "application/json",
+                Data = new User
+                {
+                    UserId = userId,
+                    Name = $"Name_{i}",
+                }
+            };
+            // The partition key extension becomes the Kafka message key, so all
+            // events for one user land in the same partition.
+            cloudEvent.SetPartitionKey(userId);
+            var kafkaMessage = cloudEvent.ToKafkaMessage(ContentMode.Structured, formatter);
+            await producer.ProduceAsync(Topic, kafkaMessage);
+
+            i++;
+
+            await Task.Delay(TimeSpan.FromSeconds(1));
+        }
+    }
+
+    private static JsonSerializerOptions SerializationOptions => new()
+    {
+        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
+    };
+
+    private static async Task CreateKafkaTopic()
+    {
+        var config = new AdminClientConfig
+        {
+            BootstrapServers = BootstrapServers
+        };
+
+        var builder = new AdminClientBuilder(config);
+        var client = builder.Build();
+        try
+        {
+            await client.CreateTopicsAsync(new List<TopicSpecification>
+            {
+                new()
+                {
+                    Name = Topic,
+                    ReplicationFactor = 1,
+                    NumPartitions = 2
+                }
+            });
+        }
+        catch (CreateTopicsException e) when (e.Results.All(r => r.Error.Code == ErrorCode.TopicAlreadyExists))
+        {
+            // The topic already exists, which is fine here.
+        }
+        finally
+        {
+            client.Dispose();
+        }
+    }
+}
\ No newline at end of file
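
For orientation: with ContentMode.Structured the entire event, not just the User payload, travels as the Kafka message value, and the partition key set above doubles as the message key. The serialized value would look roughly like this (illustrative values, formatted for readability):

    {
      "specversion": "1.0",
      "id": "<guid>",
      "type": "event-type",
      "source": "https://cloudevents.io/",
      "time": "2023-05-16T11:20:46Z",
      "datacontenttype": "application/json",
      "partitionkey": "UserId_1",
      "data": {
        "userId": "UserId_1",
        "name": "Name_1"
      }
    }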
diff --git a/KafkaDtos/KafkaDtos.csproj b/KafkaDtos/KafkaDtos.csproj
new file mode 100644
index 0000000..132c02c
--- /dev/null
+++ b/KafkaDtos/KafkaDtos.csproj
@@ -0,0 +1,9 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <TargetFramework>net6.0</TargetFramework>
+    <ImplicitUsings>enable</ImplicitUsings>
+    <Nullable>enable</Nullable>
+  </PropertyGroup>
+
+</Project>
diff --git a/KafkaDtos/User.cs b/KafkaDtos/User.cs
new file mode 100644
index 0000000..cb9c897
--- /dev/null
+++ b/KafkaDtos/User.cs
@@ -0,0 +1,7 @@
+namespace KafkaDtos;
+
+public class User
+{
+    public string UserId { get; set; }
+    public string Name { get; set; }
+}
\ No newline at end of file
diff --git a/KafkaJsonEventsConsumer/KafkaJsonEventsConsumer.csproj b/KafkaJsonEventsConsumer/KafkaJsonEventsConsumer.csproj
new file mode 100644
index 0000000..99c4753
--- /dev/null
+++ b/KafkaJsonEventsConsumer/KafkaJsonEventsConsumer.csproj
@@ -0,0 +1,19 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net6.0</TargetFramework>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="CloudNative.CloudEvents.Kafka" />
+    <PackageReference Include="CloudNative.CloudEvents.SystemTextJson" />
+    <PackageReference Include="Confluent.Kafka" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\KafkaDtos\KafkaDtos.csproj" />
+  </ItemGroup>
+
+</Project>
diff --git a/KafkaJsonEventsConsumer/Program.cs b/KafkaJsonEventsConsumer/Program.cs
new file mode 100644
index 0000000..7789caf
--- /dev/null
+++ b/KafkaJsonEventsConsumer/Program.cs
@@ -0,0 +1,162 @@
+using Confluent.Kafka;
+using Confluent.Kafka.Admin;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace KafkaJsonEventsConsumer;
+
+public class Program
+{
+    private const string Topic = "users_json";
+    private const string BootstrapServers = "localhost:9092,localhost:9093";
+
+    public static async Task Main()
+    {
+        await CreateKafkaTopic();
+
+        var cts = new CancellationTokenSource();
+
+        var consumer = Task.Run(() => StartConsumer(cts.Token));
+
+        Console.ReadKey();
+        cts.Cancel();
+        await consumer;
+    }
+
+    private static void StartConsumer(CancellationToken ct)
+    {
+        var consumerConfig = new ConsumerConfig
+        {
+            BootstrapServers = BootstrapServers,
+            PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
+            GroupId = "cgid",
+            // Only offsets explicitly stored via StoreOffset get auto-committed.
+            EnableAutoCommit = true,
+            EnableAutoOffsetStore = false
+        };
+        List<ConsumeResult<string, string>> results = new();
+        List<TopicPartition> lostPartitions = new();
+
+        using var consumer = new ConsumerBuilder<string, string>(consumerConfig)
+            .SetPartitionsRevokedHandler((c, topicPartitions) =>
+            {
+                Console.WriteLine($"Partitions revoked! Consumer {c.Name}: {string.Join(", ", c.Assignment.Select(a => $"({a.Topic}, {a.Partition})"))}\r\nTopicPartitions: {string.Join(", ", topicPartitions.Select(a => $"({a.Topic}, {a.Partition})"))}");
+
+                try
+                {
+                    // Same bookkeeping as the CloudEvents consumer: store the latest
+                    // processed offsets for revoked partitions before losing them.
+                    foreach (var topicPartition in c.Assignment)
+                    {
+                        if (topicPartitions.Any(tp => topicPartition.Topic == tp.Topic && topicPartition.Partition == tp.Partition))
+                        {
+                            lostPartitions.Add(topicPartition);
+                            Console.WriteLine($"Committing ({topicPartition.Topic}, {topicPartition.Partition})");
+                            var result = results.LastOrDefault(r => r.Topic == topicPartition.Topic && r.Partition == topicPartition.Partition);
+                            if (result != null)
+                            {
+                                c.StoreOffset(result);
+                                Console.WriteLine("Committed");
+                            }
+                            else
+                            {
+                                Console.WriteLine("Nothing to commit");
+                            }
+                        }
+                    }
+                }
+                catch (Exception ex)
+                {
+                    Console.WriteLine($"Exception when committing:\n{ex}");
+                }
+            })
+            .Build();
+        consumer.Subscribe(new[] { Topic });
+
+        int batchNumber = 0;
+
+        while (!ct.IsCancellationRequested)
+        {
+            try
+            {
+                var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(100));
+                if (consumeResult is null) continue;
+
+                // batchNumber == 0 marks the start of a new batch of ten messages.
+                if (batchNumber == 0)
+                {
+                    results = new() { consumeResult };
+                    lostPartitions = new();
+                }
+                else
+                {
+                    results.Add(consumeResult);
+                }
+
+                batchNumber = (batchNumber + 1) % 10;
+
+                var consumedMessage = consumeResult.Message;
+
+                var partition = consumeResult.Partition;
+                var key = consumeResult.Message.Key;
+                var offset = consumeResult.Offset;
+
+                Console.WriteLine($"Partition: {partition} Key: {key} Offset: {offset} Data: {consumedMessage.Value}");
+
+                // Once per batch, store the highest offset per partition that was not
+                // already handled by the revocation handler.
+                if (batchNumber == 0)
+                {
+                    var filteredResults = results.Where(r => !lostPartitions.Any(tp => r.Topic == tp.Topic && r.Partition == tp.Partition)).ToList();
+
+                    var resultsWithLatestOffsetPerPartition = filteredResults.GroupBy(x => x.Partition.Value)
+                        .Select(group => group.MaxBy(y => y.Offset.Value))
+                        .Where(x => x != null);
+                    foreach (var result in resultsWithLatestOffsetPerPartition)
+                    {
+                        Console.WriteLine($"Storing offset for topic {result.Topic}, partition {result.Partition}");
+                        consumer.StoreOffset(result);
+                    }
+                }
+            }
+            catch (Exception ex)
+            {
+                Console.WriteLine($"Exception when consuming:\n{ex}");
+            }
+        }
+    }
+
+    private static JsonSerializerOptions SerializationOptions => new()
+    {
+        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
+    };
+
+    private static async Task CreateKafkaTopic()
+    {
+        var config = new AdminClientConfig
+        {
+            BootstrapServers = BootstrapServers
+        };
+
+        var builder = new AdminClientBuilder(config);
+        var client = builder.Build();
+        try
+        {
+            await client.CreateTopicsAsync(new List<TopicSpecification>
+            {
+                new()
+                {
+                    Name = Topic,
+                    ReplicationFactor = 1,
+                    NumPartitions = 2
+                }
+            });
+        }
+        catch (CreateTopicsException e) when (e.Results.All(r => r.Error.Code == ErrorCode.TopicAlreadyExists))
+        {
+            // The topic already exists, which is fine here.
+        }
+        finally
+        {
+            client.Dispose();
+        }
+    }
+}
\ No newline at end of file
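
This consumer treats the value as an opaque JSON string. Mapping it back to the shared DTO is a single call; note that the JSON producer below serializes with JsonSerializer defaults (PascalCase property names), so the camelCase SerializationOptions defined above are not actually needed for round-tripping (a minimal sketch, not part of the commit):

    using KafkaDtos;
    using System;
    using System.Text.Json;

    // Shape matches what KafkaJsonEventsProducer writes with default options.
    var json = "{\"UserId\":\"UserId_1\",\"Name\":\"Name_1\"}";
    var user = JsonSerializer.Deserialize<User>(json);
    Console.WriteLine($"{user?.UserId} / {user?.Name}");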
diff --git a/KafkaJsonEventsProducer/KafkaJsonEventsProducer.csproj b/KafkaJsonEventsProducer/KafkaJsonEventsProducer.csproj
new file mode 100644
index 0000000..29e4981
--- /dev/null
+++ b/KafkaJsonEventsProducer/KafkaJsonEventsProducer.csproj
@@ -0,0 +1,19 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net6.0</TargetFramework>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="CloudNative.CloudEvents.Kafka" />
+    <PackageReference Include="CloudNative.CloudEvents.SystemTextJson" />
+    <PackageReference Include="Confluent.Kafka" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\KafkaDtos\KafkaDtos.csproj" />
+  </ItemGroup>
+
+</Project>
diff --git a/KafkaJsonEventsProducer/Program.cs b/KafkaJsonEventsProducer/Program.cs
new file mode 100644
index 0000000..1980da2
--- /dev/null
+++ b/KafkaJsonEventsProducer/Program.cs
@@ -0,0 +1,85 @@
+using Confluent.Kafka;
+using Confluent.Kafka.Admin;
+using KafkaDtos;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace KafkaJsonEventsProducer;
+
+public class Program
+{
+    private const string Topic = "users_json";
+    private const string BootstrapServers = "localhost:9092,localhost:9093";
+
+    public static async Task Main()
+    {
+        await CreateKafkaTopic();
+
+        var cts = new CancellationTokenSource();
+
+        var producer = Task.Run(() => StartProducer(cts.Token));
+
+        Console.ReadKey();
+        cts.Cancel();
+        await producer;
+    }
+
+    private static async Task StartProducer(CancellationToken ct)
+    {
+        var producerConfig = new ProducerConfig { BootstrapServers = BootstrapServers };
+        using var producer = new ProducerBuilder<Null, string>(producerConfig).Build();
+
+        var i = 1;
+        while (!ct.IsCancellationRequested)
+        {
+            var userId = $"UserId_{i}";
+            // No message key is set, so unlike the CloudEvents producer these
+            // messages are spread across partitions rather than pinned per user.
+            var message = new Message<Null, string>
+            {
+                Value = JsonSerializer.Serialize(new User
+                {
+                    UserId = userId,
+                    Name = $"Name_{i}",
+                })
+            };
+            await producer.ProduceAsync(Topic, message);
+
+            i++;
+
+            await Task.Delay(TimeSpan.FromSeconds(1));
+        }
+    }
+
+    private static async Task CreateKafkaTopic()
+    {
+        var config = new AdminClientConfig
+        {
+            BootstrapServers = BootstrapServers
+        };
+
+        var builder = new AdminClientBuilder(config);
+        var client = builder.Build();
+        try
+        {
+            await client.CreateTopicsAsync(new List<TopicSpecification>
+            {
+                new()
+                {
+                    Name = Topic,
+                    ReplicationFactor = 1,
+                    NumPartitions = 2
+                }
+            });
+        }
+        catch (CreateTopicsException e) when (e.Results.All(r => r.Error.Code == ErrorCode.TopicAlreadyExists))
+        {
+            // The topic already exists, which is fine here.
+        }
+        finally
+        {
+            client.Dispose();
+        }
+    }
+}
\ No newline at end of file
diff --git a/KafkaPlayground.sln b/KafkaPlayground.sln
new file mode 100644
index 0000000..60cfb30
--- /dev/null
+++ b/KafkaPlayground.sln
@@ -0,0 +1,49 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 17
+VisualStudioVersion = 17.5.33627.172
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaCloudEventsProducer", "KafkaCloudEventsProducer\KafkaCloudEventsProducer.csproj", "{3DBD85F9-DE23-4943-9F9C-5002179E43BD}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaCloudEventsConsumer", "KafkaCloudEventsConsumer\KafkaCloudEventsConsumer.csproj", "{1DB86CB3-6295-4766-928D-0F67FEE01E8D}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaDtos", "KafkaDtos\KafkaDtos.csproj", "{39A82434-F5F0-43AE-8CC9-556045FABED5}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaJsonEventsConsumer", "KafkaJsonEventsConsumer\KafkaJsonEventsConsumer.csproj", "{F3D32C51-F05A-4D06-A12A-1A186EB81E48}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaJsonEventsProducer", "KafkaJsonEventsProducer\KafkaJsonEventsProducer.csproj", "{3337C5BE-1056-4535-9AE0-63613DC90E94}"
+EndProject
+Global
+	GlobalSection(SolutionConfigurationPlatforms) = preSolution
+		Debug|Any CPU = Debug|Any CPU
+		Release|Any CPU = Release|Any CPU
+	EndGlobalSection
+	GlobalSection(ProjectConfigurationPlatforms) = postSolution
+		{3DBD85F9-DE23-4943-9F9C-5002179E43BD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{3DBD85F9-DE23-4943-9F9C-5002179E43BD}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{3DBD85F9-DE23-4943-9F9C-5002179E43BD}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{3DBD85F9-DE23-4943-9F9C-5002179E43BD}.Release|Any CPU.Build.0 = Release|Any CPU
+		{1DB86CB3-6295-4766-928D-0F67FEE01E8D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{1DB86CB3-6295-4766-928D-0F67FEE01E8D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{1DB86CB3-6295-4766-928D-0F67FEE01E8D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{1DB86CB3-6295-4766-928D-0F67FEE01E8D}.Release|Any CPU.Build.0 = Release|Any CPU
+		{39A82434-F5F0-43AE-8CC9-556045FABED5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{39A82434-F5F0-43AE-8CC9-556045FABED5}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{39A82434-F5F0-43AE-8CC9-556045FABED5}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{39A82434-F5F0-43AE-8CC9-556045FABED5}.Release|Any CPU.Build.0 = Release|Any CPU
+		{F3D32C51-F05A-4D06-A12A-1A186EB81E48}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{F3D32C51-F05A-4D06-A12A-1A186EB81E48}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{F3D32C51-F05A-4D06-A12A-1A186EB81E48}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{F3D32C51-F05A-4D06-A12A-1A186EB81E48}.Release|Any CPU.Build.0 = Release|Any CPU
+		{3337C5BE-1056-4535-9AE0-63613DC90E94}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{3337C5BE-1056-4535-9AE0-63613DC90E94}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{3337C5BE-1056-4535-9AE0-63613DC90E94}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{3337C5BE-1056-4535-9AE0-63613DC90E94}.Release|Any CPU.Build.0 = Release|Any CPU
+	EndGlobalSection
+	GlobalSection(SolutionProperties) = preSolution
+		HideSolutionNode = FALSE
+	EndGlobalSection
+	GlobalSection(ExtensibilityGlobals) = postSolution
+		SolutionGuid = {384EA1B5-E646-45D5-BA54-26CD98576C23}
+	EndGlobalSection
+EndGlobal
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..569ee5e
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,47 @@
+---
+version: '3.8'
+services:
+  zookeeper-1:
+    image: confluentinc/cp-zookeeper:6.1.9
+    ports:
+      - '32181:32181'
+    restart: always
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 32181
+      ZOOKEEPER_TICK_TIME: 2000
+
+
+  kafka-1:
+    image: confluentinc/cp-kafka:6.1.9
+    ports:
+      - '9092:9092'
+    restart: always
+    depends_on:
+      - zookeeper-1
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
+      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
+      KAFKA_NUM_PARTITIONS: 2
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
+
+
+  kafka-2:
+    image: confluentinc/cp-kafka:6.1.9
+    ports:
+      - '9093:9093'
+    restart: always
+    depends_on:
+      - zookeeper-1
+    environment:
+      KAFKA_BROKER_ID: 2
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
+      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
+      KAFKA_NUM_PARTITIONS: 2
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
\ No newline at end of file
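
One way to exercise the playground end to end (assuming Docker and the .NET 6 SDK are installed; the compose file and project names are the ones defined above):

    docker-compose up -d
    dotnet run --project KafkaJsonEventsProducer
    dotnet run --project KafkaJsonEventsConsumer

Run the producer and consumer in separate shells; each application creates its topic on startup. Starting a second copy of a consumer with the same GroupId ("cgid") triggers the cooperative-sticky rebalance that the partitions-revoked handlers log.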