Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres locking DAO #102

Merged
merged 11 commits into from
Apr 9, 2024
8 changes: 7 additions & 1 deletion docs/documentation/advanced/postgresql.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# PostgreSQL

By default conductor runs with an in-memory Redis mock. However, you
can run Conductor against PostgreSQL which provides workflow management, queues and indexing.
can run Conductor against PostgreSQL which provides workflow management, queues, indexing, and locking.
There are a number of configuration options that enable you to use more or less of PostgreSQL functionality for your needs.
It has the benefit of requiring fewer moving parts for the infrastructure, but does not scale as well to handle high volumes of workflows.
You should benchmark Conductor with Postgres against your specific workload to be sure.
Expand Down Expand Up @@ -34,6 +34,12 @@ conductor.indexing.type=postgres
conductor.elasticsearch.version=0
```

To use PostgreSQL for locking, set the following configurations:
```properties
conductor.app.workflowExecutionLockEnabled=true
conductor.workflow-execution-lock.type=postgres
```

## Performance Optimisations

### Poll Data caching
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ public PostgresIndexDAO postgresIndexDAO(
return new PostgresIndexDAO(retryTemplate, objectMapper, dataSource, properties);
}

@Bean
@DependsOn({"flywayForPrimaryDb"})
@ConditionalOnProperty(
name = "conductor.workflow-execution-lock.type",
havingValue = "postgres")
public PostgresLockDAO postgresLockDAO(
@Qualifier("postgresRetryTemplate") RetryTemplate retryTemplate,
ObjectMapper objectMapper) {
return new PostgresLockDAO(retryTemplate, objectMapper, dataSource);
}

@Bean
public RetryTemplate postgresRetryTemplate(PostgresProperties properties) {
SimpleRetryPolicy retryPolicy = new CustomRetryPolicy();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.postgres.dao;

import java.util.concurrent.TimeUnit;

import javax.sql.DataSource;

import org.springframework.retry.support.RetryTemplate;

import com.netflix.conductor.core.sync.Lock;

import com.fasterxml.jackson.databind.ObjectMapper;

public class PostgresLockDAO extends PostgresBaseDAO implements Lock {

public PostgresLockDAO(
RetryTemplate retryTemplate, ObjectMapper objectMapper, DataSource dataSource) {
super(retryTemplate, objectMapper, dataSource);
}

@Override
public void acquireLock(String lockId) {
acquireLock(lockId, Long.MAX_VALUE, Long.MAX_VALUE, TimeUnit.SECONDS);
}

@Override
public boolean acquireLock(String lockId, long timeToTry, TimeUnit unit) {
return acquireLock(lockId, timeToTry, Long.MAX_VALUE, unit);
}

@Override
public boolean acquireLock(String lockId, long timeToTry, long leaseTime, TimeUnit unit) {
long endTime = System.currentTimeMillis() + unit.toMillis(timeToTry);
while (System.currentTimeMillis() < endTime) {
var sql =
"INSERT INTO locks(lock_id, lease_expiration) VALUES (?, now() + (?::text || ' milliseconds')::interval) ON CONFLICT (lock_id) DO UPDATE SET lease_expiration = EXCLUDED.lease_expiration WHERE locks.lease_expiration <= now()";

int rowsAffected =
queryWithTransaction(
sql,
q ->
q.addParameter(lockId)
.addParameter(unit.toMillis(leaseTime))
.executeUpdate());

if (rowsAffected > 0) {
return true;
}

try {
Thread.sleep(100);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return false;
}
}
return false;
}

@Override
public void releaseLock(String lockId) {
var sql = "DELETE FROM locks WHERE lock_id = ?";
queryWithTransaction(sql, q -> q.addParameter(lockId).executeDelete());
}

@Override
public void deleteLock(String lockId) {
releaseLock(lockId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE TABLE IF NOT EXISTS locks (
lock_id VARCHAR PRIMARY KEY,
lease_expiration TIMESTAMP WITH TIME ZONE NOT NULL
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.postgres.dao;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.*;

import javax.sql.DataSource;

import org.flywaydb.core.Flyway;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;

import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import com.netflix.conductor.postgres.config.PostgresConfiguration;

import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@RunWith(SpringRunner.class)
@ContextConfiguration(
classes = {
TestObjectMapperConfiguration.class,
PostgresConfiguration.class,
FlywayAutoConfiguration.class
})
@TestPropertySource(
properties = {
"conductor.workflow-execution-lock.type=postgres",
"spring.flyway.clean-disabled=false"
})
@SpringBootTest
public class PostgresLockDAOTest {

@Autowired private PostgresLockDAO postgresLock;

@Autowired private DataSource dataSource;

@Autowired private Flyway flyway;

@Before
public void before() {
flyway.migrate(); // Clean and migrate the database before each test.
}

@Test
public void testLockAcquisitionAndRelease() throws SQLException {
String lockId = UUID.randomUUID().toString();
Instant beforeAcquisitionTimeUtc = Instant.now();

try (var connection = dataSource.getConnection()) {
assertTrue(
postgresLock.acquireLock(lockId, 500, 2000, TimeUnit.MILLISECONDS),
"Lock acquisition failed");
Instant afterAcquisitionTimeUtc = Instant.now();

try (var ps = connection.prepareStatement("SELECT * FROM locks WHERE lock_id = ?")) {
ps.setString(1, lockId);
var rs = ps.executeQuery();

if (rs.next()) {
assertEquals(lockId, rs.getString("lock_id"));
long leaseExpirationTime = rs.getTimestamp("lease_expiration").getTime();
assertTrue(
leaseExpirationTime
> beforeAcquisitionTimeUtc.plusMillis(2000).toEpochMilli(),
"Lease expiration is too early");
assertTrue(
leaseExpirationTime
< afterAcquisitionTimeUtc.plusMillis(2000).toEpochMilli(),
"Lease expiration is too late");
} else {
Assertions.fail("Lock not found in the database");
}
}

postgresLock.releaseLock(lockId);

try (PreparedStatement ps =
connection.prepareStatement("SELECT * FROM locks WHERE lock_id = ?")) {
ps.setString(1, lockId);
var rs = ps.executeQuery();
Assertions.assertFalse(rs.first(), "Lock was not released properly");
}
}
}

@Test
public void testExpiredLockCanBeAcquiredAgain() throws InterruptedException {
String lockId = UUID.randomUUID().toString();
assertTrue(
postgresLock.acquireLock(lockId, 500, 500, TimeUnit.MILLISECONDS),
"First lock acquisition failed");

Thread.sleep(1000); // Ensure the lock has expired.

assertTrue(
postgresLock.acquireLock(lockId, 500, 500, TimeUnit.MILLISECONDS),
"Lock acquisition after expiration failed");

postgresLock.releaseLock(lockId);
}

@Test
public void testConcurrentLockAcquisition() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
String lockId = UUID.randomUUID().toString();

Future<Boolean> future1 =
executorService.submit(
() -> postgresLock.acquireLock(lockId, 2000, TimeUnit.MILLISECONDS));
Future<Boolean> future2 =
executorService.submit(
() -> postgresLock.acquireLock(lockId, 2000, TimeUnit.MILLISECONDS));

assertTrue(
future1.get()
^ future2.get()); // One of the futures should hold the lock, the other
// should get rejected

executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);

postgresLock.releaseLock(lockId);
}

@Test
public void testDifferentLockCanBeAcquiredConcurrently() {
String lockId1 = UUID.randomUUID().toString();
String lockId2 = UUID.randomUUID().toString();

assertTrue(postgresLock.acquireLock(lockId1, 2000, 10000, TimeUnit.MILLISECONDS));
assertTrue(postgresLock.acquireLock(lockId2, 2000, 10000, TimeUnit.MILLISECONDS));
}
}
Loading