diff --git a/api/src/main/java/com/datastrato/gravitino/Catalog.java b/api/src/main/java/com/datastrato/gravitino/Catalog.java index 44a3948006c..3bc7b2251a3 100644 --- a/api/src/main/java/com/datastrato/gravitino/Catalog.java +++ b/api/src/main/java/com/datastrato/gravitino/Catalog.java @@ -26,7 +26,7 @@ enum Type { FILESET, /** Catalog Type for Message Queue, like kafka://topic */ - STREAM + MESSAGING } /** diff --git a/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchTopicException.java b/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchTopicException.java new file mode 100644 index 00000000000..c85b6c65c01 --- /dev/null +++ b/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchTopicException.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.exceptions; + +import com.google.errorprone.annotations.FormatMethod; +import com.google.errorprone.annotations.FormatString; + +/** Exception thrown when a topic with specified name is not existed. */ +public class NoSuchTopicException extends NotFoundException { + + /** + * Constructs a new exception with the specified detail message. + * + * @param message the detail message. + * @param args the arguments to the message. + */ + @FormatMethod + public NoSuchTopicException(@FormatString String message, Object... args) { + super(message, args); + } + + /** + * Constructs a new exception with the specified detail message and cause. + * + * @param cause the cause. + * @param message the detail message. + * @param args the arguments to the message. + */ + @FormatMethod + public NoSuchTopicException(Throwable cause, String message, Object... args) { + super(cause, message, args); + } +} diff --git a/api/src/main/java/com/datastrato/gravitino/exceptions/TopicAlreadyExistsException.java b/api/src/main/java/com/datastrato/gravitino/exceptions/TopicAlreadyExistsException.java new file mode 100644 index 00000000000..99198e58968 --- /dev/null +++ b/api/src/main/java/com/datastrato/gravitino/exceptions/TopicAlreadyExistsException.java @@ -0,0 +1,36 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.exceptions; + +import com.google.errorprone.annotations.FormatMethod; +import com.google.errorprone.annotations.FormatString; + +/** Exception thrown when a topic with specified name already exists. */ +public class TopicAlreadyExistsException extends AlreadyExistsException { + + /** + * Constructs a new exception with the specified detail message. + * + * @param message the detail message. + * @param args the arguments to the message. + */ + @FormatMethod + public TopicAlreadyExistsException(@FormatString String message, Object... args) { + super(message, args); + } + + /** + * Constructs a new exception with the specified detail message and cause. + * + * @param cause the cause. + * @param message the detail message. + * @param args the arguments to the message. + */ + @FormatMethod + public TopicAlreadyExistsException( + Throwable cause, @FormatString String message, Object... args) { + super(cause, message, args); + } +} diff --git a/api/src/main/java/com/datastrato/gravitino/messaging/DataLayout.java b/api/src/main/java/com/datastrato/gravitino/messaging/DataLayout.java new file mode 100644 index 00000000000..86c5ae3e0d4 --- /dev/null +++ b/api/src/main/java/com/datastrato/gravitino/messaging/DataLayout.java @@ -0,0 +1,14 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.messaging; + +import com.datastrato.gravitino.annotation.Unstable; + +/** + * The interface for message schema of a topic. Currently not implemented, only reserved as a + * placeholder interface. + */ +@Unstable +public interface DataLayout {} diff --git a/api/src/main/java/com/datastrato/gravitino/messaging/Topic.java b/api/src/main/java/com/datastrato/gravitino/messaging/Topic.java new file mode 100644 index 00000000000..aa356c09c8e --- /dev/null +++ b/api/src/main/java/com/datastrato/gravitino/messaging/Topic.java @@ -0,0 +1,37 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.messaging; + +import com.datastrato.gravitino.Auditable; +import com.datastrato.gravitino.annotation.Evolving; +import java.util.Collections; +import java.util.Map; +import javax.annotation.Nullable; + +/** + * An interface representing a topic under a schema {@link com.datastrato.gravitino.Namespace}. A + * topic is a message queue that is managed by Gravitino. Users can create/drop/alter a topic on the + * Message Queue system like Kafka, Pulsar, etc. + * + *

{@link Topic} defines the basic properties of a topic object. A catalog implementation with + * {@link TopicCatalog} should implement this interface. + */ +@Evolving +public interface Topic extends Auditable { + + /** @return Name of the topic */ + String name(); + + /** @return The comment of the topic object. Null is returned if no comment is set. */ + @Nullable + default String comment() { + return null; + } + + /** @return The properties of the topic object. Empty map is returned if no properties are set. */ + default Map properties() { + return Collections.emptyMap(); + } +} diff --git a/api/src/main/java/com/datastrato/gravitino/messaging/TopicCatalog.java b/api/src/main/java/com/datastrato/gravitino/messaging/TopicCatalog.java new file mode 100644 index 00000000000..de10613623c --- /dev/null +++ b/api/src/main/java/com/datastrato/gravitino/messaging/TopicCatalog.java @@ -0,0 +1,92 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.messaging; + +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.annotation.Evolving; +import com.datastrato.gravitino.exceptions.NoSuchSchemaException; +import com.datastrato.gravitino.exceptions.NoSuchTopicException; +import com.datastrato.gravitino.exceptions.TopicAlreadyExistsException; +import java.util.Map; + +/** + * The {@link TopicCatalog} interface defines the public API for managing topic objects in a schema. + * If the catalog implementation supports topic objects, it should implement this interface. + */ +@Evolving +public interface TopicCatalog { + + /** + * List the topics in a schema namespace from the catalog. + * + * @param namespace A schema namespace. + * @return An array of topic identifiers in the namespace. + * @throws NoSuchSchemaException If the schema does not exist. + */ + NameIdentifier[] listTopics(Namespace namespace) throws NoSuchSchemaException; + + /** + * Load topic metadata by {@link NameIdentifier} from the catalog. + * + * @param ident A topic identifier. + * @return The topic metadata. + * @throws NoSuchTopicException If the topic does not exist. + */ + Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException; + + /** + * Check if a topic exists using an {@link NameIdentifier} from the catalog. + * + * @param ident A topic identifier. + * @return true If the topic exists, false otherwise. + */ + default boolean topicExists(NameIdentifier ident) { + try { + loadTopic(ident); + return true; + } catch (NoSuchTopicException e) { + return false; + } + } + + /** + * Create a topic in the catalog. + * + * @param ident A topic identifier. + * @param comment The comment of the topic object. Null is set if no comment is specified. + * @param dataLayout The message schema of the topic object. Always null because it's not + * supported yet. + * @param properties The properties of the topic object. Empty map is set if no properties are + * specified. + * @return The topic metadata. + * @throws NoSuchSchemaException If the schema does not exist. + * @throws TopicAlreadyExistsException If the topic already exists. + */ + Topic createTopic( + NameIdentifier ident, String comment, DataLayout dataLayout, Map properties) + throws NoSuchSchemaException, TopicAlreadyExistsException; + + /** + * Apply the {@link TopicChange changes} to a topic in the catalog. + * + * @param ident A topic identifier. + * @param changes The changes to apply to the topic. + * @return The altered topic metadata. + * @throws NoSuchTopicException If the topic does not exist. + * @throws IllegalArgumentException If the changes is rejected by the implementation. + */ + Topic alterTopic(NameIdentifier ident, TopicChange... changes) + throws NoSuchTopicException, IllegalArgumentException; + + /** + * Drop a topic from the catalog. + * + * @param ident A topic identifier. + * @return true If the topic is dropped, false otherwise. + * @throws NoSuchTopicException If the topic does not exist. + */ + boolean dropTopic(NameIdentifier ident) throws NoSuchTopicException; +} diff --git a/api/src/main/java/com/datastrato/gravitino/messaging/TopicChange.java b/api/src/main/java/com/datastrato/gravitino/messaging/TopicChange.java new file mode 100644 index 00000000000..e07189c7d70 --- /dev/null +++ b/api/src/main/java/com/datastrato/gravitino/messaging/TopicChange.java @@ -0,0 +1,223 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.messaging; + +import com.datastrato.gravitino.annotation.Evolving; +import java.util.Objects; + +/** + * A topic change is a change to a topic. It can be used to update the comment of a topic, set a + * property and value pair for a topic, or remove a property from a topic in the catalog. + */ +@Evolving +public interface TopicChange { + + /** + * Creates a new topic change to update the topic comment. + * + * @param newComment The new comment for the topic. + * @return The topic change. + */ + static TopicChange updateComment(String newComment) { + return new TopicChange.UpdateTopicComment(newComment); + } + + /** + * Creates a new topic change to set or update the property and value for the topic. + * + * @param property The property name to set. + * @param value The value to set the property to. + * @return The topic change. + */ + static TopicChange setProperty(String property, String value) { + return new TopicChange.SetProperty(property, value); + } + + /** + * Creates a new topic change to remove a property from the topic. + * + * @param property The property name to remove. + * @return The topic change. + */ + static TopicChange removeProperty(String property) { + return new TopicChange.RemoveProperty(property); + } + + /** A topic change to update the topic comment. */ + final class UpdateTopicComment implements TopicChange { + private final String newComment; + + private UpdateTopicComment(String newComment) { + this.newComment = newComment; + } + + /** + * Retrieves the new comment for the topic. + * + * @return The new comment for the topic. + */ + public String getNewComment() { + return newComment; + } + + /** + * Compares this UpdateTopicComment instance with another object for equality. Two instances are + * considered equal if they have the same new comment for the topic. + * + * @param o The object to compare with this instance. + * @return true if the given object represents the same comment update; false otherwise. + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TopicChange.UpdateTopicComment that = (TopicChange.UpdateTopicComment) o; + return Objects.equals(newComment, that.newComment); + } + + /** + * Generates a hash code for this UpdateTopicComment instance. The hash code is based on the new + * comment for the topic. + * + * @return A hash code representing this comment update operation. + */ + @Override + public int hashCode() { + return Objects.hash(newComment); + } + + /** + * Provides a string representation of the UpdateTopicComment instance. This string format + * includes the class name followed by the new comment for the topic. + * + * @return A string summary of this comment update operation. + */ + @Override + public String toString() { + return "UPDATETOPICCOMMENT " + newComment; + } + } + + /** A topic change to set or update the property and value for the topic. */ + final class SetProperty implements TopicChange { + private final String property; + private final String value; + + private SetProperty(String property, String value) { + this.property = property; + this.value = value; + } + + /** + * Retrieves the name of the property being set in the topic. + * + * @return The name of the property. + */ + public String getProperty() { + return property; + } + + /** + * Retrieves the value assigned to the property in the topic. + * + * @return The value of the property. + */ + public String getValue() { + return value; + } + + /** + * Compares this SetProperty instance with another object for equality. Two instances are + * considered equal if they have the same property and value for the topic. + * + * @param o The object to compare with this instance. + * @return true if the given object represents the same property setting; false otherwise. + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SetProperty that = (SetProperty) o; + return Objects.equals(property, that.property) && Objects.equals(value, that.value); + } + + /** + * Generates a hash code for this SetProperty instance. The hash code is based on both the + * property name and its assigned value. + * + * @return A hash code value for this property setting. + */ + @Override + public int hashCode() { + return Objects.hash(property, value); + } + + /** + * Provides a string representation of the SetProperty instance. This string format includes the + * class name followed by the property and its value. + * + * @return A string summary of the property setting. + */ + @Override + public String toString() { + return "SETPROPERTY " + property + " " + value; + } + } + + /** A topic change to remove a property from the topic. */ + final class RemoveProperty implements TopicChange { + private final String property; + + private RemoveProperty(String property) { + this.property = property; + } + + /** + * Retrieves the name of the property to be removed from the topic. + * + * @return The name of the property for removal. + */ + public String getProperty() { + return property; + } + + /** + * Compares this RemoveProperty instance with another object for equality. Two instances are + * considered equal if they target the same property for removal from the topic. + * + * @param o The object to compare with this instance. + * @return true if the given object represents the same property removal; false otherwise. + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RemoveProperty that = (RemoveProperty) o; + return Objects.equals(property, that.property); + } + + /** + * Generates a hash code for this RemoveProperty instance. The hash code is based on the + * property name that is to be removed from the topic. + * + * @return A hash code value for this property removal operation. + */ + @Override + public int hashCode() { + return Objects.hash(property); + } + + /** + * Provides a string representation of the RemoveProperty instance. This string format includes + * the class name followed by the property name to be removed. + * + * @return A string summary of the property removal operation. + */ + @Override + public String toString() { + return "REMOVEPROPERTY " + property; + } + } +} diff --git a/clients/client-java/src/main/java/com/datastrato/gravitino/client/DTOConverters.java b/clients/client-java/src/main/java/com/datastrato/gravitino/client/DTOConverters.java index 30df6657821..fc59a2efa84 100644 --- a/clients/client-java/src/main/java/com/datastrato/gravitino/client/DTOConverters.java +++ b/clients/client-java/src/main/java/com/datastrato/gravitino/client/DTOConverters.java @@ -80,7 +80,7 @@ static Catalog toCatalog(CatalogDTO catalog, RESTClient client) { .withRestClient(client) .build(); - case STREAM: + case MESSAGING: default: throw new UnsupportedOperationException("Unsupported catalog type: " + catalog.type()); } diff --git a/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoMetalake.java b/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoMetalake.java index eddd13dbde7..77153bfd9dd 100644 --- a/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoMetalake.java +++ b/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoMetalake.java @@ -125,7 +125,7 @@ public void testLoadCatalog() throws JsonProcessingException { new CatalogDTO.Builder() .withName("mock") .withComment("comment") - .withType(Catalog.Type.STREAM) + .withType(Catalog.Type.MESSAGING) .withProvider("test") .withAudit( new AuditDTO.Builder().withCreator("creator").withCreateTime(Instant.now()).build()) @@ -182,14 +182,14 @@ public void testCreateCatalog() throws JsonProcessingException { new CatalogDTO.Builder() .withName("mock") .withComment("comment") - .withType(Catalog.Type.STREAM) + .withType(Catalog.Type.MESSAGING) .withProvider("test") .withAudit( new AuditDTO.Builder().withCreator("creator").withCreateTime(Instant.now()).build()) .build(); CatalogCreateRequest req1 = new CatalogCreateRequest( - catalogName, Catalog.Type.STREAM, provider, "comment", Collections.emptyMap()); + catalogName, Catalog.Type.MESSAGING, provider, "comment", Collections.emptyMap()); CatalogResponse resp1 = new CatalogResponse(mockCatalog1); buildMockResource(Method.POST, path, req1, resp1, HttpStatus.SC_OK); NameIdentifier id = NameIdentifier.of(metalakeName, catalogName); @@ -197,7 +197,7 @@ public void testCreateCatalog() throws JsonProcessingException { Assertions.assertThrows( UnsupportedOperationException.class, - () -> metalake.createCatalog(id, Catalog.Type.STREAM, provider, "comment", emptyMap)); + () -> metalake.createCatalog(id, Catalog.Type.MESSAGING, provider, "comment", emptyMap)); // Test return NoSuchMetalakeException ErrorResponse errorResponse = diff --git a/core/src/main/java/com/datastrato/gravitino/Entity.java b/core/src/main/java/com/datastrato/gravitino/Entity.java index 65e3d073a98..116919fe621 100644 --- a/core/src/main/java/com/datastrato/gravitino/Entity.java +++ b/core/src/main/java/com/datastrato/gravitino/Entity.java @@ -20,6 +20,7 @@ enum EntityType { TABLE("ta", 3), COLUMN("co", 4), FILESET("fi", 5), + TOPIC("to", 6), AUDIT("au", 65534); diff --git a/core/src/main/java/com/datastrato/gravitino/meta/TopicEntity.java b/core/src/main/java/com/datastrato/gravitino/meta/TopicEntity.java new file mode 100644 index 00000000000..8b0a3180314 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/meta/TopicEntity.java @@ -0,0 +1,231 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.meta; + +import com.datastrato.gravitino.Auditable; +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.Field; +import com.datastrato.gravitino.HasIdentifier; +import com.datastrato.gravitino.Namespace; +import com.google.common.collect.Maps; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import lombok.ToString; + +/** A class representing a topic metadata entity in Gravitino. */ +@ToString +public class TopicEntity implements Entity, Auditable, HasIdentifier { + public static final Field ID = + Field.required("id", Long.class, "The unique id of the topic entity."); + public static final Field NAME = + Field.required("name", String.class, "The name of the topic entity."); + public static final Field COMMENT = + Field.optional("comment", String.class, "The comment or description of the topic entity."); + public static final Field AUDIT_INFO = + Field.required("audit_info", AuditInfo.class, "The audit details of the topic entity."); + public static final Field PROPERTIES = + Field.optional("properties", Map.class, "The properties of the topic entity."); + + public static Builder builder() { + return new Builder(); + } + + private Long id; + private String name; + private Namespace namespace; + private String comment; + private AuditInfo auditInfo; + private Map properties; + + private TopicEntity() {} + + /** + * Returns a map of fields and their corresponding values for this topic entity. + * + * @return An unmodifiable map of the fields and values. + */ + @Override + public Map fields() { + Map fields = Maps.newHashMap(); + fields.put(ID, id); + fields.put(NAME, name); + fields.put(COMMENT, comment); + fields.put(AUDIT_INFO, auditInfo); + fields.put(PROPERTIES, properties); + + return Collections.unmodifiableMap(fields); + } + + /** + * Returns the name of the topic. + * + * @return The name of the topic. + */ + @Override + public String name() { + return name; + } + + /** + * Returns the namespace of the topic. + * + * @return The namespace of the topic. + */ + @Override + public Namespace namespace() { + return namespace; + } + + /** + * Returns the unique id of the topic. + * + * @return The unique id of the topic. + */ + @Override + public Long id() { + return id; + } + + /** + * Returns the comment or description of the topic. + * + * @return The comment or description of the topic. + */ + public String comment() { + return comment; + } + + /** + * Returns the audit details of the topic. + * + * @return The audit details of the topic. + */ + @Override + public AuditInfo auditInfo() { + return auditInfo; + } + + /** + * Returns the type of the entity. + * + * @return The type of the entity. + */ + @Override + public EntityType type() { + return EntityType.TOPIC; + } + + /** + * Returns the properties of the topic entity. + * + * @return The properties of the topic entity. + */ + public Map properties() { + return properties; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof TopicEntity)) return false; + + TopicEntity that = (TopicEntity) o; + return Objects.equals(id, that.id) + && Objects.equals(name, that.name) + && Objects.equals(comment, that.comment) + && Objects.equals(auditInfo, that.auditInfo) + && Objects.equals(properties, that.properties); + } + + @Override + public int hashCode() { + return Objects.hash(id, name, comment, auditInfo, properties); + } + + public static class Builder { + private final TopicEntity topic; + + private Builder() { + topic = new TopicEntity(); + } + + /** + * Sets the unique id of the topic entity. + * + * @param id The unique id of the topic entity. + * @return The builder instance. + */ + public TopicEntity.Builder withId(Long id) { + topic.id = id; + return this; + } + + /** + * Sets the name of the topic entity. + * + * @param name The name of the topic entity. + * @return The builder instance. + */ + public TopicEntity.Builder withName(String name) { + topic.name = name; + return this; + } + + /** + * Sets the namespace of the topic entity. + * + * @param namespace The namespace of the topic entity. + * @return The builder instance. + */ + public TopicEntity.Builder withNamespace(Namespace namespace) { + topic.namespace = namespace; + return this; + } + + /** + * Sets the comment or description of the topic entity. + * + * @param comment The comment or description of the topic entity. + * @return The builder instance. + */ + public TopicEntity.Builder withComment(String comment) { + topic.comment = comment; + return this; + } + + /** + * Sets the audit details of the topic entity. + * + * @param auditInfo The audit details of the topic entity. + * @return The builder instance. + */ + public TopicEntity.Builder withAuditInfo(AuditInfo auditInfo) { + topic.auditInfo = auditInfo; + return this; + } + + /** + * Sets the properties of the topic entity. + * + * @param properties The properties of the topic entity. + * @return The builder instance. + */ + public TopicEntity.Builder withProperties(Map properties) { + topic.properties = properties; + return this; + } + + /** + * Builds the topic entity. + * + * @return The built topic entity. + */ + public TopicEntity build() { + topic.validate(); + return topic; + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/proto/ProtoEntitySerDe.java b/core/src/main/java/com/datastrato/gravitino/proto/ProtoEntitySerDe.java index 08c00b6b6bf..182716cbfcd 100644 --- a/core/src/main/java/com/datastrato/gravitino/proto/ProtoEntitySerDe.java +++ b/core/src/main/java/com/datastrato/gravitino/proto/ProtoEntitySerDe.java @@ -37,6 +37,9 @@ public class ProtoEntitySerDe implements EntitySerDe { .put( "com.datastrato.gravitino.meta.FilesetEntity", "com.datastrato.gravitino.proto.FilesetEntitySerDe") + .put( + "com.datastrato.gravitino.meta.TopicEntity", + "com.datastrato.gravitino.proto.TopicEntitySerDe") .build(); private static final Map ENTITY_TO_PROTO = @@ -52,7 +55,9 @@ public class ProtoEntitySerDe implements EntitySerDe { "com.datastrato.gravitino.meta.TableEntity", "com.datastrato.gravitino.proto.Table", "com.datastrato.gravitino.meta.FilesetEntity", - "com.datastrato.gravitino.proto.Fileset"); + "com.datastrato.gravitino.proto.Fileset", + "com.datastrato.gravitino.meta.TopicEntity", + "com.datastrato.gravitino.proto.Topic"); private final Map, ProtoSerDe> entityToSerDe; diff --git a/core/src/main/java/com/datastrato/gravitino/proto/TopicEntitySerDe.java b/core/src/main/java/com/datastrato/gravitino/proto/TopicEntitySerDe.java new file mode 100644 index 00000000000..c784030dc47 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/proto/TopicEntitySerDe.java @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.proto; + +import com.datastrato.gravitino.meta.TopicEntity; + +public class TopicEntitySerDe implements ProtoSerDe { + + @Override + public Topic serialize(TopicEntity topicEntity) { + Topic.Builder builder = + Topic.newBuilder() + .setId(topicEntity.id()) + .setName(topicEntity.name()) + .setAuditInfo(new AuditInfoSerDe().serialize(topicEntity.auditInfo())); + + if (topicEntity.comment() != null) { + builder.setComment(topicEntity.comment()); + } + + if (topicEntity.properties() != null && !topicEntity.properties().isEmpty()) { + builder.putAllProperties(topicEntity.properties()); + } + + return builder.build(); + } + + @Override + public TopicEntity deserialize(Topic p) { + TopicEntity.Builder builder = + TopicEntity.builder() + .withId(p.getId()) + .withName(p.getName()) + .withAuditInfo(new AuditInfoSerDe().deserialize(p.getAuditInfo())); + + if (p.hasComment()) { + builder.withComment(p.getComment()); + } + + if (p.getPropertiesCount() > 0) { + builder.withProperties(p.getPropertiesMap()); + } + + return builder.build(); + } +} diff --git a/core/src/test/java/com/datastrato/gravitino/meta/TestEntity.java b/core/src/test/java/com/datastrato/gravitino/meta/TestEntity.java index d7d28199a34..7feff2608ea 100644 --- a/core/src/test/java/com/datastrato/gravitino/meta/TestEntity.java +++ b/core/src/test/java/com/datastrato/gravitino/meta/TestEntity.java @@ -43,6 +43,10 @@ public class TestEntity { private final Long fileId = 1L; private final String fileName = "testFile"; + // Topic test data + private final Long topicId = 1L; + private final String topicName = "testTopic"; + @Test public void testMetalake() { BaseMetalake metalake = @@ -176,4 +180,28 @@ public void testFile() { }); Assertions.assertEquals("Field storage_location is required", exception.getMessage()); } + + @Test + public void testTopic() { + TopicEntity testTopic = + TopicEntity.builder() + .withId(topicId) + .withName(topicName) + .withAuditInfo(auditInfo) + .withComment("test topic comment") + .withProperties(map) + .build(); + + Map fields = testTopic.fields(); + Assertions.assertEquals(topicId, fields.get(TopicEntity.ID)); + Assertions.assertEquals(topicName, fields.get(TopicEntity.NAME)); + Assertions.assertEquals(auditInfo, fields.get(TopicEntity.AUDIT_INFO)); + Assertions.assertEquals("test topic comment", fields.get(TopicEntity.COMMENT)); + Assertions.assertEquals(map, fields.get(TopicEntity.PROPERTIES)); + + TopicEntity testTopic1 = + TopicEntity.builder().withId(topicId).withName(topicName).withAuditInfo(auditInfo).build(); + Assertions.assertNull(testTopic1.comment()); + Assertions.assertNull(testTopic1.properties()); + } } diff --git a/core/src/test/java/com/datastrato/gravitino/proto/TestEntityProtoSerDe.java b/core/src/test/java/com/datastrato/gravitino/proto/TestEntityProtoSerDe.java index f29b95ff7a3..073d7b5a269 100644 --- a/core/src/test/java/com/datastrato/gravitino/proto/TestEntityProtoSerDe.java +++ b/core/src/test/java/com/datastrato/gravitino/proto/TestEntityProtoSerDe.java @@ -245,5 +245,34 @@ public void testEntitiesSerDe() throws IOException { Assertions.assertEquals("testLocation", fileEntityFromBytes2.storageLocation()); Assertions.assertEquals( com.datastrato.gravitino.file.Fileset.Type.EXTERNAL, fileEntityFromBytes2.filesetType()); + + // Test TopicEntity + Long topicId = 1L; + String topicName = "topic"; + com.datastrato.gravitino.meta.TopicEntity topicEntity = + com.datastrato.gravitino.meta.TopicEntity.builder() + .withId(topicId) + .withName(topicName) + .withAuditInfo(auditInfo) + .withComment(comment) + .withProperties(props) + .build(); + byte[] topicBytes = protoEntitySerDe.serialize(topicEntity); + com.datastrato.gravitino.meta.TopicEntity topicEntityFromBytes = + protoEntitySerDe.deserialize(topicBytes, com.datastrato.gravitino.meta.TopicEntity.class); + Assertions.assertEquals(topicEntity, topicEntityFromBytes); + + com.datastrato.gravitino.meta.TopicEntity topicEntity1 = + com.datastrato.gravitino.meta.TopicEntity.builder() + .withId(topicId) + .withName(topicName) + .withAuditInfo(auditInfo) + .build(); + byte[] topicBytes1 = protoEntitySerDe.serialize(topicEntity1); + com.datastrato.gravitino.meta.TopicEntity topicEntityFromBytes1 = + protoEntitySerDe.deserialize(topicBytes1, com.datastrato.gravitino.meta.TopicEntity.class); + Assertions.assertEquals(topicEntity1, topicEntityFromBytes1); + Assertions.assertNull(topicEntityFromBytes1.comment()); + Assertions.assertNull(topicEntityFromBytes1.properties()); } } diff --git a/meta/src/main/proto/gravitino_meta.proto b/meta/src/main/proto/gravitino_meta.proto index b5f24738528..2f7d6f9e15e 100644 --- a/meta/src/main/proto/gravitino_meta.proto +++ b/meta/src/main/proto/gravitino_meta.proto @@ -93,3 +93,11 @@ message Fileset { map properties = 6; AuditInfo audit_info = 7; } + +message Topic { + uint64 id = 1; + string name = 2; + optional string comment = 3; + map properties = 4; + AuditInfo audit_info = 5; +}