diff --git a/docs/documentation/advanced/postgresql.md b/docs/documentation/advanced/postgresql.md index 22312ee48..3a1d2ff3b 100644 --- a/docs/documentation/advanced/postgresql.md +++ b/docs/documentation/advanced/postgresql.md @@ -1,7 +1,7 @@ # PostgreSQL By default conductor runs with an in-memory Redis mock. However, you -can run Conductor against PostgreSQL which provides workflow management, queues and indexing. +can run Conductor against PostgreSQL which provides workflow management, queues, indexing, and locking. There are a number of configuration options that enable you to use more or less of PostgreSQL functionality for your needs. It has the benefit of requiring fewer moving parts for the infrastructure, but does not scale as well to handle high volumes of workflows. You should benchmark Conductor with Postgres against your specific workload to be sure. @@ -34,6 +34,12 @@ conductor.indexing.type=postgres conductor.elasticsearch.version=0 ``` +To use PostgreSQL for locking, set the following configurations: +```properties +conductor.app.workflowExecutionLockEnabled=true +conductor.workflow-execution-lock.type=postgres +``` + ## Performance Optimisations ### Poll Data caching diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java index 715beefb0..1e00cb067 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java @@ -118,6 +118,17 @@ public PostgresIndexDAO postgresIndexDAO( return new PostgresIndexDAO(retryTemplate, objectMapper, dataSource, properties); } + @Bean + @DependsOn({"flywayForPrimaryDb"}) + @ConditionalOnProperty( + name = "conductor.workflow-execution-lock.type", + havingValue = "postgres") + public PostgresLockDAO postgresLockDAO( + @Qualifier("postgresRetryTemplate") RetryTemplate retryTemplate, + ObjectMapper objectMapper) { + return new PostgresLockDAO(retryTemplate, objectMapper, dataSource); + } + @Bean public RetryTemplate postgresRetryTemplate(PostgresProperties properties) { SimpleRetryPolicy retryPolicy = new CustomRetryPolicy(); diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresLockDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresLockDAO.java new file mode 100644 index 000000000..072ec1524 --- /dev/null +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresLockDAO.java @@ -0,0 +1,82 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.postgres.dao; + +import java.util.concurrent.TimeUnit; + +import javax.sql.DataSource; + +import org.springframework.retry.support.RetryTemplate; + +import com.netflix.conductor.core.sync.Lock; + +import com.fasterxml.jackson.databind.ObjectMapper; + +public class PostgresLockDAO extends PostgresBaseDAO implements Lock { + private final long DAY_MS = 24 * 60 * 60 * 1000; + + public PostgresLockDAO( + RetryTemplate retryTemplate, ObjectMapper objectMapper, DataSource dataSource) { + super(retryTemplate, objectMapper, dataSource); + } + + @Override + public void acquireLock(String lockId) { + acquireLock(lockId, DAY_MS, DAY_MS, TimeUnit.MILLISECONDS); + } + + @Override + public boolean acquireLock(String lockId, long timeToTry, TimeUnit unit) { + return acquireLock(lockId, timeToTry, DAY_MS, unit); + } + + @Override + public boolean acquireLock(String lockId, long timeToTry, long leaseTime, TimeUnit unit) { + long endTime = System.currentTimeMillis() + unit.toMillis(timeToTry); + while (System.currentTimeMillis() < endTime) { + var sql = + "INSERT INTO locks(lock_id, lease_expiration) VALUES (?, now() + (?::text || ' milliseconds')::interval) ON CONFLICT (lock_id) DO UPDATE SET lease_expiration = EXCLUDED.lease_expiration WHERE locks.lease_expiration <= now()"; + + int rowsAffected = + queryWithTransaction( + sql, + q -> + q.addParameter(lockId) + .addParameter(unit.toMillis(leaseTime)) + .executeUpdate()); + + if (rowsAffected > 0) { + return true; + } + + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return false; + } + } + return false; + } + + @Override + public void releaseLock(String lockId) { + var sql = "DELETE FROM locks WHERE lock_id = ?"; + queryWithTransaction(sql, q -> q.addParameter(lockId).executeDelete()); + } + + @Override + public void deleteLock(String lockId) { + releaseLock(lockId); + } +} diff --git a/postgres-persistence/src/main/resources/db/migration_postgres/V11__locking.sql b/postgres-persistence/src/main/resources/db/migration_postgres/V11__locking.sql new file mode 100644 index 000000000..f2062d9c3 --- /dev/null +++ b/postgres-persistence/src/main/resources/db/migration_postgres/V11__locking.sql @@ -0,0 +1,4 @@ +CREATE TABLE IF NOT EXISTS locks ( + lock_id VARCHAR PRIMARY KEY, + lease_expiration TIMESTAMP WITH TIME ZONE NOT NULL +); diff --git a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresLockDAOTest.java b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresLockDAOTest.java new file mode 100644 index 000000000..695f15f10 --- /dev/null +++ b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresLockDAOTest.java @@ -0,0 +1,159 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.postgres.dao; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.time.Instant; +import java.util.UUID; +import java.util.concurrent.*; + +import javax.sql.DataSource; + +import org.flywaydb.core.Flyway; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; + +import com.netflix.conductor.common.config.TestObjectMapperConfiguration; +import com.netflix.conductor.postgres.config.PostgresConfiguration; + +import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@RunWith(SpringRunner.class) +@ContextConfiguration( + classes = { + TestObjectMapperConfiguration.class, + PostgresConfiguration.class, + FlywayAutoConfiguration.class + }) +@TestPropertySource( + properties = { + "conductor.workflow-execution-lock.type=postgres", + "spring.flyway.clean-disabled=false" + }) +@SpringBootTest +public class PostgresLockDAOTest { + + @Autowired private PostgresLockDAO postgresLock; + + @Autowired private DataSource dataSource; + + @Autowired private Flyway flyway; + + @Before + public void before() { + flyway.migrate(); // Clean and migrate the database before each test. + } + + @Test + public void testLockAcquisitionAndRelease() throws SQLException { + String lockId = UUID.randomUUID().toString(); + Instant beforeAcquisitionTimeUtc = Instant.now(); + long leaseTime = 2000; + + try (var connection = dataSource.getConnection()) { + assertTrue( + postgresLock.acquireLock(lockId, 500, leaseTime, TimeUnit.MILLISECONDS), + "Lock acquisition failed"); + Instant afterAcquisitionTimeUtc = Instant.now(); + + try (var ps = connection.prepareStatement("SELECT * FROM locks WHERE lock_id = ?")) { + ps.setString(1, lockId); + var rs = ps.executeQuery(); + + if (rs.next()) { + assertEquals(lockId, rs.getString("lock_id")); + long leaseExpirationTime = rs.getTimestamp("lease_expiration").getTime(); + assertTrue( + leaseExpirationTime + >= beforeAcquisitionTimeUtc + .plusMillis(leaseTime) + .toEpochMilli(), + "Lease expiration is too early"); + assertTrue( + leaseExpirationTime + <= afterAcquisitionTimeUtc.plusMillis(leaseTime).toEpochMilli(), + "Lease expiration is too late"); + } else { + Assertions.fail("Lock not found in the database"); + } + } + + postgresLock.releaseLock(lockId); + + try (PreparedStatement ps = + connection.prepareStatement("SELECT * FROM locks WHERE lock_id = ?")) { + ps.setString(1, lockId); + var rs = ps.executeQuery(); + Assertions.assertFalse(rs.next(), "Lock was not released properly"); + } + } + } + + @Test + public void testExpiredLockCanBeAcquiredAgain() throws InterruptedException { + String lockId = UUID.randomUUID().toString(); + assertTrue( + postgresLock.acquireLock(lockId, 500, 500, TimeUnit.MILLISECONDS), + "First lock acquisition failed"); + + Thread.sleep(1000); // Ensure the lock has expired. + + assertTrue( + postgresLock.acquireLock(lockId, 500, 500, TimeUnit.MILLISECONDS), + "Lock acquisition after expiration failed"); + + postgresLock.releaseLock(lockId); + } + + @Test + public void testConcurrentLockAcquisition() throws ExecutionException, InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(2); + String lockId = UUID.randomUUID().toString(); + + Future future1 = + executorService.submit( + () -> postgresLock.acquireLock(lockId, 2000, TimeUnit.MILLISECONDS)); + Future future2 = + executorService.submit( + () -> postgresLock.acquireLock(lockId, 2000, TimeUnit.MILLISECONDS)); + + assertTrue( + future1.get() + ^ future2.get()); // One of the futures should hold the lock, the other + // should get rejected + + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.SECONDS); + + postgresLock.releaseLock(lockId); + } + + @Test + public void testDifferentLockCanBeAcquiredConcurrently() { + String lockId1 = UUID.randomUUID().toString(); + String lockId2 = UUID.randomUUID().toString(); + + assertTrue(postgresLock.acquireLock(lockId1, 2000, 10000, TimeUnit.MILLISECONDS)); + assertTrue(postgresLock.acquireLock(lockId2, 2000, 10000, TimeUnit.MILLISECONDS)); + } +}