Skip to content

Commit

Permalink
HIVE-20701: Allow HiveStreaming to receive a key value to commit atomically together with the transaction (Jaume M reviewed by Prasanth Jayachandran)
Browse files Browse the repository at this point in the history
  • Loading branch information
beltran authored and prasanthj committed Oct 22, 2018
1 parent cbe3228 commit 7765e90
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.InputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -151,6 +152,9 @@ public List<TxnToWriteId> getTxnToWriteIds() {
}

public void commit() throws StreamingException {
commitWithPartitions(null);
commit(null);
}
public void commit(Set<String> partitions) throws StreamingException {
commit(partitions, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public String toString() {
private boolean manageTransactions;
private int countTransactions = 0;
private Set<String> partitions;
private Long tableId;

private HiveStreamingConnection(Builder builder) throws StreamingException {
this.database = builder.database.toLowerCase();
Expand Down Expand Up @@ -574,12 +575,18 @@ public void beginTransaction() throws StreamingException {

@Override
public void commitTransaction() throws StreamingException {
commitTransactionWithPartition(null);
commitTransaction(null);
}

@Override
public void commitTransactionWithPartition(Set<String> partitions)
public void commitTransaction(Set<String> partitions)
throws StreamingException {
commitTransaction(partitions, null, null);
}

@Override
public void commitTransaction(Set<String> partitions, String key,
String value) throws StreamingException {
checkState();

Set<String> createdPartitions = new HashSet<>();
Expand All @@ -598,7 +605,7 @@ public void commitTransactionWithPartition(Set<String> partitions)
connectionStats.incrementTotalPartitions(partitions.size());
}

currentTransactionBatch.commitWithPartitions(createdPartitions);
currentTransactionBatch.commit(createdPartitions, key, value);
this.partitions.addAll(
currentTransactionBatch.getPartitions());
connectionStats.incrementCreatedPartitions(createdPartitions.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,26 @@ public interface StreamingConnection extends ConnectionInfo, PartitionHandler {
void commitTransaction() throws StreamingException;

/**
* Commit a transaction to make the writes visible for readers. Include
* other partitions that may have been added independently.
*
* Commits the transaction atomically together with a key-value pair.
* @param partitions - extra partitions to commit.
* @throws StreamingException - if there are errors when committing the open transaction.
* @param key - key to commit.
* @param value - value to commit.
* @throws StreamingException - if there are errors when committing
* the open transaction.
*/
default void commitTransactionWithPartition(@Nullable Set<String> partitions)
default void commitTransaction(@Nullable Set<String> partitions,
@Nullable String key, @Nullable String value) throws StreamingException {
throw new UnsupportedOperationException();
}

/**
* Commit a transaction to make the writes visible for readers. Include
* other partitions that may have been added independently.
*
* @param partitions - extra partitions to commit.
* @throws StreamingException - if there are errors when committing the open transaction.
*/
default void commitTransaction(@Nullable Set<String> partitions)
throws StreamingException {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.hive.streaming;

import org.apache.hadoop.hive.metastore.api.TxnToWriteId;

import javax.annotation.Nullable;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
Expand All @@ -45,7 +47,17 @@ public interface StreamingTransaction {
* @param partitions to commit.
* @throws StreamingException
*/
void commitWithPartitions(Set<String> partitions) throws StreamingException;
void commit(@Nullable Set<String> partitions) throws StreamingException;

/**
* Commits atomically together with a key and a value.
* @param partitions to commit.
* @param key to commit.
* @param value to commit.
* @throws StreamingException
*/
void commit(@Nullable Set<String> partitions, @Nullable String key,
@Nullable String value) throws StreamingException;

/**
* Abort a transaction.
Expand Down
26 changes: 22 additions & 4 deletions streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public class TransactionBatch extends AbstractStreamingTransaction {

private String agentInfo;
private int numTxns;

/**
* Id of the table from the streaming connection.
*/
private final long tableId;
/**
* Tracks the state of each transaction.
*/
Expand Down Expand Up @@ -107,6 +112,7 @@ public TransactionBatch(HiveStreamingConnection conn) throws StreamingException
this.recordWriter = conn.getRecordWriter();
this.agentInfo = conn.getAgentInfo();
this.numTxns = conn.getTransactionBatchSize();
this.tableId = conn.getTable().getTTable().getId();

setupHeartBeatThread();

Expand Down Expand Up @@ -244,19 +250,26 @@ private void beginNextTransactionImpl() throws StreamingException {
}
}

public void commitWithPartitions(Set<String> partitions) throws StreamingException {
public void commit(Set<String> partitions, String key, String value)
throws StreamingException {
checkIsClosed();
boolean success = false;
try {
commitImpl(partitions);
commitImpl(partitions, key, value);
success = true;
} finally {
markDead(success);
}
}

private void commitImpl(Set<String> partitions) throws StreamingException {
private void commitImpl(Set<String> partitions, String key, String value)
throws StreamingException {
try {
if ((key == null && value != null) || (key != null && value == null)) {
throw new StreamingException(String.format(
"If key is set, the value should be as well and vice versa,"
+ " key, value = %s, %s", key, value));
}
recordWriter.flush();
TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex);
if (conn.isDynamicPartitioning()) {
Expand All @@ -274,7 +287,12 @@ private void commitImpl(Set<String> partitions) throws StreamingException {
}
transactionLock.lock();
try {
conn.getMSC().commitTxn(txnToWriteId.getTxnId());
if (key != null) {
conn.getMSC().commitTxnWithKeyValue(txnToWriteId.getTxnId(),
tableId, key, value);
} else {
conn.getMSC().commitTxn(txnToWriteId.getTxnId());
}
// increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction.
// the current transaction is going to committed or fail, so don't need heartbeat for current transaction.
if (currentTxnIndex + 1 < txnToWriteIds.size()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public void beginNextTransaction() throws StreamingException {
}

@Override
public void commitWithPartitions(Set<String> partitions) throws StreamingException {
public void commit(Set<String> partitions, String key, String value)
throws StreamingException {
checkIsClosed();
boolean success = false;
try {
Expand Down
41 changes: 39 additions & 2 deletions streaming/src/test/org/apache/hive/streaming/TestStreaming.java
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,43 @@ public void testGetDeltaPath() throws Exception {
+ "=Asia/country=India/delta_0000005_0000005_0009/bucket_00000"));
}

@Test
public void testCommitWithKeyValue() throws Exception {
queryTable(driver, "drop table if exists default.keyvalue");
queryTable(driver, "create table default.keyvalue (a string, b string) stored as orc " +
"TBLPROPERTIES('transactional'='true')");
queryTable(driver, "insert into default.keyvalue values('foo','bar')");
queryTable(driver, "ALTER TABLE default.keyvalue SET TBLPROPERTIES('_metamykey' = 'myvalue')");
List<String> rs = queryTable(driver, "select * from default.keyvalue");
Assert.assertEquals(1, rs.size());
Assert.assertEquals("foo\tbar", rs.get(0));
StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase("Default")
.withTable("keyvalue")
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withTransactionBatchSize(2)
.withRecordWriter(wr)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
connection.write("a1,b2".getBytes());
connection.write("a3,b4".getBytes());
connection.commitTransaction(null, "_metamykey", "myvalue");
connection.close();

rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.keyvalue order by ROW__ID");
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("keyvalue/delta_0000002_0000003/bucket_00000"));
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("keyvalue/delta_0000002_0000003/bucket_00000"));

rs = queryTable(driver, "SHOW TBLPROPERTIES default.keyvalue('_metamykey')");
Assert.assertEquals(rs.get(0), "_metamykey\tmyvalue", rs.get(0));
}

@Test
public void testConnectionWithWriteId() throws Exception {
queryTable(driver, "drop table if exists default.writeidconnection");
Expand Down Expand Up @@ -1139,7 +1176,7 @@ public void testAddPartitionWithWriteId() throws Exception {
Assert.fail("Partition shouldn't exist so a NoSuchObjectException should have been raised");
} catch (NoSuchObjectException e) {}

transactionConnection.commitTransactionWithPartition(partitions);
transactionConnection.commitTransaction(partitions);

// Ensure partition is present
Partition p = msClient.getPartition(dbName, tblName, newPartVals);
Expand Down Expand Up @@ -1217,7 +1254,7 @@ public void testAddDynamicPartitionWithWriteId() throws Exception {

partitionsOne.addAll(partitionsTwo);
Set<String> allPartitions = partitionsOne;
transactionConnection.commitTransactionWithPartition(allPartitions);
transactionConnection.commitTransaction(allPartitions);

// Ensure partition is present
for (String partition : allPartitions) {
Expand Down

0 comments on commit 7765e90

Please sign in to comment.