diff --git a/api/src/main/java/com/datastrato/gravitino/Catalog.java b/api/src/main/java/com/datastrato/gravitino/Catalog.java
index 44a3948006c..3bc7b2251a3 100644
--- a/api/src/main/java/com/datastrato/gravitino/Catalog.java
+++ b/api/src/main/java/com/datastrato/gravitino/Catalog.java
@@ -26,7 +26,7 @@ enum Type {
FILESET,
/** Catalog Type for Message Queue, like kafka://topic */
- STREAM
+ MESSAGING
}
/**
diff --git a/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchTopicException.java b/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchTopicException.java
new file mode 100644
index 00000000000..c85b6c65c01
--- /dev/null
+++ b/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchTopicException.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.exceptions;
+
+import com.google.errorprone.annotations.FormatMethod;
+import com.google.errorprone.annotations.FormatString;
+
+/** Exception thrown when a topic with specified name is not existed. */
+public class NoSuchTopicException extends NotFoundException {
+
+ /**
+ * Constructs a new exception with the specified detail message.
+ *
+ * @param message the detail message.
+ * @param args the arguments to the message.
+ */
+ @FormatMethod
+ public NoSuchTopicException(@FormatString String message, Object... args) {
+ super(message, args);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and cause.
+ *
+ * @param cause the cause.
+ * @param message the detail message.
+ * @param args the arguments to the message.
+ */
+ @FormatMethod
+ public NoSuchTopicException(Throwable cause, String message, Object... args) {
+ super(cause, message, args);
+ }
+}
diff --git a/api/src/main/java/com/datastrato/gravitino/exceptions/TopicAlreadyExistsException.java b/api/src/main/java/com/datastrato/gravitino/exceptions/TopicAlreadyExistsException.java
new file mode 100644
index 00000000000..99198e58968
--- /dev/null
+++ b/api/src/main/java/com/datastrato/gravitino/exceptions/TopicAlreadyExistsException.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.exceptions;
+
+import com.google.errorprone.annotations.FormatMethod;
+import com.google.errorprone.annotations.FormatString;
+
+/** Exception thrown when a topic with specified name already exists. */
+public class TopicAlreadyExistsException extends AlreadyExistsException {
+
+ /**
+ * Constructs a new exception with the specified detail message.
+ *
+ * @param message the detail message.
+ * @param args the arguments to the message.
+ */
+ @FormatMethod
+ public TopicAlreadyExistsException(@FormatString String message, Object... args) {
+ super(message, args);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and cause.
+ *
+ * @param cause the cause.
+ * @param message the detail message.
+ * @param args the arguments to the message.
+ */
+ @FormatMethod
+ public TopicAlreadyExistsException(
+ Throwable cause, @FormatString String message, Object... args) {
+ super(cause, message, args);
+ }
+}
diff --git a/api/src/main/java/com/datastrato/gravitino/messaging/DataLayout.java b/api/src/main/java/com/datastrato/gravitino/messaging/DataLayout.java
new file mode 100644
index 00000000000..86c5ae3e0d4
--- /dev/null
+++ b/api/src/main/java/com/datastrato/gravitino/messaging/DataLayout.java
@@ -0,0 +1,14 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.messaging;
+
+import com.datastrato.gravitino.annotation.Unstable;
+
+/**
+ * The interface for message schema of a topic. Currently not implemented, only reserved as a
+ * placeholder interface.
+ */
+@Unstable
+public interface DataLayout {}
diff --git a/api/src/main/java/com/datastrato/gravitino/messaging/Topic.java b/api/src/main/java/com/datastrato/gravitino/messaging/Topic.java
new file mode 100644
index 00000000000..aa356c09c8e
--- /dev/null
+++ b/api/src/main/java/com/datastrato/gravitino/messaging/Topic.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.messaging;
+
+import com.datastrato.gravitino.Auditable;
+import com.datastrato.gravitino.annotation.Evolving;
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * An interface representing a topic under a schema {@link com.datastrato.gravitino.Namespace}. A
+ * topic is a message queue that is managed by Gravitino. Users can create/drop/alter a topic on the
+ * Message Queue system like Kafka, Pulsar, etc.
+ *
+ *
{@link Topic} defines the basic properties of a topic object. A catalog implementation with
+ * {@link TopicCatalog} should implement this interface.
+ */
+@Evolving
+public interface Topic extends Auditable {
+
+ /** @return Name of the topic */
+ String name();
+
+ /** @return The comment of the topic object. Null is returned if no comment is set. */
+ @Nullable
+ default String comment() {
+ return null;
+ }
+
+ /** @return The properties of the topic object. Empty map is returned if no properties are set. */
+ default Map properties() {
+ return Collections.emptyMap();
+ }
+}
diff --git a/api/src/main/java/com/datastrato/gravitino/messaging/TopicCatalog.java b/api/src/main/java/com/datastrato/gravitino/messaging/TopicCatalog.java
new file mode 100644
index 00000000000..de10613623c
--- /dev/null
+++ b/api/src/main/java/com/datastrato/gravitino/messaging/TopicCatalog.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.messaging;
+
+import com.datastrato.gravitino.NameIdentifier;
+import com.datastrato.gravitino.Namespace;
+import com.datastrato.gravitino.annotation.Evolving;
+import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
+import com.datastrato.gravitino.exceptions.NoSuchTopicException;
+import com.datastrato.gravitino.exceptions.TopicAlreadyExistsException;
+import java.util.Map;
+
+/**
+ * The {@link TopicCatalog} interface defines the public API for managing topic objects in a schema.
+ * If the catalog implementation supports topic objects, it should implement this interface.
+ */
+@Evolving
+public interface TopicCatalog {
+
+ /**
+ * List the topics in a schema namespace from the catalog.
+ *
+ * @param namespace A schema namespace.
+ * @return An array of topic identifiers in the namespace.
+ * @throws NoSuchSchemaException If the schema does not exist.
+ */
+ NameIdentifier[] listTopics(Namespace namespace) throws NoSuchSchemaException;
+
+ /**
+ * Load topic metadata by {@link NameIdentifier} from the catalog.
+ *
+ * @param ident A topic identifier.
+ * @return The topic metadata.
+ * @throws NoSuchTopicException If the topic does not exist.
+ */
+ Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException;
+
+ /**
+ * Check if a topic exists using an {@link NameIdentifier} from the catalog.
+ *
+ * @param ident A topic identifier.
+ * @return true If the topic exists, false otherwise.
+ */
+ default boolean topicExists(NameIdentifier ident) {
+ try {
+ loadTopic(ident);
+ return true;
+ } catch (NoSuchTopicException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Create a topic in the catalog.
+ *
+ * @param ident A topic identifier.
+ * @param comment The comment of the topic object. Null is set if no comment is specified.
+ * @param dataLayout The message schema of the topic object. Always null because it's not
+ * supported yet.
+ * @param properties The properties of the topic object. Empty map is set if no properties are
+ * specified.
+ * @return The topic metadata.
+ * @throws NoSuchSchemaException If the schema does not exist.
+ * @throws TopicAlreadyExistsException If the topic already exists.
+ */
+ Topic createTopic(
+ NameIdentifier ident, String comment, DataLayout dataLayout, Map properties)
+ throws NoSuchSchemaException, TopicAlreadyExistsException;
+
+ /**
+ * Apply the {@link TopicChange changes} to a topic in the catalog.
+ *
+ * @param ident A topic identifier.
+ * @param changes The changes to apply to the topic.
+ * @return The altered topic metadata.
+ * @throws NoSuchTopicException If the topic does not exist.
+ * @throws IllegalArgumentException If the changes is rejected by the implementation.
+ */
+ Topic alterTopic(NameIdentifier ident, TopicChange... changes)
+ throws NoSuchTopicException, IllegalArgumentException;
+
+ /**
+ * Drop a topic from the catalog.
+ *
+ * @param ident A topic identifier.
+ * @return true If the topic is dropped, false otherwise.
+ * @throws NoSuchTopicException If the topic does not exist.
+ */
+ boolean dropTopic(NameIdentifier ident) throws NoSuchTopicException;
+}
diff --git a/api/src/main/java/com/datastrato/gravitino/messaging/TopicChange.java b/api/src/main/java/com/datastrato/gravitino/messaging/TopicChange.java
new file mode 100644
index 00000000000..e07189c7d70
--- /dev/null
+++ b/api/src/main/java/com/datastrato/gravitino/messaging/TopicChange.java
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.messaging;
+
+import com.datastrato.gravitino.annotation.Evolving;
+import java.util.Objects;
+
+/**
+ * A topic change is a change to a topic. It can be used to update the comment of a topic, set a
+ * property and value pair for a topic, or remove a property from a topic in the catalog.
+ */
+@Evolving
+public interface TopicChange {
+
+ /**
+ * Creates a new topic change to update the topic comment.
+ *
+ * @param newComment The new comment for the topic.
+ * @return The topic change.
+ */
+ static TopicChange updateComment(String newComment) {
+ return new TopicChange.UpdateTopicComment(newComment);
+ }
+
+ /**
+ * Creates a new topic change to set or update the property and value for the topic.
+ *
+ * @param property The property name to set.
+ * @param value The value to set the property to.
+ * @return The topic change.
+ */
+ static TopicChange setProperty(String property, String value) {
+ return new TopicChange.SetProperty(property, value);
+ }
+
+ /**
+ * Creates a new topic change to remove a property from the topic.
+ *
+ * @param property The property name to remove.
+ * @return The topic change.
+ */
+ static TopicChange removeProperty(String property) {
+ return new TopicChange.RemoveProperty(property);
+ }
+
+ /** A topic change to update the topic comment. */
+ final class UpdateTopicComment implements TopicChange {
+ private final String newComment;
+
+ private UpdateTopicComment(String newComment) {
+ this.newComment = newComment;
+ }
+
+ /**
+ * Retrieves the new comment for the topic.
+ *
+ * @return The new comment for the topic.
+ */
+ public String getNewComment() {
+ return newComment;
+ }
+
+ /**
+ * Compares this UpdateTopicComment instance with another object for equality. Two instances are
+ * considered equal if they have the same new comment for the topic.
+ *
+ * @param o The object to compare with this instance.
+ * @return true if the given object represents the same comment update; false otherwise.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TopicChange.UpdateTopicComment that = (TopicChange.UpdateTopicComment) o;
+ return Objects.equals(newComment, that.newComment);
+ }
+
+ /**
+ * Generates a hash code for this UpdateTopicComment instance. The hash code is based on the new
+ * comment for the topic.
+ *
+ * @return A hash code representing this comment update operation.
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(newComment);
+ }
+
+ /**
+ * Provides a string representation of the UpdateTopicComment instance. This string format
+ * includes the class name followed by the new comment for the topic.
+ *
+ * @return A string summary of this comment update operation.
+ */
+ @Override
+ public String toString() {
+ return "UPDATETOPICCOMMENT " + newComment;
+ }
+ }
+
+ /** A topic change to set or update the property and value for the topic. */
+ final class SetProperty implements TopicChange {
+ private final String property;
+ private final String value;
+
+ private SetProperty(String property, String value) {
+ this.property = property;
+ this.value = value;
+ }
+
+ /**
+ * Retrieves the name of the property being set in the topic.
+ *
+ * @return The name of the property.
+ */
+ public String getProperty() {
+ return property;
+ }
+
+ /**
+ * Retrieves the value assigned to the property in the topic.
+ *
+ * @return The value of the property.
+ */
+ public String getValue() {
+ return value;
+ }
+
+ /**
+ * Compares this SetProperty instance with another object for equality. Two instances are
+ * considered equal if they have the same property and value for the topic.
+ *
+ * @param o The object to compare with this instance.
+ * @return true if the given object represents the same property setting; false otherwise.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ SetProperty that = (SetProperty) o;
+ return Objects.equals(property, that.property) && Objects.equals(value, that.value);
+ }
+
+ /**
+ * Generates a hash code for this SetProperty instance. The hash code is based on both the
+ * property name and its assigned value.
+ *
+ * @return A hash code value for this property setting.
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(property, value);
+ }
+
+ /**
+ * Provides a string representation of the SetProperty instance. This string format includes the
+ * class name followed by the property and its value.
+ *
+ * @return A string summary of the property setting.
+ */
+ @Override
+ public String toString() {
+ return "SETPROPERTY " + property + " " + value;
+ }
+ }
+
+ /** A topic change to remove a property from the topic. */
+ final class RemoveProperty implements TopicChange {
+ private final String property;
+
+ private RemoveProperty(String property) {
+ this.property = property;
+ }
+
+ /**
+ * Retrieves the name of the property to be removed from the topic.
+ *
+ * @return The name of the property for removal.
+ */
+ public String getProperty() {
+ return property;
+ }
+
+ /**
+ * Compares this RemoveProperty instance with another object for equality. Two instances are
+ * considered equal if they target the same property for removal from the topic.
+ *
+ * @param o The object to compare with this instance.
+ * @return true if the given object represents the same property removal; false otherwise.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ RemoveProperty that = (RemoveProperty) o;
+ return Objects.equals(property, that.property);
+ }
+
+ /**
+ * Generates a hash code for this RemoveProperty instance. The hash code is based on the
+ * property name that is to be removed from the topic.
+ *
+ * @return A hash code value for this property removal operation.
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(property);
+ }
+
+ /**
+ * Provides a string representation of the RemoveProperty instance. This string format includes
+ * the class name followed by the property name to be removed.
+ *
+ * @return A string summary of the property removal operation.
+ */
+ @Override
+ public String toString() {
+ return "REMOVEPROPERTY " + property;
+ }
+ }
+}
diff --git a/clients/client-java/src/main/java/com/datastrato/gravitino/client/DTOConverters.java b/clients/client-java/src/main/java/com/datastrato/gravitino/client/DTOConverters.java
index 30df6657821..fc59a2efa84 100644
--- a/clients/client-java/src/main/java/com/datastrato/gravitino/client/DTOConverters.java
+++ b/clients/client-java/src/main/java/com/datastrato/gravitino/client/DTOConverters.java
@@ -80,7 +80,7 @@ static Catalog toCatalog(CatalogDTO catalog, RESTClient client) {
.withRestClient(client)
.build();
- case STREAM:
+ case MESSAGING:
default:
throw new UnsupportedOperationException("Unsupported catalog type: " + catalog.type());
}
diff --git a/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoMetalake.java b/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoMetalake.java
index eddd13dbde7..77153bfd9dd 100644
--- a/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoMetalake.java
+++ b/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoMetalake.java
@@ -125,7 +125,7 @@ public void testLoadCatalog() throws JsonProcessingException {
new CatalogDTO.Builder()
.withName("mock")
.withComment("comment")
- .withType(Catalog.Type.STREAM)
+ .withType(Catalog.Type.MESSAGING)
.withProvider("test")
.withAudit(
new AuditDTO.Builder().withCreator("creator").withCreateTime(Instant.now()).build())
@@ -182,14 +182,14 @@ public void testCreateCatalog() throws JsonProcessingException {
new CatalogDTO.Builder()
.withName("mock")
.withComment("comment")
- .withType(Catalog.Type.STREAM)
+ .withType(Catalog.Type.MESSAGING)
.withProvider("test")
.withAudit(
new AuditDTO.Builder().withCreator("creator").withCreateTime(Instant.now()).build())
.build();
CatalogCreateRequest req1 =
new CatalogCreateRequest(
- catalogName, Catalog.Type.STREAM, provider, "comment", Collections.emptyMap());
+ catalogName, Catalog.Type.MESSAGING, provider, "comment", Collections.emptyMap());
CatalogResponse resp1 = new CatalogResponse(mockCatalog1);
buildMockResource(Method.POST, path, req1, resp1, HttpStatus.SC_OK);
NameIdentifier id = NameIdentifier.of(metalakeName, catalogName);
@@ -197,7 +197,7 @@ public void testCreateCatalog() throws JsonProcessingException {
Assertions.assertThrows(
UnsupportedOperationException.class,
- () -> metalake.createCatalog(id, Catalog.Type.STREAM, provider, "comment", emptyMap));
+ () -> metalake.createCatalog(id, Catalog.Type.MESSAGING, provider, "comment", emptyMap));
// Test return NoSuchMetalakeException
ErrorResponse errorResponse =
diff --git a/core/src/main/java/com/datastrato/gravitino/Entity.java b/core/src/main/java/com/datastrato/gravitino/Entity.java
index 65e3d073a98..116919fe621 100644
--- a/core/src/main/java/com/datastrato/gravitino/Entity.java
+++ b/core/src/main/java/com/datastrato/gravitino/Entity.java
@@ -20,6 +20,7 @@ enum EntityType {
TABLE("ta", 3),
COLUMN("co", 4),
FILESET("fi", 5),
+ TOPIC("to", 6),
AUDIT("au", 65534);
diff --git a/core/src/main/java/com/datastrato/gravitino/meta/TopicEntity.java b/core/src/main/java/com/datastrato/gravitino/meta/TopicEntity.java
new file mode 100644
index 00000000000..8b0a3180314
--- /dev/null
+++ b/core/src/main/java/com/datastrato/gravitino/meta/TopicEntity.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.meta;
+
+import com.datastrato.gravitino.Auditable;
+import com.datastrato.gravitino.Entity;
+import com.datastrato.gravitino.Field;
+import com.datastrato.gravitino.HasIdentifier;
+import com.datastrato.gravitino.Namespace;
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import lombok.ToString;
+
+/** A class representing a topic metadata entity in Gravitino. */
+@ToString
+public class TopicEntity implements Entity, Auditable, HasIdentifier {
+ public static final Field ID =
+ Field.required("id", Long.class, "The unique id of the topic entity.");
+ public static final Field NAME =
+ Field.required("name", String.class, "The name of the topic entity.");
+ public static final Field COMMENT =
+ Field.optional("comment", String.class, "The comment or description of the topic entity.");
+ public static final Field AUDIT_INFO =
+ Field.required("audit_info", AuditInfo.class, "The audit details of the topic entity.");
+ public static final Field PROPERTIES =
+ Field.optional("properties", Map.class, "The properties of the topic entity.");
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ private Long id;
+ private String name;
+ private Namespace namespace;
+ private String comment;
+ private AuditInfo auditInfo;
+ private Map properties;
+
+ private TopicEntity() {}
+
+ /**
+ * Returns a map of fields and their corresponding values for this topic entity.
+ *
+ * @return An unmodifiable map of the fields and values.
+ */
+ @Override
+ public Map fields() {
+ Map fields = Maps.newHashMap();
+ fields.put(ID, id);
+ fields.put(NAME, name);
+ fields.put(COMMENT, comment);
+ fields.put(AUDIT_INFO, auditInfo);
+ fields.put(PROPERTIES, properties);
+
+ return Collections.unmodifiableMap(fields);
+ }
+
+ /**
+ * Returns the name of the topic.
+ *
+ * @return The name of the topic.
+ */
+ @Override
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Returns the namespace of the topic.
+ *
+ * @return The namespace of the topic.
+ */
+ @Override
+ public Namespace namespace() {
+ return namespace;
+ }
+
+ /**
+ * Returns the unique id of the topic.
+ *
+ * @return The unique id of the topic.
+ */
+ @Override
+ public Long id() {
+ return id;
+ }
+
+ /**
+ * Returns the comment or description of the topic.
+ *
+ * @return The comment or description of the topic.
+ */
+ public String comment() {
+ return comment;
+ }
+
+ /**
+ * Returns the audit details of the topic.
+ *
+ * @return The audit details of the topic.
+ */
+ @Override
+ public AuditInfo auditInfo() {
+ return auditInfo;
+ }
+
+ /**
+ * Returns the type of the entity.
+ *
+ * @return The type of the entity.
+ */
+ @Override
+ public EntityType type() {
+ return EntityType.TOPIC;
+ }
+
+ /**
+ * Returns the properties of the topic entity.
+ *
+ * @return The properties of the topic entity.
+ */
+ public Map properties() {
+ return properties;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof TopicEntity)) return false;
+
+ TopicEntity that = (TopicEntity) o;
+ return Objects.equals(id, that.id)
+ && Objects.equals(name, that.name)
+ && Objects.equals(comment, that.comment)
+ && Objects.equals(auditInfo, that.auditInfo)
+ && Objects.equals(properties, that.properties);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, name, comment, auditInfo, properties);
+ }
+
+ public static class Builder {
+ private final TopicEntity topic;
+
+ private Builder() {
+ topic = new TopicEntity();
+ }
+
+ /**
+ * Sets the unique id of the topic entity.
+ *
+ * @param id The unique id of the topic entity.
+ * @return The builder instance.
+ */
+ public TopicEntity.Builder withId(Long id) {
+ topic.id = id;
+ return this;
+ }
+
+ /**
+ * Sets the name of the topic entity.
+ *
+ * @param name The name of the topic entity.
+ * @return The builder instance.
+ */
+ public TopicEntity.Builder withName(String name) {
+ topic.name = name;
+ return this;
+ }
+
+ /**
+ * Sets the namespace of the topic entity.
+ *
+ * @param namespace The namespace of the topic entity.
+ * @return The builder instance.
+ */
+ public TopicEntity.Builder withNamespace(Namespace namespace) {
+ topic.namespace = namespace;
+ return this;
+ }
+
+ /**
+ * Sets the comment or description of the topic entity.
+ *
+ * @param comment The comment or description of the topic entity.
+ * @return The builder instance.
+ */
+ public TopicEntity.Builder withComment(String comment) {
+ topic.comment = comment;
+ return this;
+ }
+
+ /**
+ * Sets the audit details of the topic entity.
+ *
+ * @param auditInfo The audit details of the topic entity.
+ * @return The builder instance.
+ */
+ public TopicEntity.Builder withAuditInfo(AuditInfo auditInfo) {
+ topic.auditInfo = auditInfo;
+ return this;
+ }
+
+ /**
+ * Sets the properties of the topic entity.
+ *
+ * @param properties The properties of the topic entity.
+ * @return The builder instance.
+ */
+ public TopicEntity.Builder withProperties(Map properties) {
+ topic.properties = properties;
+ return this;
+ }
+
+ /**
+ * Builds the topic entity.
+ *
+ * @return The built topic entity.
+ */
+ public TopicEntity build() {
+ topic.validate();
+ return topic;
+ }
+ }
+}
diff --git a/core/src/main/java/com/datastrato/gravitino/proto/ProtoEntitySerDe.java b/core/src/main/java/com/datastrato/gravitino/proto/ProtoEntitySerDe.java
index 08c00b6b6bf..182716cbfcd 100644
--- a/core/src/main/java/com/datastrato/gravitino/proto/ProtoEntitySerDe.java
+++ b/core/src/main/java/com/datastrato/gravitino/proto/ProtoEntitySerDe.java
@@ -37,6 +37,9 @@ public class ProtoEntitySerDe implements EntitySerDe {
.put(
"com.datastrato.gravitino.meta.FilesetEntity",
"com.datastrato.gravitino.proto.FilesetEntitySerDe")
+ .put(
+ "com.datastrato.gravitino.meta.TopicEntity",
+ "com.datastrato.gravitino.proto.TopicEntitySerDe")
.build();
private static final Map ENTITY_TO_PROTO =
@@ -52,7 +55,9 @@ public class ProtoEntitySerDe implements EntitySerDe {
"com.datastrato.gravitino.meta.TableEntity",
"com.datastrato.gravitino.proto.Table",
"com.datastrato.gravitino.meta.FilesetEntity",
- "com.datastrato.gravitino.proto.Fileset");
+ "com.datastrato.gravitino.proto.Fileset",
+ "com.datastrato.gravitino.meta.TopicEntity",
+ "com.datastrato.gravitino.proto.Topic");
private final Map, ProtoSerDe extends Entity, ? extends Message>>
entityToSerDe;
diff --git a/core/src/main/java/com/datastrato/gravitino/proto/TopicEntitySerDe.java b/core/src/main/java/com/datastrato/gravitino/proto/TopicEntitySerDe.java
new file mode 100644
index 00000000000..c784030dc47
--- /dev/null
+++ b/core/src/main/java/com/datastrato/gravitino/proto/TopicEntitySerDe.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.proto;
+
+import com.datastrato.gravitino.meta.TopicEntity;
+
+public class TopicEntitySerDe implements ProtoSerDe {
+
+ @Override
+ public Topic serialize(TopicEntity topicEntity) {
+ Topic.Builder builder =
+ Topic.newBuilder()
+ .setId(topicEntity.id())
+ .setName(topicEntity.name())
+ .setAuditInfo(new AuditInfoSerDe().serialize(topicEntity.auditInfo()));
+
+ if (topicEntity.comment() != null) {
+ builder.setComment(topicEntity.comment());
+ }
+
+ if (topicEntity.properties() != null && !topicEntity.properties().isEmpty()) {
+ builder.putAllProperties(topicEntity.properties());
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public TopicEntity deserialize(Topic p) {
+ TopicEntity.Builder builder =
+ TopicEntity.builder()
+ .withId(p.getId())
+ .withName(p.getName())
+ .withAuditInfo(new AuditInfoSerDe().deserialize(p.getAuditInfo()));
+
+ if (p.hasComment()) {
+ builder.withComment(p.getComment());
+ }
+
+ if (p.getPropertiesCount() > 0) {
+ builder.withProperties(p.getPropertiesMap());
+ }
+
+ return builder.build();
+ }
+}
diff --git a/core/src/test/java/com/datastrato/gravitino/meta/TestEntity.java b/core/src/test/java/com/datastrato/gravitino/meta/TestEntity.java
index d7d28199a34..7feff2608ea 100644
--- a/core/src/test/java/com/datastrato/gravitino/meta/TestEntity.java
+++ b/core/src/test/java/com/datastrato/gravitino/meta/TestEntity.java
@@ -43,6 +43,10 @@ public class TestEntity {
private final Long fileId = 1L;
private final String fileName = "testFile";
+ // Topic test data
+ private final Long topicId = 1L;
+ private final String topicName = "testTopic";
+
@Test
public void testMetalake() {
BaseMetalake metalake =
@@ -176,4 +180,28 @@ public void testFile() {
});
Assertions.assertEquals("Field storage_location is required", exception.getMessage());
}
+
+ @Test
+ public void testTopic() {
+ TopicEntity testTopic =
+ TopicEntity.builder()
+ .withId(topicId)
+ .withName(topicName)
+ .withAuditInfo(auditInfo)
+ .withComment("test topic comment")
+ .withProperties(map)
+ .build();
+
+ Map fields = testTopic.fields();
+ Assertions.assertEquals(topicId, fields.get(TopicEntity.ID));
+ Assertions.assertEquals(topicName, fields.get(TopicEntity.NAME));
+ Assertions.assertEquals(auditInfo, fields.get(TopicEntity.AUDIT_INFO));
+ Assertions.assertEquals("test topic comment", fields.get(TopicEntity.COMMENT));
+ Assertions.assertEquals(map, fields.get(TopicEntity.PROPERTIES));
+
+ TopicEntity testTopic1 =
+ TopicEntity.builder().withId(topicId).withName(topicName).withAuditInfo(auditInfo).build();
+ Assertions.assertNull(testTopic1.comment());
+ Assertions.assertNull(testTopic1.properties());
+ }
}
diff --git a/core/src/test/java/com/datastrato/gravitino/proto/TestEntityProtoSerDe.java b/core/src/test/java/com/datastrato/gravitino/proto/TestEntityProtoSerDe.java
index f29b95ff7a3..073d7b5a269 100644
--- a/core/src/test/java/com/datastrato/gravitino/proto/TestEntityProtoSerDe.java
+++ b/core/src/test/java/com/datastrato/gravitino/proto/TestEntityProtoSerDe.java
@@ -245,5 +245,34 @@ public void testEntitiesSerDe() throws IOException {
Assertions.assertEquals("testLocation", fileEntityFromBytes2.storageLocation());
Assertions.assertEquals(
com.datastrato.gravitino.file.Fileset.Type.EXTERNAL, fileEntityFromBytes2.filesetType());
+
+ // Test TopicEntity
+ Long topicId = 1L;
+ String topicName = "topic";
+ com.datastrato.gravitino.meta.TopicEntity topicEntity =
+ com.datastrato.gravitino.meta.TopicEntity.builder()
+ .withId(topicId)
+ .withName(topicName)
+ .withAuditInfo(auditInfo)
+ .withComment(comment)
+ .withProperties(props)
+ .build();
+ byte[] topicBytes = protoEntitySerDe.serialize(topicEntity);
+ com.datastrato.gravitino.meta.TopicEntity topicEntityFromBytes =
+ protoEntitySerDe.deserialize(topicBytes, com.datastrato.gravitino.meta.TopicEntity.class);
+ Assertions.assertEquals(topicEntity, topicEntityFromBytes);
+
+ com.datastrato.gravitino.meta.TopicEntity topicEntity1 =
+ com.datastrato.gravitino.meta.TopicEntity.builder()
+ .withId(topicId)
+ .withName(topicName)
+ .withAuditInfo(auditInfo)
+ .build();
+ byte[] topicBytes1 = protoEntitySerDe.serialize(topicEntity1);
+ com.datastrato.gravitino.meta.TopicEntity topicEntityFromBytes1 =
+ protoEntitySerDe.deserialize(topicBytes1, com.datastrato.gravitino.meta.TopicEntity.class);
+ Assertions.assertEquals(topicEntity1, topicEntityFromBytes1);
+ Assertions.assertNull(topicEntityFromBytes1.comment());
+ Assertions.assertNull(topicEntityFromBytes1.properties());
}
}
diff --git a/meta/src/main/proto/gravitino_meta.proto b/meta/src/main/proto/gravitino_meta.proto
index b5f24738528..2f7d6f9e15e 100644
--- a/meta/src/main/proto/gravitino_meta.proto
+++ b/meta/src/main/proto/gravitino_meta.proto
@@ -93,3 +93,11 @@ message Fileset {
map properties = 6;
AuditInfo audit_info = 7;
}
+
+message Topic {
+ uint64 id = 1;
+ string name = 2;
+ optional string comment = 3;
+ map properties = 4;
+ AuditInfo audit_info = 5;
+}