This library can be used to implement applications that need to process messages from redis queues.
The project is build using maven with JDK 11. Run mvn install
to have the artefact available in your local repostiory.
A WorkSubscription
describes a group of interested message handlers that want to process items from a redis queue.
The subscription is activated using the SubscriptionManager
. It runs background threads that poll for tasks and process them using workers.
RStreamer is more efficient than regular polling using jedis because of the following:
- Several tasks can be fetched in one batch which is far more network efficient and allows workers to process this batch in one run.
- Each
WorkSubscription
has one thread dedicated to fetching tasks from redis. - Workers process tasks asynchronously on a dedicated thread pool for each subscription.
- Tasks can be processed concurrently. The concurrency level is controlled using
Leases
which effectively back-pressure the polling thread, making it wait until capacity is available before fetching more tasks from the queue.
All subscriptions must be added to the SubscriptionManager
prior to activation.
The manager depends on a RedisBatchPoller
, the application driver for polling tasks from redis.
Its current implementation is JedisBatchPoller
, which uses the jedis driver to communicate with redis.
To ensure graceful shutdown, call deactivateSubscriptions()
before exiting your application to stop polling from redis and finish processing outstanding tasks.
A RedisBatchPoller
is configured using a BatchPollerConfig
.
BatchPollerConfig batchPollerConfig = BatchPollerConfigBuilder.defaultBatchPollerConfig()
.withHost("myhost")
.withPort(6379)
.build();
The default config assumes you are connecting to the default redis port on localhost.
A ReliableBatchPoller
is configured using a ReliableBatchPollerConfig
.
ReliableBatchPollerConfig reliableBatchPollerConfig = defaultReliableBatchPollerConfig()
.withRetryAttempts(3)
.withFailureRateThreshold(70)
.build();
A sample of the operations are evaluated. If the sample's failure rate is above the set threshold, the circuit breaker will open for a while and stop the caller from making requests to an unresponsive server.
SubscriptionManager
is configured using SubscriptionManagerConfig
.
This config holds a LeaseConfig
and a PollingConfig
.
SubscriptionManagerConfig config = SubscriptionManageConfigBuilder.defaultSubscriptionManagerConfig()
.with(
LeaseConfigBuilder.defaultLeaseConfig()
.withAvailableLeases(5)
)
.with(
PollingConfigBuilder.defaultPollingConfig()
.withBatchSize(50)
.withBatchSizeThreshold(20)
)
.build();
The number of available leases controls the concurrency level for each subscription. The larger the number, the more tasks (or group of tasks) can be processed in parallel before blocking the polling thread.
It is more efficient to single poll than to batch poll a queue with very few items. Single polling uses redis's blocking operation and hence can wait on the server side until an item is inserted. On the other hand, batch polling will continuously try to fetch a list of items. Therefore, a batchSizeThreshold
parameter is used to specify the minimum number of items that must be fetched in a batch in order to continue batch polling in the next round.
public class SampleWorker implements Worker {
@Override
public void processSingleItem(String item) {
System.out.println("Got item: " + item);
}
}
public class SampleBatchWorker implements BatchWorker {
@Override
public void processMultipleItems(List<String> items) {
System.out.println("Got " + items.size() + " items");
}
@Override
public void processSingleItem(String item) {
System.out.println("Got item: " + item);
}
}
List<Worker> workers = new ArrayList();
workers.add(new SampleWorker());
workers.add(new SampleBatchWorker());
WorkSubscription subscription = new WorkSubscription("my:task:queue", workers);
BatchPollerConfig batchPollerConfig = defaultBatchPollerConfig()
.withHost("java-0")
.build();
RedisBatchPoller batchPoller = new JedisBatchPoller(batchPollerConfig, subscriptionCount);
ReliableBatchPollerConfig reliableBatchPollerConfig = defaultReliableBatchPollerConfig()
.withRetryAttempts(3)
.withFailureRateThreshold(70)
.build();
RedisBatchPoller reliableBatchPoller = new Resilience4jReliableBatchPoller(batchPoller, reliableBatchPollerConfig, subscriptionCount);
SubscriptionManager subscriptionManager = new SubscriptionManager(createSubscriptionManagerConfig(), createRedisPoller());
subscriptionManager.addSubscription(subscription);
subscriptionManager.activateSubscriptions();
Runtime.getRuntime().addShutdownHook(new Thread(subscriptionManager::deactivateSubscriptions));