Skip to content

Commit

Permalink
HIVE-20542: Incremental REPL DUMP progress information log message is…
Browse files Browse the repository at this point in the history
… incorrect (Ashutosh Bapat, reviewed by Sankar Hariappan)

Signed-off-by: Sankar Hariappan <[email protected]>
  • Loading branch information
Ashutosh Bapat authored and sankarh committed Oct 23, 2018
1 parent 7765e90 commit 0d4d03f
Show file tree
Hide file tree
Showing 13 changed files with 503 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -939,19 +940,71 @@ private void addNotificationLog(NotificationEvent event, ListenerEvent listenerE
long nextNLId = getNextNLId(stmt, sqlGenerator,
"org.apache.hadoop.hive.metastore.model.MNotificationLog");

String insertVal = "(" + nextNLId + "," + nextEventId + "," + now() + ", ?, ?," +
quoteString(" ") + ",?, ?)";
String insertVal;
String columns;
List<String> params = new ArrayList<String>();

// Construct the values string, parameters and column string step by step simultaneously so
// that the positions of columns and of their corresponding values do not go out of sync.

// Notification log id
columns = "\"NL_ID\"";
insertVal = "" + nextNLId;

// Event id
columns = columns + ", \"EVENT_ID\"";
insertVal = insertVal + "," + nextEventId;

// Event time
columns = columns + ", \"EVENT_TIME\"";
insertVal = insertVal + "," + now();

// Event type
columns = columns + ", \"EVENT_TYPE\"";
insertVal = insertVal + ", ?";
params.add(event.getEventType());

// Message
columns = columns + ", \"MESSAGE\"";
insertVal = insertVal + ", ?";
params.add(event.getMessage());

// Message format
columns = columns + ", \"MESSAGE_FORMAT\"";
insertVal = insertVal + ", ?";
params.add(event.getMessageFormat());

// Database name, optional
String dbName = event.getDbName();
if (dbName != null) {
assert dbName.equals(dbName.toLowerCase());
columns = columns + ", \"DB_NAME\"";
insertVal = insertVal + ", ?";
params.add(dbName);
}

s = "insert into \"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", \"EVENT_TIME\", " +
" \"EVENT_TYPE\", \"DB_NAME\", " +
" \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\") VALUES " + insertVal;
List<String> params = Arrays.asList(
event.getEventType(), event.getDbName(), event.getMessage(), event.getMessageFormat());
pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
// Table name, optional
String tableName = event.getTableName();
if (tableName != null) {
assert tableName.equals(tableName.toLowerCase());
columns = columns + ", \"TBL_NAME\"";
insertVal = insertVal + ", ?";
params.add(tableName);
}

LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">",
quoteString(event.getEventType()), quoteString(event.getDbName()),
quoteString(event.getMessage()), quoteString(event.getMessageFormat()));
// Catalog name, optional
String catName = event.getCatName();
if (catName != null) {
assert catName.equals(catName.toLowerCase());
columns = columns + ", \"CAT_NAME\"";
insertVal = insertVal + ", ?";
params.add(catName);
}

s = "insert into \"NOTIFICATION_LOG\" (" + columns + ") VALUES (" + insertVal + ")";
pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
LOG.debug("Going to execute insert <" + s + "> with parameters (" +
String.join(", ", params) + ")");
pst.execute();

// Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.ResourceType;
Expand Down Expand Up @@ -300,6 +301,21 @@ public void tearDown() {
MockMetaStoreEventListener.clearEvents();
}

// Test if the number of events between the given event ids and with the given database name are
// same as expected. toEventId = 0 is treated as unbounded. Same is the case with limit 0.
private void testEventCounts(String dbName, long fromEventId, Long toEventId, Integer limit,
long expectedCount) throws Exception {
NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName);

if (toEventId != null) {
rqst.setToEventId(toEventId);
}
if (limit != null) {
rqst.setLimit(limit);
}

assertEquals(expectedCount, msClient.getNotificationEventsCount(rqst).getEventsCount());
}

@Test
public void createDatabase() throws Exception {
Expand Down Expand Up @@ -341,6 +357,10 @@ public void createDatabase() throws Exception {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(1, rsp.getEventsSize());

// There's only one event corresponding to CREATE DATABASE
testEventCounts(dbName, firstEventId, null, null, 1);
testEventCounts(dbName2, firstEventId, null, null, 0);
}

@Test
Expand All @@ -358,6 +378,7 @@ public void dropDatabase() throws Exception {

// Two events: one for create db and other for drop db
assertEquals(2, rsp.getEventsSize());
testEventCounts(dbName, firstEventId, null, null, 2);

// Read event from notification
NotificationEvent event = rsp.getEvents().get(1);
Expand Down Expand Up @@ -388,6 +409,7 @@ public void dropDatabase() throws Exception {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
testEventCounts(dbName2, firstEventId, null, null, 1);
}

@Test
Expand Down Expand Up @@ -443,6 +465,7 @@ public void createTable() throws Exception {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(1, rsp.getEventsSize());
testEventCounts(defaultDbName, firstEventId, null, null, 1);
}

@Test
Expand Down Expand Up @@ -501,6 +524,7 @@ public void alterTable() throws Exception {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(2, rsp.getEventsSize());
testEventCounts(defaultDbName, firstEventId, null, null, 2);
}

@Test
Expand Down Expand Up @@ -567,6 +591,7 @@ public void dropTable() throws Exception {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
testEventCounts(defaultDbName, firstEventId, null, null, 3);
}

@Test
Expand Down Expand Up @@ -636,6 +661,7 @@ public void addPartition() throws Exception {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(2, rsp.getEventsSize());
testEventCounts(defaultDbName, firstEventId, null, null, 2);
}

@Test
Expand Down Expand Up @@ -704,6 +730,7 @@ public void alterPartition() throws Exception {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
testEventCounts(defaultDbName, firstEventId, null, null, 3);
}

@Test
Expand Down Expand Up @@ -778,6 +805,7 @@ public void dropPartition() throws Exception {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(4, rsp.getEventsSize());
testEventCounts(defaultDbName, firstEventId, null, null, 4);
}

@Test
Expand Down Expand Up @@ -873,6 +901,7 @@ public void exchangePartition() throws Exception {
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 3);
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2);
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
testEventCounts(dbName, firstEventId, null, null, 5);
}

@Test
Expand Down Expand Up @@ -931,6 +960,7 @@ public void createFunction() throws Exception {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(1, rsp.getEventsSize());
testEventCounts(defaultDbName, firstEventId, null, null, 1);
}

@Test
Expand Down Expand Up @@ -985,6 +1015,7 @@ public void dropFunction() throws Exception {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
testEventCounts(defaultDbName, firstEventId, null, null, 3);
}

@Test
Expand Down Expand Up @@ -1040,6 +1071,7 @@ public void insertTable() throws Exception {
// Verify the eventID was passed to the non-transactional listener
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 2);
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
testEventCounts(defaultDbName, firstEventId, null, null, 2);
}

@Test
Expand Down Expand Up @@ -1106,6 +1138,7 @@ public void insertPartition() throws Exception {
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 3);
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2);
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
testEventCounts(defaultDbName, firstEventId, null, null, 3);
}


Expand Down Expand Up @@ -1200,10 +1233,12 @@ public void sqlInsertTable() throws Exception {
event = rsp.getEvents().get(5);
assertEquals(firstEventId + 6, event.getEventId());
assertEquals(EventType.DROP_TABLE.toString(), event.getEventType());
testEventCounts(defaultDbName, firstEventId, null, null, 6);
}

@Test
public void sqlCTAS() throws Exception {
String defaultDbName = "default";
String sourceTblName = "sqlctasins1";
String targetTblName = "sqlctasins2";
// Event 1
Expand All @@ -1229,17 +1264,20 @@ public void sqlCTAS() throws Exception {
event = rsp.getEvents().get(4);
assertEquals(firstEventId + 5, event.getEventId());
assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType());
testEventCounts(defaultDbName, firstEventId, null, null, 6);
}

@Test
public void sqlTempTable() throws Exception {
String defaultDbName = "default";
String tempTblName = "sqltemptbl";
driver.run("create temporary table " + tempTblName + " (c int)");
driver.run("insert into table " + tempTblName + " values (1)");

// Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(0, rsp.getEventsSize());
testEventCounts(defaultDbName, firstEventId, null, null, 0);
}

@Test
Expand All @@ -1263,6 +1301,7 @@ public void sqlDb() throws Exception {

@Test
public void sqlInsertPartition() throws Exception {
String defaultDbName = "default";
String tblName = "sqlinsptn";
// Event 1
driver.run("create table " + tblName + " (c int) partitioned by (ds string)");
Expand All @@ -1274,6 +1313,13 @@ public void sqlInsertPartition() throws Exception {
driver.run("insert into table " + tblName + " partition (ds) values (3, 'today')");
// Event 9, 10
driver.run("alter table " + tblName + " add partition (ds = 'yesterday')");

testEventCounts(defaultDbName, firstEventId, null, null, 10);
// Test a limit higher than available events
testEventCounts(defaultDbName, firstEventId, null, 100, 10);
// Test toEventId lower than current eventId
testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 5, null, 5);

// Event 10, 11, 12
driver.run("insert into table " + tblName + " partition (ds = 'yesterday') values (2)");
// Event 12, 13, 14
Expand Down Expand Up @@ -1340,6 +1386,9 @@ public void sqlInsertPartition() throws Exception {
assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));

// Test fromEventId different from the very first
testEventCounts(defaultDbName, event.getEventId(), null, null, 3);

event = rsp.getEvents().get(21);
assertEquals(firstEventId + 22, event.getEventId());
assertEquals(EventType.INSERT.toString(), event.getEventType());
Expand All @@ -1355,6 +1404,16 @@ public void sqlInsertPartition() throws Exception {
assertEquals(firstEventId + 24, event.getEventId());
assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
testEventCounts(defaultDbName, firstEventId, null, null, 24);

// Test a limit within the available events
testEventCounts(defaultDbName, firstEventId, null, 10, 10);
// Test toEventId greater than current eventId
testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, null, 24);
// Test toEventId greater than current eventId with some limit within available events
testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, 10, 10);
// Test toEventId greater than current eventId with some limit beyond available events
testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, 50, 24);
}

private void verifyInsert(NotificationEvent event, String dbName, String tblName) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,6 @@ public void testOpenTxnEvent() throws Throwable {
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);

// create table will start and coomit the transaction
primary.run("use " + primaryDbName)
.run("CREATE TABLE " + tableName +
" (key int, value int) PARTITIONED BY (load_date date) " +
Expand All @@ -495,6 +494,11 @@ public void testOpenTxnEvent() throws Throwable {

WarehouseInstance.Tuple incrementalDump =
primary.dump(primaryDbName, bootStrapDump.lastReplicationId);

long lastReplId = Long.parseLong(bootStrapDump.lastReplicationId);
primary.testEventCounts(primaryDbName, lastReplId, null, null, 20);

// Test load
replica.load(replicatedDbName, incrementalDump.dumpLocation)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
Expand Down Expand Up @@ -416,6 +417,22 @@ ReplicationV1CompatRule getReplivationV1CompatRule(List<String> testsToSkip) {
return new ReplicationV1CompatRule(client, hiveConf, testsToSkip);
}

// Test if the number of events between the given event ids and with the given database name are
// same as expected. toEventId = 0 is treated as unbounded. Same is the case with limit 0.
public void testEventCounts(String dbName, long fromEventId, Long toEventId, Integer limit,
long expectedCount) throws Exception {
NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName);

if (toEventId != null) {
rqst.setToEventId(toEventId);
}
if (limit != null) {
rqst.setLimit(limit);
}

assertEquals(expectedCount, client.getNotificationEventsCount(rqst).getEventsCount());
}

@Override
public void close() throws IOException {
if (miniDFSCluster != null && miniDFSCluster.isClusterUp()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,18 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
evFetcher, work.eventFrom, work.maxEventLimit(), evFilter);

lastReplId = work.eventTo;

// Right now the only pattern allowed to be specified is *, which matches all the database
// names. So passing dbname as is works since getDbNotificationEventsCount can exclude filter
// on database name when it's *. In future, if we support more elaborate patterns, we will
// have to pass DatabaseAndTableFilter created above to getDbNotificationEventsCount() to get
// correct event count.
String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty())
? work.dbNameOrPattern
: "?";
replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(),
evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName));
evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo,
work.maxEventLimit()));
replLogger.startLog();
while (evIter.hasNext()) {
NotificationEvent ev = evIter.next();
Expand Down
Loading

0 comments on commit 0d4d03f

Please sign in to comment.