-
Notifications
You must be signed in to change notification settings - Fork 409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#2466] feat(catalog-kafka): Add code skeleton for Kafka catalog #2509
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Copyright 2024 Datastrato Pvt Ltd. | ||
* This software is licensed under the Apache License version 2. | ||
*/ | ||
// Human-readable description of this Gradle module. | ||
description = "catalog-messaging-kafka" | ||
|
||
// Applies the maven-publish, java and idea plugins to this module. | ||
plugins { | ||
`maven-publish` | ||
id("java") | ||
id("idea") | ||
} | ||
|
||
// The module compiles against the api, core and common projects of this build. | ||
dependencies { | ||
implementation(project(":api")) | ||
implementation(project(":core")) | ||
implementation(project(":common")) | ||
} | ||
|
||
tasks { | ||
// Copies everything on the runtime classpath into build/libs. | ||
val runtimeJars by registering(Copy::class) { | ||
from(configurations.runtimeClasspath) | ||
into("build/libs") | ||
} | ||
|
||
// Copies build/libs into the distribution's messaging-kafka libs directory. | ||
// Depends on jar and runtimeJars, so build/libs is populated before the copy runs. | ||
val copyCatalogLibs by registering(Copy::class) { | ||
dependsOn(jar, runtimeJars) | ||
from("build/libs") | ||
into("$rootDir/distribution/package/catalogs/messaging-kafka/libs") | ||
} | ||
|
||
// Copies the files under src/main/resources into the distribution's messaging-kafka conf | ||
// directory, dropping a trailing ".template" from file names on the way. | ||
val copyCatalogConfig by registering(Copy::class) { | ||
  from("src/main/resources") | ||
  into("$rootDir/distribution/package/catalogs/messaging-kafka/conf") | ||
|
||
  // TODO: Add the configuration file later. | ||
|
||
  // Strip only the trailing ".template"; a name that merely contains ".template" somewhere | ||
  // else keeps that part, and a name without the suffix is returned unchanged. | ||
  rename { original -> original.removeSuffix(".template") } | ||
|
||
  // Skip every element whose underlying file is a directory. | ||
  exclude { details -> | ||
    details.file.isDirectory() | ||
  } | ||
} | ||
|
||
// Aggregate task: it declares no from/into of its own and only depends on | ||
// copyCatalogConfig and copyCatalogLibs. | ||
register("copyLibAndConfig", Copy::class) { | ||
dependsOn(copyCatalogConfig, copyCatalogLibs) | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think some integration-test-related code is still missing here; are you going to add it in a separate PR? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, a TODO has been added below. |
||
|
||
// TODO: Add the test task later. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Copyright 2024 Datastrato Pvt Ltd. | ||
* This software is licensed under the Apache License version 2. | ||
*/ | ||
|
||
package com.datastrato.gravitino.catalog.kafka; | ||
|
||
import com.datastrato.gravitino.catalog.BaseCatalog; | ||
import com.datastrato.gravitino.catalog.CatalogOperations; | ||
import com.datastrato.gravitino.messaging.TopicCatalog; | ||
import com.datastrato.gravitino.rel.SupportsSchemas; | ||
import java.util.Map; | ||
|
||
/** Kafka catalog is a messaging catalog that can manage topics on the Kafka messaging system. */ | ||
public class KafkaCatalog extends BaseCatalog<KafkaCatalog> { | ||
|
||
  /** Returns the short name of this catalog implementation, {@code "kafka"}. */ | ||
  @Override | ||
  public String shortName() { | ||
    return "kafka"; | ||
  } | ||
|
||
  /** | ||
   * Creates the operations object for this catalog's entity and initializes it. | ||
   * | ||
   * @param config the configuration map handed to {@code initialize} | ||
   * @return the initialized {@link KafkaCatalogOperations} | ||
   */ | ||
  @Override | ||
  protected CatalogOperations newOps(Map<String, String> config) { | ||
    KafkaCatalogOperations kafkaOperations = new KafkaCatalogOperations(entity()); | ||
    kafkaOperations.initialize(config); | ||
    return kafkaOperations; | ||
  } | ||
|
||
  /** Exposes the catalog operations as the schema-management view. */ | ||
  @Override | ||
  public SupportsSchemas asSchemas() throws UnsupportedOperationException { | ||
    return kafkaOps(); | ||
  } | ||
|
||
  /** Exposes the catalog operations as the topic-management view. */ | ||
  @Override | ||
  public TopicCatalog asTopicCatalog() throws UnsupportedOperationException { | ||
    return kafkaOps(); | ||
  } | ||
|
||
  /** Returns {@code ops()} cast to the Kafka-specific operations type. */ | ||
  private KafkaCatalogOperations kafkaOps() { | ||
    return (KafkaCatalogOperations) ops(); | ||
  } | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
/* | ||
* Copyright 2024 Datastrato Pvt Ltd. | ||
* This software is licensed under the Apache License version 2. | ||
*/ | ||
package com.datastrato.gravitino.catalog.kafka; | ||
|
||
import com.datastrato.gravitino.Entity; | ||
import com.datastrato.gravitino.EntityStore; | ||
import com.datastrato.gravitino.GravitinoEnv; | ||
import com.datastrato.gravitino.NameIdentifier; | ||
import com.datastrato.gravitino.Namespace; | ||
import com.datastrato.gravitino.catalog.CatalogOperations; | ||
import com.datastrato.gravitino.catalog.PropertiesMetadata; | ||
import com.datastrato.gravitino.exceptions.NoSuchCatalogException; | ||
import com.datastrato.gravitino.exceptions.NoSuchSchemaException; | ||
import com.datastrato.gravitino.exceptions.NoSuchTopicException; | ||
import com.datastrato.gravitino.exceptions.NonEmptySchemaException; | ||
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; | ||
import com.datastrato.gravitino.exceptions.TopicAlreadyExistsException; | ||
import com.datastrato.gravitino.messaging.DataLayout; | ||
import com.datastrato.gravitino.messaging.Topic; | ||
import com.datastrato.gravitino.messaging.TopicCatalog; | ||
import com.datastrato.gravitino.messaging.TopicChange; | ||
import com.datastrato.gravitino.meta.CatalogEntity; | ||
import com.datastrato.gravitino.meta.SchemaEntity; | ||
import com.datastrato.gravitino.rel.Schema; | ||
import com.datastrato.gravitino.rel.SchemaChange; | ||
import com.datastrato.gravitino.rel.SupportsSchemas; | ||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
public class KafkaCatalogOperations implements CatalogOperations, SupportsSchemas, TopicCatalog { | ||
|
||
// Static property-metadata instances, one each for catalog, schema and topic properties; | ||
// they are what the catalog/schema/topic PropertiesMetadata accessors of this class return. | ||
private static final KafkaCatalogPropertiesMetadata CATALOG_PROPERTIES_METADATA = | ||
new KafkaCatalogPropertiesMetadata(); | ||
private static final KafkaSchemaPropertiesMetadata SCHEMA_PROPERTIES_METADATA = | ||
new KafkaSchemaPropertiesMetadata(); | ||
private static final KafkaTopicPropertiesMetadata TOPIC_PROPERTIES_METADATA = | ||
new KafkaTopicPropertiesMetadata(); | ||
|
||
// The catalog entity passed to the constructor. | ||
private final CatalogEntity entity; | ||
// The entity store obtained from GravitinoEnv in the constructor; read by listSchemas. | ||
private final EntityStore store; | ||
|
||
// Creates the operations object for the given catalog entity. | ||
// The entity store is not injected; it is looked up via GravitinoEnv.getInstance(). | ||
public KafkaCatalogOperations(CatalogEntity entity) { | ||
this.entity = entity; | ||
this.store = GravitinoEnv.getInstance().entityStore(); | ||
} | ||
|
||
// Currently a no-op: the body holds only a TODO and the config map is not read. | ||
@Override | ||
public void initialize(Map<String, String> config) throws RuntimeException { | ||
// TODO: Implement Kafka catalog initialization, such as creating a default schema. | ||
} | ||
|
||
// Not implemented in this skeleton: always throws UnsupportedOperationException. | ||
// NOTE(review): presumably filled in by a follow-up change - confirm. | ||
@Override | ||
public NameIdentifier[] listTopics(Namespace namespace) throws NoSuchSchemaException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
// Not implemented in this skeleton: always throws UnsupportedOperationException. | ||
@Override | ||
public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
// Not implemented in this skeleton: always throws UnsupportedOperationException, | ||
// regardless of the identifier, comment, data layout or properties passed in. | ||
@Override | ||
public Topic createTopic( | ||
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties) | ||
throws NoSuchSchemaException, TopicAlreadyExistsException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
// Not implemented in this skeleton: always throws UnsupportedOperationException. | ||
@Override | ||
public Topic alterTopic(NameIdentifier ident, TopicChange... changes) | ||
throws NoSuchTopicException, IllegalArgumentException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
// Not implemented in this skeleton: always throws UnsupportedOperationException. | ||
@Override | ||
public boolean dropTopic(NameIdentifier ident) throws NoSuchTopicException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
@Override | ||
public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException { | ||
try { | ||
List<SchemaEntity> schemas = | ||
store.list(namespace, SchemaEntity.class, Entity.EntityType.SCHEMA); | ||
return schemas.stream() | ||
.map(s -> NameIdentifier.of(namespace, s.name())) | ||
.toArray(NameIdentifier[]::new); | ||
} catch (IOException e) { | ||
throw new RuntimeException("Failed to list schemas under namespace " + namespace, e); | ||
} | ||
} | ||
|
||
// Not implemented yet: always throws UnsupportedOperationException. | ||
// Per the review discussion on this change, schema operations are planned for a later PR. | ||
@Override | ||
public Schema createSchema(NameIdentifier ident, String comment, Map<String, String> properties) | ||
throws NoSuchCatalogException, SchemaAlreadyExistsException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
// Not implemented yet: always throws UnsupportedOperationException. | ||
@Override | ||
public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
// Not implemented yet: always throws UnsupportedOperationException. | ||
@Override | ||
public Schema alterSchema(NameIdentifier ident, SchemaChange... changes) | ||
throws NoSuchSchemaException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
Comment on lines
+96
to
+111
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this unsupported, or will you do it in the next PR? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, schema-related operations will be done in another pull request. |
||
|
||
// Not implemented yet: always throws UnsupportedOperationException; the cascade flag is ignored. | ||
@Override | ||
public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
// Returns the static KafkaCatalogPropertiesMetadata instance held by this class. | ||
@Override | ||
public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException { | ||
return CATALOG_PROPERTIES_METADATA; | ||
} | ||
|
||
// Returns the static KafkaSchemaPropertiesMetadata instance held by this class. | ||
@Override | ||
public PropertiesMetadata schemaPropertiesMetadata() throws UnsupportedOperationException { | ||
return SCHEMA_PROPERTIES_METADATA; | ||
} | ||
|
||
// Returns the static KafkaTopicPropertiesMetadata instance held by this class. | ||
@Override | ||
public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationException { | ||
return TOPIC_PROPERTIES_METADATA; | ||
} | ||
|
||
// No-op: the method body is empty. | ||
@Override | ||
public void close() throws IOException {} | ||
|
||
// Always throws UnsupportedOperationException: the Kafka catalog has no fileset operations. | ||
@Override | ||
public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperationException { | ||
throw new UnsupportedOperationException("Kafka catalog does not support fileset operations"); | ||
} | ||
|
||
// Always throws UnsupportedOperationException: the Kafka catalog has no table operations. | ||
@Override | ||
public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException { | ||
throw new UnsupportedOperationException("Kafka catalog does not support table operations"); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright 2024 Datastrato Pvt Ltd. | ||
* This software is licensed under the Apache License version 2. | ||
*/ | ||
package com.datastrato.gravitino.catalog.kafka; | ||
|
||
import com.datastrato.gravitino.catalog.BaseCatalogPropertiesMetadata; | ||
import com.datastrato.gravitino.catalog.PropertyEntry; | ||
import java.util.Collections; | ||
import java.util.Map; | ||
|
||
public class KafkaCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata { | ||
|
||
// The "bootstrap.servers" property specifies the Kafka broker(s) to connect to; multiple | ||
// brokers are given as a comma-separated list. | ||
public static final String BOOTSTRAP_SERVERS = "bootstrap.servers"; | ||
|
||
// Kafka-specific catalog property entries. Currently a single-entry map holding one required | ||
// string entry for BOOTSTRAP_SERVERS, flagged immutable and not hidden. | ||
private static final Map<String, PropertyEntry<?>> KAFKA_CATALOG_PROPERTY_ENTRIES = | ||
Collections.singletonMap( | ||
BOOTSTRAP_SERVERS, | ||
PropertyEntry.stringRequiredPropertyEntry( | ||
BOOTSTRAP_SERVERS, | ||
"The Kafka broker(s) to connect to, allowing for multiple brokers by comma-separating them", | ||
true /* immutable */, | ||
false /* hidden */)); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does this only include one attribute, with the other attributes to be added in a later PR? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, the metadata for the other properties will be added in a later PR. |
||
// Returns the Kafka-specific catalog property entries held in KAFKA_CATALOG_PROPERTY_ENTRIES. | ||
@Override | ||
protected Map<String, PropertyEntry<?>> specificPropertyEntries() { | ||
return KAFKA_CATALOG_PROPERTY_ENTRIES; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
/* | ||
* Copyright 2024 Datastrato Pvt Ltd. | ||
* This software is licensed under the Apache License version 2. | ||
*/ | ||
package com.datastrato.gravitino.catalog.kafka; | ||
|
||
import com.datastrato.gravitino.catalog.BasePropertiesMetadata; | ||
import com.datastrato.gravitino.catalog.PropertyEntry; | ||
import java.util.Collections; | ||
import java.util.Map; | ||
|
||
// Property metadata for schemas of the Kafka catalog; it declares no schema-specific entries. | ||
public class KafkaSchemaPropertiesMetadata extends BasePropertiesMetadata { | ||
|
||
// Returns an empty map: no schema-specific property entries are defined. | ||
@Override | ||
protected Map<String, PropertyEntry<?>> specificPropertyEntries() { | ||
return Collections.emptyMap(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
/* | ||
* Copyright 2024 Datastrato Pvt Ltd. | ||
* This software is licensed under the Apache License version 2. | ||
*/ | ||
package com.datastrato.gravitino.catalog.kafka; | ||
|
||
import com.datastrato.gravitino.catalog.BasePropertiesMetadata; | ||
import com.datastrato.gravitino.catalog.PropertyEntry; | ||
import java.util.Collections; | ||
import java.util.Map; | ||
|
||
// Property metadata for topics of the Kafka catalog; it declares no topic-specific entries. | ||
public class KafkaTopicPropertiesMetadata extends BasePropertiesMetadata { | ||
// Returns an empty map: no topic-specific property entries are defined. | ||
@Override | ||
protected Map<String, PropertyEntry<?>> specificPropertyEntries() { | ||
return Collections.emptyMap(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not provide a default implementation instead of throwing UnsupportedOperationException in each class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we need to draw the attention of the developers of the catalog to this interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this will cause trouble for extensions. For example, if another xxxPropertiesMetadata is added to the interface, all subclasses will have to add the same code. Isn't that troublesome?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you could create a separate issue for this? Since `tablePropertiesMetadata` and `filesetPropertiesMetadata` have the same implementations, we should incorporate the overall changes in another PR. What would you suggest? @jerryshao
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think we can create a separate issue to rethink the current interface of `xxxPropertiesMetadata`.