Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2371] feat(API, meta): Add basic API and metadata module for messaging catalog #2444

Merged
merged 7 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/src/main/java/com/datastrato/gravitino/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ enum Type {
FILESET,

/** Catalog Type for Message Queue, like kafka://topic */
STREAM
MESSAGING
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.exceptions;

import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;

/** Exception thrown when a topic with specified name is not existed. */
public class NoSuchTopicException extends NotFoundException {

/**
* Constructs a new exception with the specified detail message.
*
* @param message the detail message.
* @param args the arguments to the message.
*/
@FormatMethod
public NoSuchTopicException(@FormatString String message, Object... args) {
super(message, args);
}

/**
* Constructs a new exception with the specified detail message and cause.
*
* @param cause the cause.
* @param message the detail message.
* @param args the arguments to the message.
*/
@FormatMethod
public NoSuchTopicException(Throwable cause, String message, Object... args) {
super(cause, message, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.exceptions;

import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;

/** Exception thrown when a topic with specified name already exists. */
public class TopicAlreadyExistsException extends AlreadyExistsException {

/**
* Constructs a new exception with the specified detail message.
*
* @param message the detail message.
* @param args the arguments to the message.
*/
@FormatMethod
public TopicAlreadyExistsException(@FormatString String message, Object... args) {
super(message, args);
}

/**
* Constructs a new exception with the specified detail message and cause.
*
* @param cause the cause.
* @param message the detail message.
* @param args the arguments to the message.
*/
@FormatMethod
public TopicAlreadyExistsException(
Throwable cause, @FormatString String message, Object... args) {
super(cause, message, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.messaging;

import com.datastrato.gravitino.annotation.Unstable;

/**
* The interface for message schema of a topic. Currently not implemented, only reserved as a
* placeholder interface.
*/
@Unstable
public interface DataLayout {}
37 changes: 37 additions & 0 deletions api/src/main/java/com/datastrato/gravitino/messaging/Topic.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.messaging;

import com.datastrato.gravitino.Auditable;
import com.datastrato.gravitino.annotation.Evolving;
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;

/**
* An interface representing a topic under a schema {@link com.datastrato.gravitino.Namespace}. A
* topic is a message queue that is managed by Gravitino. Users can create/drop/alter a topic on the
* Message Queue system like Kafka, Pulsar, etc.
*
* <p>{@link Topic} defines the basic properties of a topic object. A catalog implementation with
* {@link TopicCatalog} should implement this interface.
*/
@Evolving
public interface Topic extends Auditable {

/** @return Name of the topic */
String name();

/** @return The comment of the topic object. Null is returned if no comment is set. */
@Nullable
default String comment() {
return null;
}

/** @return The properties of the topic object. Empty map is returned if no properties are set. */
default Map<String, String> properties() {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.messaging;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.annotation.Evolving;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTopicException;
import com.datastrato.gravitino.exceptions.TopicAlreadyExistsException;
import java.util.Map;

/**
* The {@link TopicCatalog} interface defines the public API for managing topic objects in a schema.
* If the catalog implementation supports topic objects, it should implement this interface.
*/
@Evolving
public interface TopicCatalog {

/**
* List the topics in a schema namespace from the catalog.
*
* @param namespace A schema namespace.
* @return An array of topic identifiers in the namespace.
* @throws NoSuchSchemaException If the schema does not exist.
*/
NameIdentifier[] listTopics(Namespace namespace) throws NoSuchSchemaException;

/**
* Load topic metadata by {@link NameIdentifier} from the catalog.
*
* @param ident A topic identifier.
* @return The topic metadata.
* @throws NoSuchTopicException If the topic does not exist.
*/
Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException;

/**
* Check if a topic exists using an {@link NameIdentifier} from the catalog.
*
* @param ident A topic identifier.
* @return true If the topic exists, false otherwise.
*/
default boolean topicExists(NameIdentifier ident) {
try {
loadTopic(ident);
return true;
} catch (NoSuchTopicException e) {
return false;
}
}

/**
* Create a topic in the catalog.
*
* @param ident A topic identifier.
* @param comment The comment of the topic object. Null is set if no comment is specified.
* @param dataLayout The message schema of the topic object. Always null because it's not
* supported yet.
* @param properties The properties of the topic object. Empty map is set if no properties are
* specified.
* @return The topic metadata.
* @throws NoSuchSchemaException If the schema does not exist.
* @throws TopicAlreadyExistsException If the topic already exists.
*/
Topic createTopic(
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties)
throws NoSuchSchemaException, TopicAlreadyExistsException;

/**
* Apply the {@link TopicChange changes} to a topic in the catalog.
*
* @param ident A topic identifier.
* @param changes The changes to apply to the topic.
* @return The altered topic metadata.
* @throws NoSuchTopicException If the topic does not exist.
* @throws IllegalArgumentException If the changes is rejected by the implementation.
*/
Topic alterTopic(NameIdentifier ident, TopicChange... changes)
throws NoSuchTopicException, IllegalArgumentException;

/**
* Drop a topic from the catalog.
*
* @param ident A topic identifier.
* @return true If the topic is dropped, false otherwise.
* @throws NoSuchTopicException If the topic does not exist.
*/
boolean dropTopic(NameIdentifier ident) throws NoSuchTopicException;
}
Loading
Loading