-
Notifications
You must be signed in to change notification settings - Fork 557
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Using custom schema registry for Kafka component requires custom hostname #3910
Comments
@g7ed6e can you take a look? |
The broker container needs to know which name clients use to reach it in order to add this name to its KAFKA_ADVERTISED_LISTENERS env var. I'm currently having the same issue trying to integrate a KafkaUI container to the Aspire.Hosting.Kafka project.
|
This completely nerd sniped me. I went down the kafka rabbit hole https://www.confluent.io/blog/kafka-listeners-explained/. I recommend reading https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/networking-overview to get a baseline for understanding networking in aspire. What we haven't documented is how endpoint references work between aspire resources using C# object references. e.g.: // Add the kafka broker
var broker = builder.AddContainer("broker", "confluent-local", "7.6.0")
.WithImageRegistry("confluentinc")
.WithEndpoint(name: "primary", targetPort: 9092)
.WithEndpoint(targetPort: 9101);
// Add the schema registry
var schemaRegistry = builder.AddContainer("schema-registry", "cp-schema-registry", "7.6.1")
.WithImageRegistry("confluentinc")
.WithHttpEndpoint(name: "primary", targetPort: 8081);
broker.WithEnvironment(context =>
{
var selfEndpoint = broker.GetEndpoint("primary");
// Set the advertised listeners to the public port
var advertisedListeners = builder.ExecutionContext.IsRunMode
// This is a workaround for https://github.com/dotnet/aspire/issues/3735
? ReferenceExpression.Create($"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://{selfEndpoint.ContainerHost}:{selfEndpoint.Property(EndpointProperty.Port)}")
: ReferenceExpression.Create($"PLAINTEXT://{selfEndpoint.Property(EndpointProperty.Host)}:29092,PLAINTEXT_HOST://{selfEndpoint.Property(EndpointProperty.Host)}:{selfEndpoint.Property(EndpointProperty.Port)}");
context.EnvironmentVariables["KAFKA_ADVERTISED_LISTENERS"] = advertisedListeners;
// Set the URL to the schema registry
context.EnvironmentVariables["KAFKA_SCHEMA_REGISTRY_URL"] = schemaRegistry.GetEndpoint("primary");
});
schemaRegistry.WithEnvironment(context =>
{
// Get the broker endpoint
var brokerEndpoint = broker.GetEndpoint("primary");
// Create a reference to the broker endpoint, doing it this way will ensure that it works in both run and publish mode
var brokerConnectionReference = builder.ExecutionContext.IsRunMode
// This is a workaround for https://github.com/dotnet/aspire/issues/3735
? ReferenceExpression.Create($"{brokerEndpoint.ContainerHost}:{brokerEndpoint.Property(EndpointProperty.Port)}")
: ReferenceExpression.Create($"{brokerEndpoint.Property(EndpointProperty.Host)}:{brokerEndpoint.Property(EndpointProperty.Port)}");
context.EnvironmentVariables["SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS"] = brokerConnectionReference;
context.EnvironmentVariables["SCHEMA_REGISTRY_HOST_NAME"] = "schema-registry";
}); @g7ed6e We should update the advertised listeners setting in our Kafka resource to support multiple host names so that containers and host processes can access the broker server. I am hoping that will just work 😄 . |
@davidfowl Yes. Ideally I think we should use the container name as this should enable cluster with multiple broker nodes scenario too. |
docker networks aren't supported #850. |
Thank you for your answers! Im glad it sparked an interest :) I have looked at the networking guide for Aspire previously, and I graced through the kafka-listeners link and I could not come up with a working solution. |
I updated #3882 with support for both project connection (one producer and one consumer) and container connection (a KafkaUI container) using two different endpoints declared in KafkaServerResource. |
I'm just catching up on this. Does this mean we are going to need a custom |
For the time being yes. Both Kafka and Schema registry containers need to know each other. |
Hey, I found a solution which works (some configuration enhancement to what @davidfowl did already provide) :) Give this a shot: // Add the kafka broker
IResourceBuilder<ContainerResource> broker = builder.AddContainer("broker", "confluent-local")
.WithImageRegistry("confluentinc")
.WithEndpoint(name: "internal", targetPort: 9092) // Endpoint for docker-internal communication (schemaregistry)
.WithEndpoint(name: "primary", targetPort: 9093) // Endpoint for docker-external communication (applications running on localhost)
.WithHttpEndpoint(name: "rest-api", targetPort: 8082) // Confluent REST-API
.WithEndpoint(targetPort: 9101);
// Add the schema registry
IResourceBuilder<ContainerResource> schemaRegistry = builder.AddContainer("schema-registry", "cp-schema-registry")
.WithImageRegistry("confluentinc")
.WithEndpoint(name: "primary", targetPort: 8081);
broker.WithEnvironment(context =>
{
var internalEndpoint = broker.GetEndpoint("internal");
var primaryEndpoint = broker.GetEndpoint("primary");
// Set the advertised listeners to the public port
var advertisedListeners = builder.ExecutionContext.IsRunMode
// This is a workaround for https://github.com/dotnet/aspire/issues/3735
? ReferenceExpression.Create(
$"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://{internalEndpoint.ContainerHost}:{internalEndpoint.Property(EndpointProperty.Port)},PLAINTEXT_LOCALHOST://localhost:{primaryEndpoint.Property(EndpointProperty.Port)}")
: ReferenceExpression.Create(
$"PLAINTEXT://{internalEndpoint.Property(EndpointProperty.Host)}:29092,PLAINTEXT_HOST://{internalEndpoint.Property(EndpointProperty.Host)}:{internalEndpoint.Property(EndpointProperty.Port)},PLAINTEXT_LOCALHOST://localhost:{primaryEndpoint.Property(EndpointProperty.Port)}");
context.EnvironmentVariables["KAFKA_ADVERTISED_LISTENERS"] = advertisedListeners;
context.EnvironmentVariables["KAFKA_LISTENERS"] =
"PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_LOCALHOST://0.0.0.0:9093";
context.EnvironmentVariables["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] =
"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_LOCALHOST:PLAINTEXT";
// Set the URL to the schema registry
context.EnvironmentVariables["KAFKA_SCHEMA_REGISTRY_URL"] = schemaRegistry.GetEndpoint("primary");
});
schemaRegistry.WithEnvironment(context =>
{
// Get the broker endpoint
var brokerEndpoint = broker.GetEndpoint("internal");
// Create a reference to the broker endpoint, doing it this way will ensure that it works in both run and publish mode
var brokerConnectionReference = builder.ExecutionContext.IsRunMode
// This is a workaround for https://github.com/dotnet/aspire/issues/3735
? ReferenceExpression.Create($"{brokerEndpoint.ContainerHost}:{brokerEndpoint.Property(EndpointProperty.Port)}")
: ReferenceExpression.Create(
$"{brokerEndpoint.Property(EndpointProperty.Host)}:{brokerEndpoint.Property(EndpointProperty.Port)}");
context.EnvironmentVariables["SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS"] = brokerConnectionReference;
context.EnvironmentVariables["SCHEMA_REGISTRY_HOST_NAME"] = "schema-registry";
}); This article helped me a lot to understand what's going on :) |
We're making a change to enable a default docker network so container to container communications work. I'm not 100% sure if it solves this problem with the schema registry, but once the change is in, it will be worth trying again at this issue. |
@g7ed6e When you get a chance, can you see if you can make this work with the new changes in 9.0 to support a default container network? |
Tbh I don't fully understand the conditional ReferenceExpression statements so I simplified it and defined ports upfront. var publicPlainTextHostPort = FindFreePort();
var publicPlainTextInternalPort = FindFreePort();
var publicSchemaRegistryPort = FindFreePort();
var publicRestApiPort = FindFreePort();
var publicKafkaUIPort = FindFreePort();
var publicAKHQPort = FindFreePort();
var kafka = builder.AddContainer("kafka", "confluent-local", "7.7.1")
.WithImageRegistry("confluentinc")
.WithEndpoint(name: "primary", targetPort: 9092, port: publicPlainTextHostPort)
.WithEndpoint(name: "internal", targetPort: 9093, port: publicPlainTextInternalPort)
.WithHttpEndpoint(name: "restapi", targetPort: 8082, port: publicRestApiPort)
.WithEnvironment("KAFKA_ADVERTISED_LISTENERS", $"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:{publicPlainTextHostPort},PLAINTEXT_INTERNAL://host.docker.internal:{publicPlainTextInternalPort}".ToString())
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT")
.WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093");
builder.AddContainer("schema-registry", "cp-schema-registry", "7.7.1")
.WithImageRegistry("confluentinc")
.WithHttpEndpoint(targetPort: 8081, port: publicSchemaRegistryPort)
.WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", "localhost")
.WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", $"host.docker.internal:{publicPlainTextInternalPort}".ToString())
.WithEnvironment("SCHEMA_REGISTRY_LISTENERS", $"http://0.0.0.0:8081");
builder.AddContainer("kafka-ui", "kafka-ui", "v0.7.2")
.WithImageRegistry("provectuslabs")
.WithHttpEndpoint(targetPort: 8080, port: publicKafkaUIPort)
.WithEnvironment("KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS", $"host.docker.internal:{publicPlainTextInternalPort}".ToString())
.WithEnvironment("KAFKA_CLUSTERS_0_NAME", "kafka")
.WithEnvironment("LOGGING_LEVEL_ROOT", "debug");
var akhqConfiguration = $@"akhq:
connections:
docker-kafka-server:
properties:
bootstrap.servers: ""host.docker.internal:{publicPlainTextInternalPort}""
schema-registry:
url: ""http://host.docker.internal:{publicSchemaRegistryPort}""";
builder.AddContainer("akhq", "akhq", "dev")
.WithImageRegistry("tchiotludo")
.WithHttpEndpoint(targetPort: 8080, port: publicAKHQPort)
.WithEnvironment("AKHQ_CONFIGURATION", akhqConfiguration);
static int FindFreePort()
{
TcpListener? l = null;
try
{
l = new TcpListener(IPAddress.Loopback, 0);
l.Start();
return ((IPEndPoint)l.LocalEndpoint).Port;
}
catch (Exception)
{
// ignore
}
finally
{
l?.Stop();
l?.Dispose();
}
throw new InvalidOperationException("Unable to find free port");
} I use a very similar setup for integration tests with See: Testcontainers sample public async Task StartAsync(string sessionName, CancellationToken cancellationToken, bool withAKHQ = false, bool withKafkaUI = false)
{
var publicPlainTextHostPort = FindFreePort();
var publicPlainTextInternalPort = FindFreePort();
var kafkaContainer = new ContainerBuilder()
.WithImage("confluentinc/confluent-local:7.7.1")
.WithName($"confluent-local-{sessionName}")
.WithPortBinding(publicPlainTextHostPort, 9092)
.WithPortBinding(publicPlainTextInternalPort, 9093)
.WithEnvironment("KAFKA_ADVERTISED_LISTENERS", $"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:{publicPlainTextHostPort},PLAINTEXT_INTERNAL://host.docker.internal:{publicPlainTextInternalPort}")
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT")
.WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093")
.Build();
_containers.Add(kafkaContainer);
await kafkaContainer.StartAsync(cancellationToken);
KafkaConnectionString = new UriBuilder("PLAINTEXT", kafkaContainer.Hostname, kafkaContainer.GetMappedPublicPort(9092)).ToString();
var schemaRegistryContainer = new ContainerBuilder()
.WithImage("confluentinc/cp-schema-registry:7.7.1")
.WithName($"cp-schema-registry-{sessionName}")
.DependsOn(kafkaContainer)
.WithPortBinding(8081, true)
.WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", "localhost")
.WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", $"host.docker.internal:{kafkaContainer.GetMappedPublicPort(9093)}")
.WithEnvironment("SCHEMA_REGISTRY_LISTENERS", $"http://0.0.0.0:8081")
.Build();
_containers.Add(schemaRegistryContainer);
await schemaRegistryContainer.StartAsync(cancellationToken);
if (withAKHQ)
{
var akhqConfiguration = $@"akhq:
connections:
docker-kafka-server:
properties:
bootstrap.servers: ""host.docker.internal:{kafkaContainer.GetMappedPublicPort(9093)}""
schema-registry:
url: ""http://host.docker.internal:{schemaRegistryContainer.GetMappedPublicPort(8081)}""";
var akhqContainer = new ContainerBuilder()
.WithImage("tchiotludo/akhq:dev")
.WithName($"akhq-{sessionName}")
.WithEnvironment("AKHQ_CONFIGURATION", akhqConfiguration)
.WithPortBinding(8080, true)
.Build();
_containers.Add(akhqContainer);
await akhqContainer.StartAsync(cancellationToken);
}
if (withKafkaUI)
{
var kafkaUiContainer = new ContainerBuilder()
.WithImage("provectuslabs/kafka-ui:v0.7.2")
.WithName($"kafka-ui-{sessionName}")
.WithPortBinding(8080, true)
.WithEnvironment("KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS", $"host.docker.internal:{kafkaContainer.GetMappedPublicPort(9093)}")
.WithEnvironment("KAFKA_CLUSTERS_0_NAME", "kafka")
.WithEnvironment("LOGGING_LEVEL_ROOT", "debug")
.Build();
_containers.Add(kafkaUiContainer);
await kafkaUiContainer.StartAsync(cancellationToken);
}
} |
A couple of thoughts:
|
The version from @t03apt worked fine until I was running on our gated checkins in our azure pipelines.
|
@g7ed6e - can you verify if this is still an issue? Or can this be closed? |
Visual studio enterprise: Version 17.10.0 Preview 4.0
Running Aspire version: 8.0.0-preview.6.24178.5
Trying to add a schema registry using a new resource according to this docker compose:
The only way I can get the schema registry to find the kafka server (also created a stackoverflow ticket about this) is to use the hostname of the broker in the schema registry variable "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS" along with the variable
"KAFKA_ADVERTISED_LISTENERS" = "PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092" (in the kafka component this is set to localhost not broker).
Please advice. Getting the schema registry up and running would be a major piece in our puzzle.
The text was updated successfully, but these errors were encountered: