Skip to content

Commit

Permalink
[hbase10] Fix #701 by mimicking the same locks from the HBase 0.98 cl…
Browse files Browse the repository at this point in the history
…ient in the HBase 10 client.
  • Loading branch information
manolama committed Sep 19, 2017
1 parent d6e57c3 commit 4a4ab6e
Showing 1 changed file with 15 additions and 16 deletions.
31 changes: 15 additions & 16 deletions hbase10/src/main/java/com/yahoo/ycsb/db/HBaseClient10.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@
* durability.
*/
public class HBaseClient10 extends com.yahoo.ycsb.DB {
private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);

private static final Object TABLE_LOCK = new Object();

private Configuration config = HBaseConfiguration.create();

private static AtomicInteger threadCount = new AtomicInteger(0);


private boolean debug = false;

private String tableName = "";
Expand All @@ -82,7 +84,7 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
* @See #CONNECTION_LOCK.
*/
private static Connection connection = null;
private static final Object CONNECTION_LOCK = new Object();
//private static final Object CONNECTION_LOCK = new Object();

// Depending on the value of clientSideBuffering, either bufferedMutator
// (clientSideBuffering) or currentTable (!clientSideBuffering) will be used.
Expand Down Expand Up @@ -145,8 +147,8 @@ public void init() throws DBException {
}

try {
threadCount.getAndIncrement();
synchronized (CONNECTION_LOCK) {
THREAD_COUNT.getAndIncrement();
synchronized (THREAD_COUNT) {
if (connection == null) {
// Initialize if not set up already.
connection = ConnectionFactory.createConnection(config);
Expand Down Expand Up @@ -179,7 +181,7 @@ public void init() throws DBException {
String table = getProperties().getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
try {
final TableName tName = TableName.valueOf(table);
synchronized (CONNECTION_LOCK) {
synchronized (THREAD_COUNT) {
connection.getTable(tName).getTableDescriptor();
}
} catch (IOException e) {
Expand Down Expand Up @@ -208,14 +210,11 @@ public void cleanup() throws DBException {
long en = System.nanoTime();
final String type = clientSideBuffering ? "UPDATE" : "CLEANUP";
measurements.measure(type, (int) ((en - st) / 1000));
threadCount.decrementAndGet();
if (threadCount.get() <= 0) {
// Means we are done so ok to shut down the Connection.
synchronized (CONNECTION_LOCK) {
if (connection != null) {
connection.close();
connection = null;
}
synchronized (THREAD_COUNT) {
int threadCount = THREAD_COUNT.decrementAndGet();
if (threadCount <= 0 && connection != null) {
connection.close();
connection = null;
}
}
} catch (IOException e) {
Expand All @@ -225,7 +224,7 @@ public void cleanup() throws DBException {

public void getHTable(String table) throws IOException {
final TableName tName = TableName.valueOf(table);
synchronized (CONNECTION_LOCK) {
synchronized (TABLE_LOCK) {
this.currentTable = connection.getTable(tName);
if (clientSideBuffering) {
final BufferedMutatorParams p = new BufferedMutatorParams(tName);
Expand Down

0 comments on commit 4a4ab6e

Please sign in to comment.