Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Create topic with additional configuration if present
Browse files Browse the repository at this point in the history
robertcoltheart committed Nov 6, 2024

Unverified

This user has not yet uploaded their public signing key.
1 parent e121a5b commit fb3b005
Showing 4 changed files with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -77,9 +77,11 @@ public interface IClusterConfigurationBuilder
/// <param name="topicName">The topic name</param>
/// <param name="numberOfPartitions">The number of Topic partitions. Default is to use the cluster-defined partitions.</param>
/// <param name="replicationFactor">The Topic replication factor. Default is to use the cluster-defined replication factor.</param>
/// <param name="configs">Additional topic creation configuration values.</param>
/// <returns></returns>
IClusterConfigurationBuilder CreateTopicIfNotExists(
string topicName,
int numberOfPartitions = -1,
short replicationFactor = -1);
short replicationFactor = -1,
Dictionary<string, string> configs = null);
}
1 change: 1 addition & 0 deletions src/KafkaFlow/Clusters/ClusterManager.cs
Original file line number Diff line number Diff line change
@@ -122,6 +122,7 @@ public async Task CreateIfNotExistsAsync(IEnumerable<TopicConfiguration> configu
Name = topicConfiguration.Name,
ReplicationFactor = topicConfiguration.Replicas,
NumPartitions = topicConfiguration.Partitions,
Configs = topicConfiguration.Configs,
})
.ToArray();

5 changes: 3 additions & 2 deletions src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs
Original file line number Diff line number Diff line change
@@ -111,9 +111,10 @@ public IClusterConfigurationBuilder OnStarted(Action<IDependencyResolver> handle
public IClusterConfigurationBuilder CreateTopicIfNotExists(
string topicName,
int numberOfPartitions = -1,
short replicationFactor = -1)
short replicationFactor = -1,
Dictionary<string, string> configs = null)
{
_topicsToCreateIfNotExist.Add(new TopicConfiguration(topicName, numberOfPartitions, replicationFactor));
_topicsToCreateIfNotExist.Add(new TopicConfiguration(topicName, numberOfPartitions, replicationFactor, configs));
return this;
}
}
11 changes: 10 additions & 1 deletion src/KafkaFlow/Configuration/TopicConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Collections.Generic;

namespace KafkaFlow.Configuration;

/// <summary>
@@ -11,11 +13,13 @@ public class TopicConfiguration
/// <param name="name">The topic name</param>
/// <param name="partitions">The number of partitions for the topic</param>
/// <param name="replicas">Replication factor for the topic</param>
public TopicConfiguration(string name, int partitions, short replicas)
/// <param name="configs">Additional topic creation configuration values.</param>
public TopicConfiguration(string name, int partitions, short replicas, Dictionary<string, string> configs)
{
this.Name = name;
this.Partitions = partitions;
this.Replicas = replicas;
this.Configs = configs;
}

/// <summary>
@@ -32,4 +36,9 @@ public TopicConfiguration(string name, int partitions, short replicas)
/// Gets the Topic Replication Factor
/// </summary>
public short Replicas { get; }

/// <summary>
/// Gets the topic creation configuration
/// </summary>
public Dictionary<string, string> Configs { get; }
}

0 comments on commit fb3b005

Please sign in to comment.