Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[hbase10] Address #701 by mimicking the same locks from the HBase 0.9… #1028

Merged
merged 4 commits into from
Sep 21, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 24 additions & 33 deletions hbase10/src/main/java/com/yahoo/ycsb/db/HBaseClient10.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@
* durability.
*/
public class HBaseClient10 extends com.yahoo.ycsb.DB {
private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);

private Configuration config = HBaseConfiguration.create();

private static AtomicInteger threadCount = new AtomicInteger(0);


private boolean debug = false;

private String tableName = "";
Expand All @@ -82,7 +82,6 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
* @See #CONNECTION_LOCK.
*/
private static Connection connection = null;
private static final Object CONNECTION_LOCK = new Object();

// Depending on the value of clientSideBuffering, either bufferedMutator
// (clientSideBuffering) or currentTable (!clientSideBuffering) will be used.
Expand Down Expand Up @@ -144,12 +143,19 @@ public void init() throws DBException {
}
}

String table = getProperties().getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
try {
threadCount.getAndIncrement();
synchronized (CONNECTION_LOCK) {
THREAD_COUNT.getAndIncrement();
synchronized (THREAD_COUNT) {
if (connection == null) {
// Initialize if not set up already.
connection = ConnectionFactory.createConnection(config);

// Terminate right now if table does not exist, since the client
// will not propagate this error upstream once the workload
// starts.
final TableName tName = TableName.valueOf(table);
connection.getTable(tName).getTableDescriptor();
}
}
} catch (java.io.IOException e) {
Expand All @@ -172,19 +178,6 @@ public void init() throws DBException {
throw new DBException("No columnfamily specified");
}
columnFamilyBytes = Bytes.toBytes(columnFamily);

// Terminate right now if table does not exist, since the client
// will not propagate this error upstream once the workload
// starts.
String table = getProperties().getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
try {
final TableName tName = TableName.valueOf(table);
synchronized (CONNECTION_LOCK) {
connection.getTable(tName).getTableDescriptor();
}
} catch (IOException e) {
throw new DBException(e);
}
}

/**
Expand All @@ -208,14 +201,14 @@ public void cleanup() throws DBException {
long en = System.nanoTime();
final String type = clientSideBuffering ? "UPDATE" : "CLEANUP";
measurements.measure(type, (int) ((en - st) / 1000));
threadCount.decrementAndGet();
if (threadCount.get() <= 0) {
int threadCount = THREAD_COUNT.decrementAndGet();
if (threadCount <= 0) {
// Means we are done so ok to shut down the Connection.
synchronized (CONNECTION_LOCK) {
if (connection != null) {
connection.close();
connection = null;
}
synchronized (THREAD_COUNT) {
if (connection != null) {
connection.close();
connection = null;
}
}
}
} catch (IOException e) {
Expand All @@ -225,13 +218,11 @@ public void cleanup() throws DBException {

public void getHTable(String table) throws IOException {
final TableName tName = TableName.valueOf(table);
synchronized (CONNECTION_LOCK) {
this.currentTable = connection.getTable(tName);
if (clientSideBuffering) {
final BufferedMutatorParams p = new BufferedMutatorParams(tName);
p.writeBufferSize(writeBufferSize);
this.bufferedMutator = connection.getBufferedMutator(p);
}
this.currentTable = connection.getTable(tName);
if (clientSideBuffering) {
final BufferedMutatorParams p = new BufferedMutatorParams(tName);
p.writeBufferSize(writeBufferSize);
this.bufferedMutator = connection.getBufferedMutator(p);
}
}

Expand Down