Skip to content

Commit

Permalink
Add readme + some more examples
Browse files Browse the repository at this point in the history
  • Loading branch information
vdsirotkin committed Jun 3, 2024
1 parent 245e7e6 commit 05eb970
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 16 deletions.
66 changes: 66 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# pgmq-kotlin-jvm

[![codecov](https://codecov.io/gh/vdsirotkin/pgmq-kotlin-jvm/graph/badge.svg?token=GAXNOBXHKX)](https://codecov.io/gh/vdsirotkin/pgmq-kotlin-jvm)

A Kotlin client, based on pure JDBC, for [Postgres Message Queue](https://github.com/tembo-io/pgmq) (PGMQ).

# Compatibility

This library currently supports following PGMQ API:
* `create`
* `drop_queue`
* `list_queues`
* `send_batch`
* `read`
* `pop`
* `archive`
* `delete`
* `purge_queue`

# Usage

First of all - you need to set up `PgmqConnectionFactory` and `PgmqSerializationProvider`

### PgmqConnectionFactory

You are free to implement it either way you wish :)

Example implementation for Spring, which acquires current connection inside of `@Transactional` method (may be useful if you need PGMQ to be transational):

```kotlin
@Bean
fun pgmqConnectionFactory(dataSource: DataSource) = PgmqConnectionFactory {
DataSourceUtils.getConnection(dataSource)
}
```

Please refer to Spring's [DataSourceUtils](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/jdbc/datasource/DataSourceUtils.html#getConnection(javax.sql.DataSource)) docs for explanations

### PgmqSerializationProvider

This library provides 2 implementations - [GsonPgmqSerializationProvider](src%2Fmain%2Fkotlin%2Fcom%2Fvdsirotkin%2Fpgmq%2Fserialization%2FGsonPgmqSerializationProvider.kt) and [JacksonPgmqSerializationProvider](src%2Fmain%2Fkotlin%2Fcom%2Fvdsirotkin%2Fpgmq%2Fserialization%2FJacksonPgmqSerializationProvider.kt)

If you use some other serialization library - feel free to implement [PgmqSerializationProvider](src%2Fmain%2Fkotlin%2Fcom%2Fvdsirotkin%2Fpgmq%2Fserialization%2FPgmqSerializationProvider.kt) interface, it's simple! :)

### PgmqConfiguration

Next step should be defining [PgmqConfiguration](src%2Fmain%2Fkotlin%2Fcom%2Fvdsirotkin%2Fpgmq%2Fconfig%2FPgmqConfiguration.kt). You can also implement it either way you want.

Here's an example for Spring's Configuration Properties:

```kotlin
@ConfigurationProperties(prefix = "pgmq")
data class PgmqConfigurationProps(
override val defaultVisibilityTimeout: java.time.Duration = 30.seconds.toJavaDuration()
) : PgmqConfiguration
```

### Building client

If you have set up connection factory and serializer - you're free to build the client!

Here's complete example of Spring setup for PgmqClient - [SpringPgmqApplication.kt](src%2Ftest%2Fkotlin%2Fcom%2Fvdsirotkin%2Fpgmq%2Fspring%2FSpringPgmqApplication.kt).

# Contributions

Contributions are always welcomed!
30 changes: 24 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@
<version>2.10.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test-junit5</artifactId>
<version>${kotlin.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
Expand All @@ -69,6 +63,30 @@
<version>2.0.13</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>3.2.3</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
<version>3.2.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.1.214</version>
<scope>test</scope>
</dependency>
</dependencies>

<repositories>
Expand Down
95 changes: 91 additions & 4 deletions src/main/kotlin/com/vdsirotkin/pgmq/PgmqClient.kt
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
package com.vdsirotkin.pgmq

import com.vdsirotkin.pgmq.config.ConnectionFactory
import com.vdsirotkin.pgmq.config.PgmqConnectionFactory
import com.vdsirotkin.pgmq.config.PgmqConfiguration
import com.vdsirotkin.pgmq.domain.PgmqEntry
import com.vdsirotkin.pgmq.domain.PgmqQueue
import com.vdsirotkin.pgmq.serialization.PgmqSerializationProvider
import com.vdsirotkin.pgmq.util.ResultSetIterator
import com.vdsirotkin.pgmq.util.asIterable
import java.sql.ResultSet
import java.time.OffsetDateTime
import kotlin.time.Duration
import kotlin.time.toKotlinDuration

/**
* PgmqClient is a class that provides an interface to interact with a PostgreSQL message queue.
*
* @property connectionFactory The factory to create database connections.
* @property serializationProvider The provider to serialize and deserialize messages.
* @property configuration The configuration for the PgmqClient.
*/
class PgmqClient(
private val connectionFactory: ConnectionFactory,
private val connectionFactory: PgmqConnectionFactory,
private val serializationProvider: PgmqSerializationProvider,
private val configuration: PgmqConfiguration,
) {

/**
* Creates a new queue with the given name.
*
* @param queueName The name of the queue to be created.
*/
fun createQueue(queueName: String) {
connectionFactory.createConnection().use { connection ->
connection.prepareStatement("SELECT pgmq.create(?)").use {
Expand All @@ -25,6 +38,11 @@ class PgmqClient(
}
}

/**
* Drops a queue with the given name.
*
* @param queueName The name of the queue to be dropped.
*/
fun dropQueue(queueName: String) {
connectionFactory.createConnection().use { connection ->
connection.prepareStatement("SELECT pgmq.drop_queue(?)").use {
Expand All @@ -34,6 +52,11 @@ class PgmqClient(
}
}

/**
* Lists all the queues.
*
* @return A list of all queues.
*/
fun listQueues(): List<PgmqQueue> {
return connectionFactory.createConnection().use { connection ->
connection.prepareStatement("SELECT queue_name, created_at FROM pgmq.list_queues()").use {
Expand All @@ -49,10 +72,26 @@ class PgmqClient(
}
}

/**
* Sends a message to the specified queue.
*
* @param queueName The name of the queue.
* @param message The message to be sent.
* @param delay The delay before the message becomes visible in the queue.
* @return The id of the sent message.
*/
fun send(queueName: String, message: Any, delay: Duration = Duration.ZERO): Long {
return sendBatch(queueName, listOf(message), delay).firstOrNull() ?: throw PgmqException("No message id provided for sent message. Queue: $queueName, message: $message")
}

/**
* Sends a batch of messages to the specified queue.
*
* @param queueName The name of the queue.
* @param messages The messages to be sent.
* @param delay The delay before the messages become visible in the queue.
* @return A list of ids of the sent messages.
*/
fun <T : Any> sendBatch(queueName: String, messages: Collection<T>, delay: Duration = Duration.ZERO): List<Long> {
val jsons = messages.map { serializationProvider.serialize(it) }.toTypedArray()
return connectionFactory.createConnection().use { connection ->
Expand All @@ -67,7 +106,15 @@ class PgmqClient(
}
}

fun readBatch(queueName: String, quantity: Int, visibilityTimeout: Duration = configuration.defaultVisibilityTimeout): List<PgmqEntry> {
/**
* Reads a batch of messages from the specified queue.
*
* @param queueName The name of the queue.
* @param quantity The number of messages to read.
* @param visibilityTimeout The time period during which the message will be invisible to other consumers.
* @return A list of messages read from the queue.
*/
fun readBatch(queueName: String, quantity: Int, visibilityTimeout: Duration = configuration.defaultVisibilityTimeout.toKotlinDuration()): List<PgmqEntry> {
return connectionFactory.createConnection().use { connection ->
connection.prepareStatement("SELECT msg_id, read_ct, enqueued_at, vt, message FROM pgmq.read(?, ?, ?)").use {
it.setString(1, queueName)
Expand All @@ -80,6 +127,12 @@ class PgmqClient(
}
}

/**
* Pops a message from the specified queue.
*
* @param queueName The name of the queue.
* @return The message popped from the queue.
*/
fun pop(queueName: String): PgmqEntry? {
return connectionFactory.createConnection().use { connection ->
connection.prepareStatement("SELECT msg_id, read_ct, enqueued_at, vt, message FROM pgmq.pop(?)").use {
Expand All @@ -91,6 +144,13 @@ class PgmqClient(
}
}

/**
* Archives a list of messages from the specified queue.
*
* @param queueName The name of the queue.
* @param messageIds The ids of the messages to be archived.
* @return A list of ids of the archived messages.
*/
fun archive(queueName: String, messageIds: Collection<Long>): List<Long> {
return connectionFactory.createConnection().use { connection ->
connection.prepareStatement("SELECT archive FROM pgmq.archive(?, ?)").use {
Expand All @@ -103,8 +163,22 @@ class PgmqClient(
}
}

/**
* Archives a message from the specified queue.
*
* @param queueName The name of the queue.
* @param messageId The id of the message to be archived.
* @return True if the message was archived, false otherwise.
*/
fun archive(queueName: String, messageId: Long): Boolean = archive(queueName, listOf(messageId)).size == 1

/**
* Deletes a list of messages from the specified queue.
*
* @param queueName The name of the queue.
* @param messageIds The ids of the messages to be deleted.
* @return A list of ids of the deleted messages.
*/
fun delete(queueName: String, messageIds: Collection<Long>): List<Long> {
return connectionFactory.createConnection().use { connection ->
connection.prepareStatement("SELECT delete FROM pgmq.delete(?, ?)").use {
Expand All @@ -117,8 +191,21 @@ class PgmqClient(
}
}

/**
* Deletes a message from the specified queue.
*
* @param queueName The name of the queue.
* @param messageId The id of the message to be deleted.
* @return True if the message was deleted, false otherwise.
*/
fun delete(queueName: String, messageId: Long): Boolean = delete(queueName, listOf(messageId)).size == 1

/**
* Purges all messages from the specified queue.
*
* @param queueName The name of the queue.
* @return The number of messages purged from the queue.
*/
fun purge(queueName: String): Int {
return connectionFactory.createConnection().use { connection ->
connection.prepareStatement("SELECT purge_queue from pgmq.purge_queue(?)").use {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.vdsirotkin.pgmq.config

import kotlin.time.Duration

interface PgmqConfiguration {
val defaultVisibilityTimeout: Duration
val defaultVisibilityTimeout: java.time.Duration
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package com.vdsirotkin.pgmq.config

import java.sql.Connection

fun interface ConnectionFactory {
fun interface PgmqConnectionFactory {
fun createConnection(): Connection
}
7 changes: 4 additions & 3 deletions src/test/kotlin/com/vdsirotkin/pgmq/Utils.kt
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package com.vdsirotkin.pgmq

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.vdsirotkin.pgmq.config.ConnectionFactory
import com.vdsirotkin.pgmq.config.PgmqConnectionFactory
import com.vdsirotkin.pgmq.config.PgmqConfiguration
import org.slf4j.LoggerFactory
import java.sql.Connection
import java.sql.DriverManager
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration

object TestConfiguration : PgmqConfiguration {
override val defaultVisibilityTimeout: Duration = 10.seconds
override val defaultVisibilityTimeout: java.time.Duration = 10.seconds.toJavaDuration()
}

val objectMapper = jacksonObjectMapper().findAndRegisterModules()
Expand All @@ -34,7 +35,7 @@ class TestConnectionFactory(
private val jdbcUrl: String,
private val username: String,
private val password: String,
) : ConnectionFactory {
) : PgmqConnectionFactory {
lateinit var connection: TestConnection

fun prepareConnection() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.vdsirotkin.pgmq.spring

import com.fasterxml.jackson.databind.ObjectMapper
import com.vdsirotkin.pgmq.PgmqClient
import com.vdsirotkin.pgmq.config.PgmqConfiguration
import com.vdsirotkin.pgmq.config.PgmqConnectionFactory
import com.vdsirotkin.pgmq.objectMapper
import com.vdsirotkin.pgmq.serialization.JacksonPgmqSerializationProvider
import com.vdsirotkin.pgmq.serialization.PgmqSerializationProvider
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jdbc.datasource.DataSourceUtils
import javax.sql.DataSource
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration

@SpringBootApplication
@EnableConfigurationProperties(PgmqConfigurationProps::class)
open class SpringPgmqApplication {
@Configuration(proxyBeanMethods = false)
open class PgmqConfig {
@Bean
open fun objectMapper() = objectMapper

@Bean
open fun pgmqConnectionFactory(dataSource: DataSource) = PgmqConnectionFactory {
DataSourceUtils.getConnection(dataSource)
}

@Bean
open fun pgmqSerializer(objectMapper: ObjectMapper) = JacksonPgmqSerializationProvider(objectMapper)

@Bean
open fun pgmqClient(
pgmqConnectionFactory: PgmqConnectionFactory,
pgmqSerializationProvider: PgmqSerializationProvider,
pgmqConfiguration: PgmqConfiguration
) = PgmqClient(pgmqConnectionFactory, pgmqSerializationProvider, pgmqConfiguration)
}
}

@ConfigurationProperties(prefix = "pgmq")
data class PgmqConfigurationProps(
override val defaultVisibilityTimeout: java.time.Duration = 30.seconds.toJavaDuration()
) : PgmqConfiguration
Loading

0 comments on commit 05eb970

Please sign in to comment.