Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using custom schema registry for Kafka component requires custom hostname #3910

Open
ErikBIMobject opened this issue Apr 24, 2024 · 16 comments
Labels
area-integrations Issues pertaining to Aspire Integrations packages kafka Issues related to Kafka integrations
Milestone

Comments

@ErikBIMobject
Copy link

ErikBIMobject commented Apr 24, 2024

Visual studio enterprise: Version 17.10.0 Preview 4.0
Running Aspire version: 8.0.0-preview.6.24178.5

Trying to add a schema registry using a new resource according to this docker compose:

`version: '3.8'
services:

  broker:
    image: confluentinc/confluent-local:7.6.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092
      KAFKA_DEBUG: "true"
      KAFKA_SCHEMA_REGISTRY_URL: http://schema-registry:1223

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

The only way I can get the schema registry to find the kafka server (also created a stackoverflow ticket about this) is to use the hostname of the broker in the schema registry variable "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS" along with the variable
"KAFKA_ADVERTISED_LISTENERS" = "PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092" (in the kafka component this is set to localhost not broker).
Please advice. Getting the schema registry up and running would be a major piece in our puzzle.

@dotnet-issue-labeler dotnet-issue-labeler bot added the area-integrations Issues pertaining to Aspire Integrations packages label Apr 24, 2024
@davidfowl
Copy link
Member

@g7ed6e can you take a look?

@g7ed6e
Copy link
Contributor

g7ed6e commented Apr 24, 2024

The broker container needs to know which name clients use to reach it in order to add this name to its KAFKA_ADVERTISED_LISTENERS env var. I'm currently having the same issue trying to integrate a KafkaUI container to the Aspire.Hosting.Kafka project.
I need to try something similar to what Aspire.Hosting.Mongo is doing to get MongoExpress connect to MongoDb. MongoExpress solves this by using PrimaryEndpoint.ContainerHost and PrimaryEndpoint.Port to build the connection string

context.EnvironmentVariables.Add("ME_CONFIG_MONGODB_URL", $"mongodb://{resource.PrimaryEndpoint.ContainerHost}:{resource.PrimaryEndpoint.Port}/?directConnection=true");
.

@davidfowl
Copy link
Member

davidfowl commented Apr 25, 2024

This completely nerd sniped me. I went down the kafka rabbit hole https://www.confluent.io/blog/kafka-listeners-explained/.

I recommend reading https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/networking-overview to get a baseline for understanding networking in aspire.

What we haven't documented is how endpoint references work between aspire resources using C# object references. e.g.:

// Add the kafka broker
var broker = builder.AddContainer("broker", "confluent-local", "7.6.0")
                    .WithImageRegistry("confluentinc")
                    .WithEndpoint(name: "primary", targetPort: 9092)
                    .WithEndpoint(targetPort: 9101);

// Add the schema registry
var schemaRegistry = builder.AddContainer("schema-registry", "cp-schema-registry", "7.6.1")
                            .WithImageRegistry("confluentinc")
                            .WithHttpEndpoint(name: "primary", targetPort: 8081);

broker.WithEnvironment(context =>
{
    var selfEndpoint = broker.GetEndpoint("primary");

    // Set the advertised listeners to the public port
    var advertisedListeners = builder.ExecutionContext.IsRunMode
        // This is a workaround for https://github.com/dotnet/aspire/issues/3735
        ? ReferenceExpression.Create($"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://{selfEndpoint.ContainerHost}:{selfEndpoint.Property(EndpointProperty.Port)}")
        : ReferenceExpression.Create($"PLAINTEXT://{selfEndpoint.Property(EndpointProperty.Host)}:29092,PLAINTEXT_HOST://{selfEndpoint.Property(EndpointProperty.Host)}:{selfEndpoint.Property(EndpointProperty.Port)}");

    context.EnvironmentVariables["KAFKA_ADVERTISED_LISTENERS"] = advertisedListeners;

    // Set the URL to the schema registry
    context.EnvironmentVariables["KAFKA_SCHEMA_REGISTRY_URL"] = schemaRegistry.GetEndpoint("primary");
});

schemaRegistry.WithEnvironment(context =>
{
    // Get the broker endpoint
    var brokerEndpoint = broker.GetEndpoint("primary");

    // Create a reference to the broker endpoint, doing it this way will ensure that it works in both run and publish mode
    var brokerConnectionReference = builder.ExecutionContext.IsRunMode
        // This is a workaround for https://github.com/dotnet/aspire/issues/3735
        ? ReferenceExpression.Create($"{brokerEndpoint.ContainerHost}:{brokerEndpoint.Property(EndpointProperty.Port)}")
        : ReferenceExpression.Create($"{brokerEndpoint.Property(EndpointProperty.Host)}:{brokerEndpoint.Property(EndpointProperty.Port)}");

    context.EnvironmentVariables["SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS"] = brokerConnectionReference;
    context.EnvironmentVariables["SCHEMA_REGISTRY_HOST_NAME"] = "schema-registry";
});

@g7ed6e We should update the advertised listeners setting in our Kafka resource to support multiple host names so that containers and host processes can access the broker server. I am hoping that will just work 😄 .

@davidfowl davidfowl added area-app-model Issues pertaining to the APIs in Aspire.Hosting, e.g. DistributedApplication and removed area-integrations Issues pertaining to Aspire Integrations packages labels Apr 25, 2024
@g7ed6e
Copy link
Contributor

g7ed6e commented Apr 25, 2024

@davidfowl Yes. Ideally I think we should use the container name as this should enable cluster with multiple broker nodes scenario too.
I did not find in the resource properties the generated docker container name yet maybe I missed an api. An alternative to pull the generated container name would be to set it to a stable fixed value.. but cluster with multiple broker nodes scenario would probably rely on naming conventions.

@davidfowl
Copy link
Member

@davidfowl Yes. Ideally I think we should use the container name as this should enable cluster with multiple broker nodes scenario too.

docker networks aren't supported #850.

@ErikBIMobject
Copy link
Author

ErikBIMobject commented Apr 25, 2024

Thank you for your answers! Im glad it sparked an interest :)
I checked your solution @davidfowl and indeed it works, the schema registry can connect to the broker.
But the api can no longer connect to the broker 🤷‍♂️
I need to have 2 different setting for the PLAINTEXT_HOST part of the KAFKA_ADVERTISED_LISTENERS variable: this is needed for the schema registry: PLAINTEXT_HOST://{selfEndpoint.ContainerHost}:{selfEndpoint.Property(EndpointProperty.Port)}
and this is needed for the api: PLAINTEXT_HOST://localhost:{selfEndpoint.Property(EndpointProperty.Port)}

I have looked at the networking guide for Aspire previously, and I graced through the kafka-listeners link and I could not come up with a working solution.
I'll try to dig a little more into the listerners documentation and see if I come up with something

@g7ed6e
Copy link
Contributor

g7ed6e commented Apr 25, 2024

I updated #3882 with support for both project connection (one producer and one consumer) and container connection (a KafkaUI container) using two different endpoints declared in KafkaServerResource.

@mitchdenny
Copy link
Member

I'm just catching up on this. Does this mean we are going to need a custom WithReference for Kafka to make sure that it wires up correctly?

@g7ed6e
Copy link
Contributor

g7ed6e commented May 6, 2024

For the time being yes. Both Kafka and Schema registry containers need to know each other.
Client side schema registry has a dedicated client from which we can pull serializer and deserializer. Those serializer / deserializer are then passed to the kafka client.

@Compufreak345
Copy link

Compufreak345 commented Jul 24, 2024

Thank you for your answers! Im glad it sparked an interest :) I checked your solution @davidfowl and indeed it works, the schema registry can connect to the broker. But the api can no longer connect to the broker 🤷‍♂️ I need to have 2 different setting for the PLAINTEXT_HOST part of the KAFKA_ADVERTISED_LISTENERS variable: this is needed for the schema registry: PLAINTEXT_HOST://{selfEndpoint.ContainerHost}:{selfEndpoint.Property(EndpointProperty.Port)} and this is needed for the api: PLAINTEXT_HOST://localhost:{selfEndpoint.Property(EndpointProperty.Port)}

I have looked at the networking guide for Aspire previously, and I graced through the kafka-listeners link and I could not come up with a working solution. I'll try to dig a little more into the listerners documentation and see if I come up with something

Hey, I found a solution which works (some configuration enhancement to what @davidfowl did already provide) :) Give this a shot:

// Add the kafka broker
IResourceBuilder<ContainerResource> broker = builder.AddContainer("broker", "confluent-local")
    .WithImageRegistry("confluentinc")
    .WithEndpoint(name: "internal", targetPort: 9092) // Endpoint for docker-internal communication (schemaregistry)
    .WithEndpoint(name: "primary", targetPort: 9093) // Endpoint for docker-external communication (applications running on localhost)
    .WithHttpEndpoint(name: "rest-api", targetPort: 8082) // Confluent REST-API 
    .WithEndpoint(targetPort: 9101);

// Add the schema registry
IResourceBuilder<ContainerResource> schemaRegistry = builder.AddContainer("schema-registry", "cp-schema-registry")
    .WithImageRegistry("confluentinc")
    .WithEndpoint(name: "primary", targetPort: 8081);

broker.WithEnvironment(context =>
{
    var internalEndpoint = broker.GetEndpoint("internal");
    var primaryEndpoint = broker.GetEndpoint("primary");
    
    // Set the advertised listeners to the public port
    var advertisedListeners = builder.ExecutionContext.IsRunMode
        // This is a workaround for https://github.com/dotnet/aspire/issues/3735
        ? ReferenceExpression.Create(
            $"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://{internalEndpoint.ContainerHost}:{internalEndpoint.Property(EndpointProperty.Port)},PLAINTEXT_LOCALHOST://localhost:{primaryEndpoint.Property(EndpointProperty.Port)}")
        : ReferenceExpression.Create(
            $"PLAINTEXT://{internalEndpoint.Property(EndpointProperty.Host)}:29092,PLAINTEXT_HOST://{internalEndpoint.Property(EndpointProperty.Host)}:{internalEndpoint.Property(EndpointProperty.Port)},PLAINTEXT_LOCALHOST://localhost:{primaryEndpoint.Property(EndpointProperty.Port)}");
    
    context.EnvironmentVariables["KAFKA_ADVERTISED_LISTENERS"] = advertisedListeners;
    context.EnvironmentVariables["KAFKA_LISTENERS"] =
       "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_LOCALHOST://0.0.0.0:9093";
    context.EnvironmentVariables["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] =
        "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_LOCALHOST:PLAINTEXT";
    
    // Set the URL to the schema registry
    context.EnvironmentVariables["KAFKA_SCHEMA_REGISTRY_URL"] = schemaRegistry.GetEndpoint("primary");
    
});

schemaRegistry.WithEnvironment(context =>
{
    // Get the broker endpoint
    var brokerEndpoint = broker.GetEndpoint("internal");

    // Create a reference to the broker endpoint, doing it this way will ensure that it works in both run and publish mode
    var brokerConnectionReference = builder.ExecutionContext.IsRunMode
        // This is a workaround for https://github.com/dotnet/aspire/issues/3735
        ? ReferenceExpression.Create($"{brokerEndpoint.ContainerHost}:{brokerEndpoint.Property(EndpointProperty.Port)}")
        : ReferenceExpression.Create(
            $"{brokerEndpoint.Property(EndpointProperty.Host)}:{brokerEndpoint.Property(EndpointProperty.Port)}");

    context.EnvironmentVariables["SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS"] = brokerConnectionReference;
    context.EnvironmentVariables["SCHEMA_REGISTRY_HOST_NAME"] = "schema-registry";
});

This article helped me a lot to understand what's going on :)

@davidfowl davidfowl added area-integrations Issues pertaining to Aspire Integrations packages and removed area-app-model Issues pertaining to the APIs in Aspire.Hosting, e.g. DistributedApplication labels Sep 7, 2024
@davidfowl
Copy link
Member

We're making a change to enable a default docker network so container to container communications work. I'm not 100% sure if it solves this problem with the schema registry, but once the change is in, it will be worth trying again at this issue.

@davidfowl davidfowl added the kafka Issues related to Kafka integrations label Sep 14, 2024
@davidfowl
Copy link
Member

davidfowl commented Oct 9, 2024

@g7ed6e When you get a chance, can you see if you can make this work with the new changes in 9.0 to support a default container network?

@t03apt
Copy link

t03apt commented Oct 9, 2024

Tbh I don't fully understand the conditional ReferenceExpression statements so I simplified it and defined ports upfront.
In case anyone interested I share it here:

var publicPlainTextHostPort = FindFreePort();
var publicPlainTextInternalPort = FindFreePort();
var publicSchemaRegistryPort = FindFreePort();
var publicRestApiPort = FindFreePort();
var publicKafkaUIPort = FindFreePort();
var publicAKHQPort = FindFreePort();

var kafka = builder.AddContainer("kafka", "confluent-local", "7.7.1")
    .WithImageRegistry("confluentinc")
    .WithEndpoint(name: "primary", targetPort: 9092, port: publicPlainTextHostPort)
    .WithEndpoint(name: "internal", targetPort: 9093, port: publicPlainTextInternalPort)
    .WithHttpEndpoint(name: "restapi", targetPort: 8082, port: publicRestApiPort)
    .WithEnvironment("KAFKA_ADVERTISED_LISTENERS", $"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:{publicPlainTextHostPort},PLAINTEXT_INTERNAL://host.docker.internal:{publicPlainTextInternalPort}".ToString())
    .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT")
    .WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093");

builder.AddContainer("schema-registry", "cp-schema-registry", "7.7.1")
    .WithImageRegistry("confluentinc")
    .WithHttpEndpoint(targetPort: 8081, port: publicSchemaRegistryPort)
    .WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", "localhost")
    .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", $"host.docker.internal:{publicPlainTextInternalPort}".ToString())
    .WithEnvironment("SCHEMA_REGISTRY_LISTENERS", $"http://0.0.0.0:8081");

builder.AddContainer("kafka-ui", "kafka-ui", "v0.7.2")
    .WithImageRegistry("provectuslabs")
    .WithHttpEndpoint(targetPort: 8080, port: publicKafkaUIPort)
    .WithEnvironment("KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS", $"host.docker.internal:{publicPlainTextInternalPort}".ToString())
    .WithEnvironment("KAFKA_CLUSTERS_0_NAME", "kafka")
    .WithEnvironment("LOGGING_LEVEL_ROOT", "debug");

var akhqConfiguration = $@"akhq:
  connections:
    docker-kafka-server:
      properties:
        bootstrap.servers: ""host.docker.internal:{publicPlainTextInternalPort}""
      schema-registry:
        url: ""http://host.docker.internal:{publicSchemaRegistryPort}""";

builder.AddContainer("akhq", "akhq", "dev")
    .WithImageRegistry("tchiotludo")
    .WithHttpEndpoint(targetPort: 8080, port: publicAKHQPort)
    .WithEnvironment("AKHQ_CONFIGURATION", akhqConfiguration);

static int FindFreePort()
{
    TcpListener? l = null;
    try
    {
        l = new TcpListener(IPAddress.Loopback, 0);
        l.Start();
        return ((IPEndPoint)l.LocalEndpoint).Port;
    }
    catch (Exception)
    {
        // ignore
    }
    finally
    {
        l?.Stop();
        l?.Dispose();
    }

    throw new InvalidOperationException("Unable to find free port");
}

I use a very similar setup for integration tests with Testcontainers.

See: Testcontainers sample
    public async Task StartAsync(string sessionName, CancellationToken cancellationToken, bool withAKHQ = false, bool withKafkaUI = false)
    {
        var publicPlainTextHostPort = FindFreePort();
        var publicPlainTextInternalPort = FindFreePort();
        var kafkaContainer = new ContainerBuilder()
                .WithImage("confluentinc/confluent-local:7.7.1")
                .WithName($"confluent-local-{sessionName}")
                .WithPortBinding(publicPlainTextHostPort, 9092)
                .WithPortBinding(publicPlainTextInternalPort, 9093)
                .WithEnvironment("KAFKA_ADVERTISED_LISTENERS", $"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:{publicPlainTextHostPort},PLAINTEXT_INTERNAL://host.docker.internal:{publicPlainTextInternalPort}")
                .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT")
                .WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093")
                .Build();

        _containers.Add(kafkaContainer);
        await kafkaContainer.StartAsync(cancellationToken);

        KafkaConnectionString = new UriBuilder("PLAINTEXT", kafkaContainer.Hostname, kafkaContainer.GetMappedPublicPort(9092)).ToString();

        var schemaRegistryContainer = new ContainerBuilder()
            .WithImage("confluentinc/cp-schema-registry:7.7.1")
            .WithName($"cp-schema-registry-{sessionName}")
            .DependsOn(kafkaContainer)
            .WithPortBinding(8081, true)
            .WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", "localhost")
            .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", $"host.docker.internal:{kafkaContainer.GetMappedPublicPort(9093)}")
            .WithEnvironment("SCHEMA_REGISTRY_LISTENERS", $"http://0.0.0.0:8081")
            .Build();

        _containers.Add(schemaRegistryContainer);
        await schemaRegistryContainer.StartAsync(cancellationToken);

        if (withAKHQ)
        {
            var akhqConfiguration = $@"akhq:
  connections:
    docker-kafka-server:
      properties:
        bootstrap.servers: ""host.docker.internal:{kafkaContainer.GetMappedPublicPort(9093)}""
      schema-registry:
        url: ""http://host.docker.internal:{schemaRegistryContainer.GetMappedPublicPort(8081)}""";

            var akhqContainer = new ContainerBuilder()
                .WithImage("tchiotludo/akhq:dev")
                .WithName($"akhq-{sessionName}")
                .WithEnvironment("AKHQ_CONFIGURATION", akhqConfiguration)
                .WithPortBinding(8080, true)
                .Build();

            _containers.Add(akhqContainer);
            await akhqContainer.StartAsync(cancellationToken);
        }

        if (withKafkaUI)
        {
            var kafkaUiContainer = new ContainerBuilder()
                .WithImage("provectuslabs/kafka-ui:v0.7.2")
                .WithName($"kafka-ui-{sessionName}")
                .WithPortBinding(8080, true)
                .WithEnvironment("KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS", $"host.docker.internal:{kafkaContainer.GetMappedPublicPort(9093)}")
                .WithEnvironment("KAFKA_CLUSTERS_0_NAME", "kafka")
                .WithEnvironment("LOGGING_LEVEL_ROOT", "debug")
                .Build();

            _containers.Add(kafkaUiContainer);
            await kafkaUiContainer.StartAsync(cancellationToken);
        }
    }

@davidfowl
Copy link
Member

A couple of thoughts:

  1. The reference expression exists to support capturing the expression information without using real values (it's a string with placeholders). This is important if you want it to work outside of local development because we can replace the host and port with the appropriate values in the right environment.
  2. Dynamic port allocation is done by the infrastructure if you don't specify a port, so that code isn't needed.
  3. Ideally, we wouldn't need to hardcode host.docker.internal, that's what the ContainerHost property is for.
  4. With Aspire 9, we have a default container network, so it should be possible to use container host names and the target port to do container to container communication.

@ErikBIMobject
Copy link
Author

ErikBIMobject commented Nov 28, 2024

The version from @t03apt worked fine until I was running on our gated checkins in our azure pipelines.
The windows OS machine didnt work and the linux one did not have the notion of host.docker.internal.
After some digging I found a working solution and I wanted to share it here.
Since we have the new container network in aspire 9 we can call the the respective containers inside the network using their internal ports (in contrast to the previous solutions that called host.docker.internal (which is the host)).
Here is a code snippet of how I initialize the kafka part of our orchestration. Its working great!

var internalEndpointName = "internal";
var primaryEndpointName = "primary";

var schemaRegistry = builder.AddContainer("schema-registry", "cp-schema-registry", "7.7.1")
    .WithImageRegistry("confluentinc")
    .WithContainerName("schema-registry")
    .WithHttpEndpoint(targetPort: 8081, name: primaryEndpointName);

var kafka = builder.AddContainer("kafka", "confluent-local", "7.7.1")
    .WithImageRegistry("confluentinc")
    .WithContainerName("kafka")
    .WithEndpoint(name: primaryEndpointName, targetPort: 9092)
    .WithEndpoint(name: internalEndpointName, targetPort: 9093)
    .WithHttpEndpoint(name: "restapi", targetPort: 8082);

var kafkaBrokerEndpoint = kafka.GetEndpoint(internalEndpointName);

schemaRegistry
    .WaitFor(kafka)
    .WithEnvironment(context =>
    {
        context.EnvironmentVariables["SCHEMA_REGISTRY_HOST_NAME"] = "localhost";
        context.EnvironmentVariables["SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS"] = ReferenceExpression.Create($"kafka:9093");
        context.EnvironmentVariables["SCHEMA_REGISTRY_LISTENERS"] = "http://0.0.0.0:8081";
    })
    .WithLifetime(ContainerLifetime.Persistent);

kafka.WithEnvironment(context =>
{
    var primaryEndpoint = kafka.GetEndpoint(primaryEndpointName);
    var internalEndpoint = kafka.GetEndpoint(internalEndpointName);
    var advertisedListenersExpression =
        ReferenceExpression.Create($"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:{kafka.GetEndpoint(primaryEndpointName).Property(EndpointProperty.Port)},PLAINTEXT_INTERNAL://kafka:9093");

    context.EnvironmentVariables["KAFKA_ADVERTISED_LISTENERS"] = advertisedListenersExpression;
    context.EnvironmentVariables["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] = "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT";
    context.EnvironmentVariables["KAFKA_LISTENERS"] = "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093";
    context.EnvironmentVariables["KAFKA_SCHEMA_REGISTRY_URL"] = schemaRegistry.GetEndpoint(primaryEndpointName);
});

builder.AddContainer("kafka-ui", "kafka-ui", "v0.7.2")
    .WithImageRegistry("provectuslabs")
    .WithHttpEndpoint(targetPort: 8080, name: "publicPort")
    .WithEnvironment(context =>
    {
        context.EnvironmentVariables["KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS"] = "kafka:9093";
        context.EnvironmentVariables["KAFKA_CLUSTERS_0_NAME"] = "kafka";
        context.EnvironmentVariables["LOGGING_LEVEL_ROOT"] = "debug";
    });

@eerhardt eerhardt added this to the Backlog milestone Jan 14, 2025
@eerhardt
Copy link
Member

@g7ed6e - can you verify if this is still an issue? Or can this be closed?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area-integrations Issues pertaining to Aspire Integrations packages kafka Issues related to Kafka integrations
Projects
None yet
Development

No branches or pull requests

8 participants