Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2466] feat(catalog-kafka): Add code skeleton for Kafka catalog #2509

Merged
merged 3 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions api/src/main/java/com/datastrato/gravitino/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.datastrato.gravitino.annotation.Evolving;
import com.datastrato.gravitino.file.FilesetCatalog;
import com.datastrato.gravitino.messaging.TopicCatalog;
import com.datastrato.gravitino.rel.SupportsSchemas;
import com.datastrato.gravitino.rel.TableCatalog;
import java.util.Map;
Expand Down Expand Up @@ -90,4 +91,12 @@ default TableCatalog asTableCatalog() throws UnsupportedOperationException {
default FilesetCatalog asFilesetCatalog() throws UnsupportedOperationException {
throw new UnsupportedOperationException("Catalog does not support fileset operations");
}

/**
* @return the {@link TopicCatalog} if the catalog supports topic operations.
* @throws UnsupportedOperationException if the catalog does not support topic operations.
*/
default TopicCatalog asTopicCatalog() throws UnsupportedOperationException {
throw new UnsupportedOperationException("Catalog does not support topic operations");
}
}
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,8 @@ tasks {
":catalogs:catalog-lakehouse-iceberg:copyLibAndConfig",
":catalogs:catalog-jdbc-mysql:copyLibAndConfig",
":catalogs:catalog-jdbc-postgresql:copyLibAndConfig",
":catalogs:catalog-hadoop:copyLibAndConfig"
":catalogs:catalog-hadoop:copyLibAndConfig",
"catalogs:catalog-messaging-kafka:copyLibAndConfig"
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,12 @@ public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationE
"Hadoop fileset catalog doesn't support table related operations");
}

@Override
public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationException {
throw new UnsupportedOperationException(
"Hadoop fileset catalog doesn't support topic related operations");
}

@Override
public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException {
return CATALOG_PROPERTIES_METADATA;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,12 @@ public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperatio
"Hive catalog does not support fileset properties metadata");
}

@Override
public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationException {
throw new UnsupportedOperationException(
"Hive catalog does not support topic properties metadata");
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not provide a default implementation instead of implementing a throw UnsupportedOperationException in each class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we need to draw the attention of the developers of the catalog to this interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will cause trouble for extensions. For example, if another xxxPropertiesMetadata is added to the interface, all subclasses will have to add the same code, isn't that troublesome?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you could create a separate issue for this? Since tablePropertiesMetadata and filesetPropertiesMetadata have the same implementations, we should incorporate overall changes in another PR.
What would you suggest? @jerryshao

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think we can create a separate issue to rethink the current interface of xxxPropertiesMetadata.

CachedClientPool getClientPool() {
return clientPool;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,4 +511,10 @@ public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperatio
throw new UnsupportedOperationException(
"Jdbc catalog doesn't support fileset related operations");
}

@Override
public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationException {
throw new UnsupportedOperationException(
"Jdbc catalog doesn't support topic related operations");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -576,4 +576,10 @@ public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperatio
throw new UnsupportedOperationException(
"Iceberg catalog doesn't support fileset related operations");
}

@Override
public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationException {
throw new UnsupportedOperationException(
"Iceberg catalog doesn't support topic related operations");
}
}
55 changes: 55 additions & 0 deletions catalogs/catalog-messaging-kafka/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
description = "catalog-messaging-kafka"

plugins {
`maven-publish`
id("java")
id("idea")
}

dependencies {
implementation(project(":api"))
implementation(project(":core"))
implementation(project(":common"))
}

tasks {
val runtimeJars by registering(Copy::class) {
from(configurations.runtimeClasspath)
into("build/libs")
}

val copyCatalogLibs by registering(Copy::class) {
dependsOn(jar, runtimeJars)
from("build/libs")
into("$rootDir/distribution/package/catalogs/messaging-kafka/libs")
}

val copyCatalogConfig by registering(Copy::class) {
from("src/main/resources")
into("$rootDir/distribution/package/catalogs/messaging-kafka/conf")

// TODO. add configuration file later on.

rename { original ->
if (original.endsWith(".template")) {
original.replace(".template", "")
} else {
original
}
}

exclude { details ->
details.file.isDirectory()
}
}

register("copyLibAndConfig", Copy::class) {
dependsOn(copyCatalogConfig, copyCatalogLibs)
}
}
Copy link
Contributor

@jerryshao jerryshao Mar 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you still miss several integration test related codes here, are you going to do this in a separate PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, a todo added to the below


// TODO. add test task later on.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.catalog.kafka;

import com.datastrato.gravitino.catalog.BaseCatalog;
import com.datastrato.gravitino.catalog.CatalogOperations;
import com.datastrato.gravitino.messaging.TopicCatalog;
import com.datastrato.gravitino.rel.SupportsSchemas;
import java.util.Map;

/** Kafka catalog is a messaging catalog that can manage topics on the Kafka messaging system. */
public class KafkaCatalog extends BaseCatalog<KafkaCatalog> {

@Override
public String shortName() {
return "kafka";
}

@Override
protected CatalogOperations newOps(Map<String, String> config) {
KafkaCatalogOperations ops = new KafkaCatalogOperations(entity());
ops.initialize(config);
return ops;
}

@Override
public SupportsSchemas asSchemas() throws UnsupportedOperationException {
return (KafkaCatalogOperations) ops();
}

@Override
public TopicCatalog asTopicCatalog() throws UnsupportedOperationException {
return (KafkaCatalogOperations) ops();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.kafka;

import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.EntityStore;
import com.datastrato.gravitino.GravitinoEnv;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.catalog.CatalogOperations;
import com.datastrato.gravitino.catalog.PropertiesMetadata;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTopicException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TopicAlreadyExistsException;
import com.datastrato.gravitino.messaging.DataLayout;
import com.datastrato.gravitino.messaging.Topic;
import com.datastrato.gravitino.messaging.TopicCatalog;
import com.datastrato.gravitino.messaging.TopicChange;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.meta.SchemaEntity;
import com.datastrato.gravitino.rel.Schema;
import com.datastrato.gravitino.rel.SchemaChange;
import com.datastrato.gravitino.rel.SupportsSchemas;
import java.io.IOException;
import java.util.List;
import java.util.Map;

public class KafkaCatalogOperations implements CatalogOperations, SupportsSchemas, TopicCatalog {

private static final KafkaCatalogPropertiesMetadata CATALOG_PROPERTIES_METADATA =
new KafkaCatalogPropertiesMetadata();
private static final KafkaSchemaPropertiesMetadata SCHEMA_PROPERTIES_METADATA =
new KafkaSchemaPropertiesMetadata();
private static final KafkaTopicPropertiesMetadata TOPIC_PROPERTIES_METADATA =
new KafkaTopicPropertiesMetadata();

private final CatalogEntity entity;
private final EntityStore store;

public KafkaCatalogOperations(CatalogEntity entity) {
this.entity = entity;
this.store = GravitinoEnv.getInstance().entityStore();
}

@Override
public void initialize(Map<String, String> config) throws RuntimeException {
// TODO: Implement Kafka catalog initialization, such as creating a default schema.
}

@Override
public NameIdentifier[] listTopics(Namespace namespace) throws NoSuchSchemaException {
throw new UnsupportedOperationException();
}

@Override
public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
throw new UnsupportedOperationException();
}

@Override
public Topic createTopic(
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties)
throws NoSuchSchemaException, TopicAlreadyExistsException {
throw new UnsupportedOperationException();
}

@Override
public Topic alterTopic(NameIdentifier ident, TopicChange... changes)
throws NoSuchTopicException, IllegalArgumentException {
throw new UnsupportedOperationException();
}

@Override
public boolean dropTopic(NameIdentifier ident) throws NoSuchTopicException {
throw new UnsupportedOperationException();
}

@Override
public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
try {
List<SchemaEntity> schemas =
store.list(namespace, SchemaEntity.class, Entity.EntityType.SCHEMA);
return schemas.stream()
.map(s -> NameIdentifier.of(namespace, s.name()))
.toArray(NameIdentifier[]::new);
} catch (IOException e) {
throw new RuntimeException("Failed to list schemas under namespace " + namespace, e);
}
}

@Override
public Schema createSchema(NameIdentifier ident, String comment, Map<String, String> properties)
throws NoSuchCatalogException, SchemaAlreadyExistsException {
throw new UnsupportedOperationException();
}

@Override
public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
throw new UnsupportedOperationException();
}

@Override
public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
throws NoSuchSchemaException {
throw new UnsupportedOperationException();
}
Comment on lines +96 to +111
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it unsupported or you will do in the next PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, schema-relative operations will be done in another pull request.


@Override
public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
throw new UnsupportedOperationException();
}

@Override
public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException {
return CATALOG_PROPERTIES_METADATA;
}

@Override
public PropertiesMetadata schemaPropertiesMetadata() throws UnsupportedOperationException {
return SCHEMA_PROPERTIES_METADATA;
}

@Override
public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationException {
return TOPIC_PROPERTIES_METADATA;
}

@Override
public void close() throws IOException {}

@Override
public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperationException {
throw new UnsupportedOperationException("Kafka catalog does not support fileset operations");
}

@Override
public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException {
throw new UnsupportedOperationException("Kafka catalog does not support table operations");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.kafka;

import com.datastrato.gravitino.catalog.BaseCatalogPropertiesMetadata;
import com.datastrato.gravitino.catalog.PropertyEntry;
import java.util.Collections;
import java.util.Map;

public class KafkaCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata {

// The "bootstrap.servers" property specifies the Kafka broker(s) to connect to, allowing for
// multiple brokers by comma-separating them.
public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

private static final Map<String, PropertyEntry<?>> KAFKA_CATALOG_PROPERTY_ENTRIES =
Collections.singletonMap(
BOOTSTRAP_SERVERS,
PropertyEntry.stringRequiredPropertyEntry(
BOOTSTRAP_SERVERS,
"The Kafka broker(s) to connect to, allowing for multiple brokers by comma-separating them",
true /* immutable */,
false /* hidden */));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this only include one attribute, with the other attributes to be added in a later PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, other properties' metadata will be added in a later PR

@Override
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
return KAFKA_CATALOG_PROPERTY_ENTRIES;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.kafka;

import com.datastrato.gravitino.catalog.BasePropertiesMetadata;
import com.datastrato.gravitino.catalog.PropertyEntry;
import java.util.Collections;
import java.util.Map;

public class KafkaSchemaPropertiesMetadata extends BasePropertiesMetadata {

@Override
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.kafka;

import com.datastrato.gravitino.catalog.BasePropertiesMetadata;
import com.datastrato.gravitino.catalog.PropertyEntry;
import java.util.Collections;
import java.util.Map;

public class KafkaTopicPropertiesMetadata extends BasePropertiesMetadata {
@Override
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
return Collections.emptyMap();
}
}
Loading
Loading