Skip to content

Commit

Permalink
Merge pull request #430 from kmgowda/kmg-cqueue-1
Browse files Browse the repository at this point in the history
Add concurrentQ performance benchmarking

Signed-off-by: Keshava Munegowda <[email protected]>
  • Loading branch information
kmgowda authored May 25, 2024
2 parents 8358ff3 + fcb355d commit 3af9d4f
Show file tree
Hide file tree
Showing 15 changed files with 167 additions and 15 deletions.
3 changes: 3 additions & 0 deletions build-drivers.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

dependencies {
api project(":drivers:file")
api project(":drivers:concurrentq")
api project(":drivers:filestream")
api project(":drivers:asyncfile")
api project(":drivers:hdfs")
Expand Down Expand Up @@ -45,6 +46,8 @@ dependencies {
api project(':drivers:memcached')
api project(':drivers:dynamodb')
api project(':drivers:exasol')
api project(':drivers:conqueue')
api project(':drivers:linkedbq')
/* api project(':drivers:sbktemplate') */
/* above line is a signature */
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@
*/
package io.sbk.driver.ConcurrentQ;

import io.perl.api.Queue;
import io.sbk.api.DataReader;
import io.sbk.api.DataWriter;
import io.sbk.params.ParameterOptions;
import io.sbk.api.Storage;
import io.sbk.params.InputOptions;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* Class for Concurrent Queue Benchmarking.
*/
public class ConcurrentQ implements Storage<byte[]> {
private ConcurrentLinkedQueue<byte[]> queue;
protected Queue<byte[]> queue;

@Override
public void addArgs(final InputOptions params) throws IllegalArgumentException {
Expand All @@ -37,7 +37,7 @@ public void parseArgs(final ParameterOptions params) throws IllegalArgumentExcep

@Override
public void openStorage(final ParameterOptions params) throws IOException {
this.queue = new ConcurrentLinkedQueue<>();
this.queue = new LinkedCQueue<>();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
*/
package io.sbk.driver.ConcurrentQ;

import io.perl.api.Queue;
import io.sbk.api.Reader;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;

/**
Expand All @@ -23,9 +23,9 @@ public class CqReader implements Reader<byte[]> {
final private static int MICROS_PER_MS = 1000;
final private static int NS_PER_MS = NS_PER_MICRO * MICROS_PER_MS;
final private static int PARK_NS = NS_PER_MS;
private final ConcurrentLinkedQueue<byte[]> queue;
private final Queue<byte[]> queue;

public CqReader(ConcurrentLinkedQueue queue) throws IOException {
public CqReader(Queue queue) throws IOException {
this.queue = queue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@
*/
package io.sbk.driver.ConcurrentQ;

import io.perl.api.Queue;
import io.sbk.api.Writer;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;


/**
* Class for Concurrent Queue Writer.
*/
public class CqWriter implements Writer<byte[]> {
private ConcurrentLinkedQueue<byte[]> queue;
private Queue<byte[]> queue;

public CqWriter(ConcurrentLinkedQueue queue) throws IOException {
public CqWriter(Queue queue) throws IOException {
this.queue = queue;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright (c) KMG. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/

package io.sbk.driver.ConcurrentQ;

import io.perl.api.Queue;

import java.util.concurrent.ConcurrentLinkedQueue;

public class LinkedCQueue<T> extends ConcurrentLinkedQueue<T> implements Queue<T> {
}
12 changes: 12 additions & 0 deletions drivers/conqueue/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
plugins {
id 'java'
}

repositories {
mavenCentral()
}

dependencies {
api project(":drivers:concurrentq")

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright (c) KMG. All Rights Reserved..
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.sbk.driver.Conqueue;

import io.perl.api.impl.CQueue;

import io.sbk.driver.ConcurrentQ.ConcurrentQ;
import io.sbk.params.ParameterOptions;
import io.sbk.api.Storage;
import java.io.IOException;


/**
* Class for Conqueue storage driver.
*
* Incase if your data type in other than byte[] (Byte Array)
* then change the datatype and getDataType.
*/
public class Conqueue extends ConcurrentQ implements Storage<byte[]> {

@Override
public void openStorage(final ParameterOptions params) throws IOException {
this.queue = new CQueue<>();
}

}
9 changes: 9 additions & 0 deletions drivers/conqueue/src/main/resources/Conqueue.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#Copyright (c) KMG. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0

# Conqueue storage driver default Properties/parameters
11 changes: 11 additions & 0 deletions drivers/linkedbq/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
plugins {
id 'java'
}

repositories {
mavenCentral()
}

dependencies {
api project(":drivers:concurrentq")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright (c) KMG. All Rights Reserved..
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/

package io.sbk.driver.Linkedbq;

import io.perl.api.Queue;

import java.util.concurrent.LinkedBlockingDeque;

public class LinkedBQueue<T> extends LinkedBlockingDeque<T> implements Queue<T> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Copyright (c) KMG. All Rights Reserved..
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.sbk.driver.Linkedbq;


import io.sbk.driver.ConcurrentQ.ConcurrentQ;
import io.sbk.params.ParameterOptions;
import io.sbk.api.Storage;

import java.io.IOException;

/**
* Class for Linkedbq storage driver.
*
* Incase if your data type in other than byte[] (Byte Array)
* then change the datatype and getDataType.
*/
public class Linkedbq extends ConcurrentQ implements Storage<byte[]> {

@Override
public void openStorage(final ParameterOptions params) throws IOException {
this.queue = new LinkedBQueue<>();
}

}
9 changes: 9 additions & 0 deletions drivers/linkedbq/src/main/resources/Linkedbq.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#Copyright (c) KMG. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0

# Linkedbq storage driver default Properties/parameters
7 changes: 4 additions & 3 deletions perl/src/main/java/io/perl/api/impl/CQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,13 @@ public CQueue() {

@Override
public T poll() {
final Object cur = NEXT.getAndSetRelease(head, null);
final Object cur = NEXT.getAndSet(head, null);
if (cur == null) {
return null;
}
Object oldHead = HEAD.getAndSet(this, cur);
oldHead = null; //might help the Java garbage collector
Object prevHead = HEAD.getAndSet(this, cur);
NEXT.getAndSet(prevHead, null); //might help the Java garbage collector
prevHead = null;
return (T) ITEM.getAndSetRelease(cur, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ public void checkWindowFullAndReset(long currTime) {
*/
public void checkTotalWindowFullAndReset(long currTime) {
if (totalWindow.isFull()) {
stop(currTime);
start(currTime);
/* don't call stop here , it may cause recursion */
totalWindow.print(currTime, totalLogger, null);
totalWindow.reset(currTime);
}
}

Expand Down Expand Up @@ -103,7 +104,9 @@ public void start(long startTime) {
@Override
public void stop(long endTime) {
if (window.getTotalRecords() > 0) {
stopWindow(endTime);
/* don't call stopWindow , it leads to recursion */
window.print(endTime, windowLogger, totalWindow);
window.reset(endTime);
}
totalWindow.print(endTime, totalLogger, null);
}
Expand Down
2 changes: 2 additions & 0 deletions settings-drivers.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,7 @@ include 'drivers:couchbase'
include 'drivers:memcached'
include 'drivers:dynamodb'
include 'drivers:exasol'
include 'drivers:conqueue'
include 'drivers:linkedbq'
/* include 'drivers:sbktemplate' */
/* above line is a signature */

0 comments on commit 3af9d4f

Please sign in to comment.