From fb0f0c2f261d0427162e7941fc35b8e26536d1b7 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Fri, 24 May 2024 20:32:38 +0530 Subject: [PATCH 1/6] Add the concurrent queue performance benchmarking. Signed-off-by: Keshava Munegowda --- build-drivers.gradle | 1 + .../io/sbk/driver/ConcurrentQ/ConcurrentQ.java | 6 +++--- .../io/sbk/driver/ConcurrentQ/CqReader.java | 6 +++--- .../io/sbk/driver/ConcurrentQ/CqWriter.java | 7 ++++--- .../sbk/driver/ConcurrentQ/LinkedCQueue.java | 18 ++++++++++++++++++ 5 files changed, 29 insertions(+), 9 deletions(-) create mode 100644 drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/LinkedCQueue.java diff --git a/build-drivers.gradle b/build-drivers.gradle index 181a2b4d..e047cb1f 100644 --- a/build-drivers.gradle +++ b/build-drivers.gradle @@ -1,6 +1,7 @@ dependencies { api project(":drivers:file") + api project(":drivers:concurrentq") api project(":drivers:filestream") api project(":drivers:asyncfile") api project(":drivers:hdfs") diff --git a/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/ConcurrentQ.java b/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/ConcurrentQ.java index a5c97e45..dfc6f1ea 100644 --- a/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/ConcurrentQ.java +++ b/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/ConcurrentQ.java @@ -9,6 +9,7 @@ */ package io.sbk.driver.ConcurrentQ; +import io.perl.api.Queue; import io.sbk.api.DataReader; import io.sbk.api.DataWriter; import io.sbk.params.ParameterOptions; @@ -16,13 +17,12 @@ import io.sbk.params.InputOptions; import java.io.IOException; -import java.util.concurrent.ConcurrentLinkedQueue; /** * Class for Concurrent Queue Benchmarking. */ public class ConcurrentQ implements Storage { - private ConcurrentLinkedQueue queue; + private Queue queue; @Override public void addArgs(final InputOptions params) throws IllegalArgumentException { @@ -37,7 +37,7 @@ public void parseArgs(final ParameterOptions params) throws IllegalArgumentExcep @Override public void openStorage(final ParameterOptions params) throws IOException { - this.queue = new ConcurrentLinkedQueue<>(); + this.queue = new LinkedCQueue<>(); } @Override diff --git a/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/CqReader.java b/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/CqReader.java index e9a87d82..96d2b84a 100644 --- a/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/CqReader.java +++ b/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/CqReader.java @@ -9,10 +9,10 @@ */ package io.sbk.driver.ConcurrentQ; +import io.perl.api.Queue; import io.sbk.api.Reader; import java.io.IOException; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.LockSupport; /** @@ -23,9 +23,9 @@ public class CqReader implements Reader { final private static int MICROS_PER_MS = 1000; final private static int NS_PER_MS = NS_PER_MICRO * MICROS_PER_MS; final private static int PARK_NS = NS_PER_MS; - private final ConcurrentLinkedQueue queue; + private final Queue queue; - public CqReader(ConcurrentLinkedQueue queue) throws IOException { + public CqReader(Queue queue) throws IOException { this.queue = queue; } diff --git a/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/CqWriter.java b/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/CqWriter.java index dfd06f48..ae245ff1 100644 --- a/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/CqWriter.java +++ b/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/CqWriter.java @@ -9,19 +9,20 @@ */ package io.sbk.driver.ConcurrentQ; +import io.perl.api.Queue; import io.sbk.api.Writer; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; + /** * Class for Concurrent Queue Writer. */ public class CqWriter implements Writer { - private ConcurrentLinkedQueue queue; + private Queue queue; - public CqWriter(ConcurrentLinkedQueue queue) throws IOException { + public CqWriter(Queue queue) throws IOException { this.queue = queue; } diff --git a/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/LinkedCQueue.java b/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/LinkedCQueue.java new file mode 100644 index 00000000..ee06ee5a --- /dev/null +++ b/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/LinkedCQueue.java @@ -0,0 +1,18 @@ +/** + * Copyright (c) KMG. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.sbk.driver.ConcurrentQ; + +import io.perl.api.Queue; + +import java.util.concurrent.ConcurrentLinkedQueue; + +public class LinkedCQueue extends ConcurrentLinkedQueue implements Queue { +} From 17f986cb13b2855a9e1581504656d67406fd2f1f Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Fri, 24 May 2024 20:38:36 +0530 Subject: [PATCH 2/6] Update poll method of CQueue. Signed-off-by: Keshava Munegowda --- perl/src/main/java/io/perl/api/impl/CQueue.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/perl/src/main/java/io/perl/api/impl/CQueue.java b/perl/src/main/java/io/perl/api/impl/CQueue.java index 8a4a84d7..a1ab8c01 100644 --- a/perl/src/main/java/io/perl/api/impl/CQueue.java +++ b/perl/src/main/java/io/perl/api/impl/CQueue.java @@ -60,12 +60,13 @@ public CQueue() { @Override public T poll() { - final Object cur = NEXT.getAndSetRelease(head, null); + final Object cur = NEXT.getAndSet(head, null); if (cur == null) { return null; } - Object oldHead = HEAD.getAndSet(this, cur); - oldHead = null; //might help the Java garbage collector + Object prevHead = HEAD.getAndSet(this, cur); + NEXT.getAndSet(prevHead, null); //might help the Java garbage collector + prevHead = null; return (T) ITEM.getAndSetRelease(cur, null); } From e3bd2ccd0e2b868fa70485674b532f5eda29d569 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Fri, 24 May 2024 21:34:37 +0530 Subject: [PATCH 3/6] add conqueue driver for benchmarking Signed-off-by: Keshava Munegowda --- build-drivers.gradle | 1 + .../src/main/java/io/sbk/driver/ConcurrentQ/ConcurrentQ.java | 2 +- settings-drivers.gradle | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/build-drivers.gradle b/build-drivers.gradle index e047cb1f..9d497738 100644 --- a/build-drivers.gradle +++ b/build-drivers.gradle @@ -46,6 +46,7 @@ dependencies { api project(':drivers:memcached') api project(':drivers:dynamodb') api project(':drivers:exasol') + api project(':drivers:conqueue') /* api project(':drivers:sbktemplate') */ /* above line is a signature */ } diff --git a/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/ConcurrentQ.java b/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/ConcurrentQ.java index dfc6f1ea..237765e5 100644 --- a/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/ConcurrentQ.java +++ b/drivers/concurrentq/src/main/java/io/sbk/driver/ConcurrentQ/ConcurrentQ.java @@ -22,7 +22,7 @@ * Class for Concurrent Queue Benchmarking. */ public class ConcurrentQ implements Storage { - private Queue queue; + protected Queue queue; @Override public void addArgs(final InputOptions params) throws IllegalArgumentException { diff --git a/settings-drivers.gradle b/settings-drivers.gradle index 3dc985bc..ff18e54f 100644 --- a/settings-drivers.gradle +++ b/settings-drivers.gradle @@ -56,5 +56,6 @@ include 'drivers:couchbase' include 'drivers:memcached' include 'drivers:dynamodb' include 'drivers:exasol' +include 'drivers:conqueue' /* include 'drivers:sbktemplate' */ /* above line is a signature */ From 4c20446d6f4ff1c4ef8a2ac7f6ec6e7096d43983 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Sat, 25 May 2024 13:18:10 +0530 Subject: [PATCH 4/6] Avoid recursion in recording latency when the overflow condition occurs Signed-off-by: Keshava Munegowda --- .../java/io/perl/api/impl/TotalLatencyRecordWindow.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/perl/src/main/java/io/perl/api/impl/TotalLatencyRecordWindow.java b/perl/src/main/java/io/perl/api/impl/TotalLatencyRecordWindow.java index e21e761b..ac5ac5ec 100644 --- a/perl/src/main/java/io/perl/api/impl/TotalLatencyRecordWindow.java +++ b/perl/src/main/java/io/perl/api/impl/TotalLatencyRecordWindow.java @@ -74,8 +74,9 @@ public void checkWindowFullAndReset(long currTime) { */ public void checkTotalWindowFullAndReset(long currTime) { if (totalWindow.isFull()) { - stop(currTime); - start(currTime); + /* don't call stop here , it may cause recursion */ + totalWindow.print(currTime, totalLogger, null); + totalWindow.reset(currTime); } } @@ -103,7 +104,9 @@ public void start(long startTime) { @Override public void stop(long endTime) { if (window.getTotalRecords() > 0) { - stopWindow(endTime); + /* don't call stopWindow , it leads to recursion */ + window.print(endTime, windowLogger, totalWindow); + window.reset(endTime); } totalWindow.print(endTime, totalLogger, null); } From 924915f742f10d148511f513dd1b42c80f46be40 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Sat, 25 May 2024 15:17:24 +0530 Subject: [PATCH 5/6] Add the performance benchmarking of linked blocking queue. Signed-off-by: Keshava Munegowda --- build-drivers.gradle | 1 + .../io/sbk/driver/Linkedbq/LinkedBQueue.java | 18 ++++++++++++++++++ settings-drivers.gradle | 1 + 3 files changed, 20 insertions(+) create mode 100644 drivers/linkedbq/src/main/java/io/sbk/driver/Linkedbq/LinkedBQueue.java diff --git a/build-drivers.gradle b/build-drivers.gradle index 9d497738..85b3d818 100644 --- a/build-drivers.gradle +++ b/build-drivers.gradle @@ -47,6 +47,7 @@ dependencies { api project(':drivers:dynamodb') api project(':drivers:exasol') api project(':drivers:conqueue') + api project(':drivers:linkedbq') /* api project(':drivers:sbktemplate') */ /* above line is a signature */ } diff --git a/drivers/linkedbq/src/main/java/io/sbk/driver/Linkedbq/LinkedBQueue.java b/drivers/linkedbq/src/main/java/io/sbk/driver/Linkedbq/LinkedBQueue.java new file mode 100644 index 00000000..ca4d5905 --- /dev/null +++ b/drivers/linkedbq/src/main/java/io/sbk/driver/Linkedbq/LinkedBQueue.java @@ -0,0 +1,18 @@ +/** + * Copyright (c) KMG. All Rights Reserved.. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.sbk.driver.Linkedbq; + +import io.perl.api.Queue; + +import java.util.concurrent.LinkedBlockingDeque; + +public class LinkedBQueue extends LinkedBlockingDeque implements Queue { +} \ No newline at end of file diff --git a/settings-drivers.gradle b/settings-drivers.gradle index ff18e54f..c4274b25 100644 --- a/settings-drivers.gradle +++ b/settings-drivers.gradle @@ -57,5 +57,6 @@ include 'drivers:memcached' include 'drivers:dynamodb' include 'drivers:exasol' include 'drivers:conqueue' +include 'drivers:linkedbq' /* include 'drivers:sbktemplate' */ /* above line is a signature */ From fcb355d5d2007016411d6cf9f537f1bc2fc40e00 Mon Sep 17 00:00:00 2001 From: Keshava Munegowda Date: Sat, 25 May 2024 15:28:42 +0530 Subject: [PATCH 6/6] Fix the remote build issues Signed-off-by: Keshava Munegowda --- drivers/conqueue/build.gradle | 12 +++++++ .../java/io/sbk/driver/Conqueue/Conqueue.java | 33 +++++++++++++++++++ .../src/main/resources/Conqueue.properties | 9 +++++ drivers/linkedbq/build.gradle | 11 +++++++ .../java/io/sbk/driver/Linkedbq/Linkedbq.java | 32 ++++++++++++++++++ .../src/main/resources/Linkedbq.properties | 9 +++++ 6 files changed, 106 insertions(+) create mode 100644 drivers/conqueue/build.gradle create mode 100644 drivers/conqueue/src/main/java/io/sbk/driver/Conqueue/Conqueue.java create mode 100644 drivers/conqueue/src/main/resources/Conqueue.properties create mode 100644 drivers/linkedbq/build.gradle create mode 100644 drivers/linkedbq/src/main/java/io/sbk/driver/Linkedbq/Linkedbq.java create mode 100644 drivers/linkedbq/src/main/resources/Linkedbq.properties diff --git a/drivers/conqueue/build.gradle b/drivers/conqueue/build.gradle new file mode 100644 index 00000000..02037598 --- /dev/null +++ b/drivers/conqueue/build.gradle @@ -0,0 +1,12 @@ +plugins { + id 'java' +} + +repositories { + mavenCentral() +} + +dependencies { + api project(":drivers:concurrentq") + +} diff --git a/drivers/conqueue/src/main/java/io/sbk/driver/Conqueue/Conqueue.java b/drivers/conqueue/src/main/java/io/sbk/driver/Conqueue/Conqueue.java new file mode 100644 index 00000000..9848ff77 --- /dev/null +++ b/drivers/conqueue/src/main/java/io/sbk/driver/Conqueue/Conqueue.java @@ -0,0 +1,33 @@ +/** + * Copyright (c) KMG. All Rights Reserved.. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.sbk.driver.Conqueue; + +import io.perl.api.impl.CQueue; + +import io.sbk.driver.ConcurrentQ.ConcurrentQ; +import io.sbk.params.ParameterOptions; +import io.sbk.api.Storage; +import java.io.IOException; + + +/** + * Class for Conqueue storage driver. + * + * Incase if your data type in other than byte[] (Byte Array) + * then change the datatype and getDataType. + */ +public class Conqueue extends ConcurrentQ implements Storage { + + @Override + public void openStorage(final ParameterOptions params) throws IOException { + this.queue = new CQueue<>(); + } + +} diff --git a/drivers/conqueue/src/main/resources/Conqueue.properties b/drivers/conqueue/src/main/resources/Conqueue.properties new file mode 100644 index 00000000..fade889d --- /dev/null +++ b/drivers/conqueue/src/main/resources/Conqueue.properties @@ -0,0 +1,9 @@ +#Copyright (c) KMG. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +# Conqueue storage driver default Properties/parameters diff --git a/drivers/linkedbq/build.gradle b/drivers/linkedbq/build.gradle new file mode 100644 index 00000000..bddde7a2 --- /dev/null +++ b/drivers/linkedbq/build.gradle @@ -0,0 +1,11 @@ +plugins { + id 'java' +} + +repositories { + mavenCentral() +} + +dependencies { + api project(":drivers:concurrentq") +} diff --git a/drivers/linkedbq/src/main/java/io/sbk/driver/Linkedbq/Linkedbq.java b/drivers/linkedbq/src/main/java/io/sbk/driver/Linkedbq/Linkedbq.java new file mode 100644 index 00000000..dc85775a --- /dev/null +++ b/drivers/linkedbq/src/main/java/io/sbk/driver/Linkedbq/Linkedbq.java @@ -0,0 +1,32 @@ +/** + * Copyright (c) KMG. All Rights Reserved.. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.sbk.driver.Linkedbq; + + +import io.sbk.driver.ConcurrentQ.ConcurrentQ; +import io.sbk.params.ParameterOptions; +import io.sbk.api.Storage; + +import java.io.IOException; + +/** + * Class for Linkedbq storage driver. + * + * Incase if your data type in other than byte[] (Byte Array) + * then change the datatype and getDataType. + */ +public class Linkedbq extends ConcurrentQ implements Storage { + + @Override + public void openStorage(final ParameterOptions params) throws IOException { + this.queue = new LinkedBQueue<>(); + } + +} diff --git a/drivers/linkedbq/src/main/resources/Linkedbq.properties b/drivers/linkedbq/src/main/resources/Linkedbq.properties new file mode 100644 index 00000000..64b15b3b --- /dev/null +++ b/drivers/linkedbq/src/main/resources/Linkedbq.properties @@ -0,0 +1,9 @@ +#Copyright (c) KMG. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +# Linkedbq storage driver default Properties/parameters