From 0d4d03fd1daeb3b75182b73f7b40de7a3b7d48ea Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat Date: Tue, 23 Oct 2018 17:56:47 +0530 Subject: [PATCH] HIVE-20542: Incremental REPL DUMP progress information log message is incorrect (Ashutosh Bapat, reviewed by Sankar Hariappan) Signed-off-by: Sankar Hariappan --- .../listener/DbNotificationListener.java | 75 ++++++- .../listener/TestDbNotificationListener.java | 59 +++++ .../TestReplicationScenariosAcidTables.java | 6 +- .../hive/ql/parse/WarehouseInstance.java | 17 ++ .../hive/ql/exec/repl/ReplDumpTask.java | 9 +- .../hive/ql/metadata/events/EventUtils.java | 16 +- .../api/NotificationEventsCountRequest.java | 206 +++++++++++++++++- .../gen/thrift/gen-php/metastore/Types.php | 46 ++++ .../thrift/gen-py/hive_metastore/ttypes.py | 28 ++- .../gen/thrift/gen-rb/hive_metastore_types.rb | 6 +- .../src/main/thrift/hive_metastore.thrift | 4 +- .../hadoop/hive/metastore/ObjectStore.java | 58 ++++- .../hadoop/hive/metastore/txn/TxnHandler.java | 2 +- 13 files changed, 503 insertions(+), 29 deletions(-) diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index c23aab289749..fe101d304974 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -24,6 +24,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -939,19 +940,71 @@ private void addNotificationLog(NotificationEvent event, ListenerEvent listenerE long nextNLId = getNextNLId(stmt, sqlGenerator, "org.apache.hadoop.hive.metastore.model.MNotificationLog"); - String insertVal = "(" + nextNLId + "," + nextEventId + "," + now() + ", ?, ?," + - quoteString(" ") + ",?, ?)"; + String insertVal; + String columns; + List params = new ArrayList(); + + // Construct the values string, parameters and column string step by step simultaneously so + // that the positions of columns and of their corresponding values do not go out of sync. + + // Notification log id + columns = "\"NL_ID\""; + insertVal = "" + nextNLId; + + // Event id + columns = columns + ", \"EVENT_ID\""; + insertVal = insertVal + "," + nextEventId; + + // Event time + columns = columns + ", \"EVENT_TIME\""; + insertVal = insertVal + "," + now(); + + // Event type + columns = columns + ", \"EVENT_TYPE\""; + insertVal = insertVal + ", ?"; + params.add(event.getEventType()); + + // Message + columns = columns + ", \"MESSAGE\""; + insertVal = insertVal + ", ?"; + params.add(event.getMessage()); + + // Message format + columns = columns + ", \"MESSAGE_FORMAT\""; + insertVal = insertVal + ", ?"; + params.add(event.getMessageFormat()); + + // Database name, optional + String dbName = event.getDbName(); + if (dbName != null) { + assert dbName.equals(dbName.toLowerCase()); + columns = columns + ", \"DB_NAME\""; + insertVal = insertVal + ", ?"; + params.add(dbName); + } - s = "insert into \"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", \"EVENT_TIME\", " + - " \"EVENT_TYPE\", \"DB_NAME\", " + - " \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\") VALUES " + insertVal; - List params = Arrays.asList( - event.getEventType(), event.getDbName(), event.getMessage(), event.getMessageFormat()); - pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); + // Table name, optional + String tableName = event.getTableName(); + if (tableName != null) { + assert tableName.equals(tableName.toLowerCase()); + columns = columns + ", \"TBL_NAME\""; + insertVal = insertVal + ", ?"; + params.add(tableName); + } - LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">", - quoteString(event.getEventType()), quoteString(event.getDbName()), - quoteString(event.getMessage()), quoteString(event.getMessageFormat())); + // Catalog name, optional + String catName = event.getCatName(); + if (catName != null) { + assert catName.equals(catName.toLowerCase()); + columns = columns + ", \"CAT_NAME\""; + insertVal = insertVal + ", ?"; + params.add(catName); + } + + s = "insert into \"NOTIFICATION_LOG\" (" + columns + ") VALUES (" + insertVal + ")"; + pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); + LOG.debug("Going to execute insert <" + s + "> with parameters (" + + String.join(", ", params) + ")"); pst.execute(); // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners. diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index dc555a4ad57a..3e404df7c388 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.ResourceType; @@ -300,6 +301,21 @@ public void tearDown() { MockMetaStoreEventListener.clearEvents(); } + // Test if the number of events between the given event ids and with the given database name are + // same as expected. toEventId = 0 is treated as unbounded. Same is the case with limit 0. + private void testEventCounts(String dbName, long fromEventId, Long toEventId, Integer limit, + long expectedCount) throws Exception { + NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName); + + if (toEventId != null) { + rqst.setToEventId(toEventId); + } + if (limit != null) { + rqst.setLimit(limit); + } + + assertEquals(expectedCount, msClient.getNotificationEventsCount(rqst).getEventsCount()); + } @Test public void createDatabase() throws Exception { @@ -341,6 +357,10 @@ public void createDatabase() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(1, rsp.getEventsSize()); + + // There's only one event corresponding to CREATE DATABASE + testEventCounts(dbName, firstEventId, null, null, 1); + testEventCounts(dbName2, firstEventId, null, null, 0); } @Test @@ -358,6 +378,7 @@ public void dropDatabase() throws Exception { // Two events: one for create db and other for drop db assertEquals(2, rsp.getEventsSize()); + testEventCounts(dbName, firstEventId, null, null, 2); // Read event from notification NotificationEvent event = rsp.getEvents().get(1); @@ -388,6 +409,7 @@ public void dropDatabase() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(3, rsp.getEventsSize()); + testEventCounts(dbName2, firstEventId, null, null, 1); } @Test @@ -443,6 +465,7 @@ public void createTable() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(1, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 1); } @Test @@ -501,6 +524,7 @@ public void alterTable() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(2, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 2); } @Test @@ -567,6 +591,7 @@ public void dropTable() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(3, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 3); } @Test @@ -636,6 +661,7 @@ public void addPartition() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(2, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 2); } @Test @@ -704,6 +730,7 @@ public void alterPartition() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(3, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 3); } @Test @@ -778,6 +805,7 @@ public void dropPartition() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(4, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 4); } @Test @@ -873,6 +901,7 @@ public void exchangePartition() throws Exception { MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 3); MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2); MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + testEventCounts(dbName, firstEventId, null, null, 5); } @Test @@ -931,6 +960,7 @@ public void createFunction() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(1, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 1); } @Test @@ -985,6 +1015,7 @@ public void dropFunction() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(3, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 3); } @Test @@ -1040,6 +1071,7 @@ public void insertTable() throws Exception { // Verify the eventID was passed to the non-transactional listener MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 2); MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + testEventCounts(defaultDbName, firstEventId, null, null, 2); } @Test @@ -1106,6 +1138,7 @@ public void insertPartition() throws Exception { MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 3); MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2); MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + testEventCounts(defaultDbName, firstEventId, null, null, 3); } @@ -1200,10 +1233,12 @@ public void sqlInsertTable() throws Exception { event = rsp.getEvents().get(5); assertEquals(firstEventId + 6, event.getEventId()); assertEquals(EventType.DROP_TABLE.toString(), event.getEventType()); + testEventCounts(defaultDbName, firstEventId, null, null, 6); } @Test public void sqlCTAS() throws Exception { + String defaultDbName = "default"; String sourceTblName = "sqlctasins1"; String targetTblName = "sqlctasins2"; // Event 1 @@ -1229,10 +1264,12 @@ public void sqlCTAS() throws Exception { event = rsp.getEvents().get(4); assertEquals(firstEventId + 5, event.getEventId()); assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType()); + testEventCounts(defaultDbName, firstEventId, null, null, 6); } @Test public void sqlTempTable() throws Exception { + String defaultDbName = "default"; String tempTblName = "sqltemptbl"; driver.run("create temporary table " + tempTblName + " (c int)"); driver.run("insert into table " + tempTblName + " values (1)"); @@ -1240,6 +1277,7 @@ public void sqlTempTable() throws Exception { // Get notifications from metastore NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(0, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 0); } @Test @@ -1263,6 +1301,7 @@ public void sqlDb() throws Exception { @Test public void sqlInsertPartition() throws Exception { + String defaultDbName = "default"; String tblName = "sqlinsptn"; // Event 1 driver.run("create table " + tblName + " (c int) partitioned by (ds string)"); @@ -1274,6 +1313,13 @@ public void sqlInsertPartition() throws Exception { driver.run("insert into table " + tblName + " partition (ds) values (3, 'today')"); // Event 9, 10 driver.run("alter table " + tblName + " add partition (ds = 'yesterday')"); + + testEventCounts(defaultDbName, firstEventId, null, null, 10); + // Test a limit higher than available events + testEventCounts(defaultDbName, firstEventId, null, 100, 10); + // Test toEventId lower than current eventId + testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 5, null, 5); + // Event 10, 11, 12 driver.run("insert into table " + tblName + " partition (ds = 'yesterday') values (2)"); // Event 12, 13, 14 @@ -1340,6 +1386,9 @@ public void sqlInsertPartition() throws Exception { assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType()); assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); + // Test fromEventId different from the very first + testEventCounts(defaultDbName, event.getEventId(), null, null, 3); + event = rsp.getEvents().get(21); assertEquals(firstEventId + 22, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); @@ -1355,6 +1404,16 @@ public void sqlInsertPartition() throws Exception { assertEquals(firstEventId + 24, event.getEventId()); assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType()); assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); + testEventCounts(defaultDbName, firstEventId, null, null, 24); + + // Test a limit within the available events + testEventCounts(defaultDbName, firstEventId, null, 10, 10); + // Test toEventId greater than current eventId + testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, null, 24); + // Test toEventId greater than current eventId with some limit within available events + testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, 10, 10); + // Test toEventId greater than current eventId with some limit beyond available events + testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, 50, 24); } private void verifyInsert(NotificationEvent event, String dbName, String tblName) throws Exception { diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 4ceb9fa4caf5..af65d6a6a3e3 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -482,7 +482,6 @@ public void testOpenTxnEvent() throws Throwable { .run("REPL STATUS " + replicatedDbName) .verifyResult(bootStrapDump.lastReplicationId); - // create table will start and coomit the transaction primary.run("use " + primaryDbName) .run("CREATE TABLE " + tableName + " (key int, value int) PARTITIONED BY (load_date date) " + @@ -495,6 +494,11 @@ public void testOpenTxnEvent() throws Throwable { WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId); + + long lastReplId = Long.parseLong(bootStrapDump.lastReplicationId); + primary.testEventCounts(primaryDbName, lastReplId, null, null, 20); + + // Test load replica.load(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) .verifyResult(incrementalDump.lastReplicationId); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index aae7bd7dfe64..7900779e7a56 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest; import org.apache.hadoop.hive.metastore.api.SQLForeignKey; @@ -416,6 +417,22 @@ ReplicationV1CompatRule getReplivationV1CompatRule(List testsToSkip) { return new ReplicationV1CompatRule(client, hiveConf, testsToSkip); } + // Test if the number of events between the given event ids and with the given database name are + // same as expected. toEventId = 0 is treated as unbounded. Same is the case with limit 0. + public void testEventCounts(String dbName, long fromEventId, Long toEventId, Integer limit, + long expectedCount) throws Exception { + NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName); + + if (toEventId != null) { + rqst.setToEventId(toEventId); + } + if (limit != null) { + rqst.setLimit(limit); + } + + assertEquals(expectedCount, client.getNotificationEventsCount(rqst).getEventsCount()); + } + @Override public void close() throws IOException { if (miniDFSCluster != null && miniDFSCluster.isClusterUp()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index c75bde56ea6d..28d61f94dd7e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -163,11 +163,18 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive evFetcher, work.eventFrom, work.maxEventLimit(), evFilter); lastReplId = work.eventTo; + + // Right now the only pattern allowed to be specified is *, which matches all the database + // names. So passing dbname as is works since getDbNotificationEventsCount can exclude filter + // on database name when it's *. In future, if we support more elaborate patterns, we will + // have to pass DatabaseAndTableFilter created above to getDbNotificationEventsCount() to get + // correct event count. String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty()) ? work.dbNameOrPattern : "?"; replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(), - evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName)); + evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo, + work.maxEventLimit())); replLogger.startLog(); while (evIter.hasNext()) { NotificationEvent ev = evIter.next(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java index 66abd5152ebb..f9252712b3f5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java @@ -36,7 +36,8 @@ public class EventUtils { public interface NotificationFetcher { int getBatchSize() throws IOException; long getCurrentNotificationEventId() throws IOException; - long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException; + long getDbNotificationEventsCount(long fromEventId, String dbName, Long toEventId, + int limit) throws IOException; List getNextNotificationEvents( long pos, IMetaStoreClient.NotificationFilter filter) throws IOException; } @@ -78,10 +79,21 @@ public long getCurrentNotificationEventId() throws IOException { } @Override - public long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException { + public long getDbNotificationEventsCount(long fromEventId, String dbName, Long toEventId, + int limit) throws IOException { try { + // Number of events is always bounded by limit, which when non-positive, will result + // in no events being counted.. + if (limit <= 0) { + return 0; + } + NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName); + if (toEventId != null) { + rqst.setToEventId(toEventId); + } + rqst.setLimit(limit); return hiveDb.getMSC().getNotificationEventsCount(rqst).getEventsCount(); } catch (TException e) { throw new IOException(e); diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java index a4a5218f91b2..95af1a455b32 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java @@ -41,6 +41,8 @@ private static final org.apache.thrift.protocol.TField FROM_EVENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("fromEventId", org.apache.thrift.protocol.TType.I64, (short)1); private static final org.apache.thrift.protocol.TField DB_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("dbName", org.apache.thrift.protocol.TType.STRING, (short)2); private static final org.apache.thrift.protocol.TField CAT_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("catName", org.apache.thrift.protocol.TType.STRING, (short)3); + private static final org.apache.thrift.protocol.TField TO_EVENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("toEventId", org.apache.thrift.protocol.TType.I64, (short)4); + private static final org.apache.thrift.protocol.TField LIMIT_FIELD_DESC = new org.apache.thrift.protocol.TField("limit", org.apache.thrift.protocol.TType.I64, (short)5); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -51,12 +53,16 @@ private long fromEventId; // required private String dbName; // required private String catName; // optional + private long toEventId; // optional + private long limit; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { FROM_EVENT_ID((short)1, "fromEventId"), DB_NAME((short)2, "dbName"), - CAT_NAME((short)3, "catName"); + CAT_NAME((short)3, "catName"), + TO_EVENT_ID((short)4, "toEventId"), + LIMIT((short)5, "limit"); private static final Map byName = new HashMap(); @@ -77,6 +83,10 @@ public static _Fields findByThriftId(int fieldId) { return DB_NAME; case 3: // CAT_NAME return CAT_NAME; + case 4: // TO_EVENT_ID + return TO_EVENT_ID; + case 5: // LIMIT + return LIMIT; default: return null; } @@ -118,8 +128,10 @@ public String getFieldName() { // isset id assignments private static final int __FROMEVENTID_ISSET_ID = 0; + private static final int __TOEVENTID_ISSET_ID = 1; + private static final int __LIMIT_ISSET_ID = 2; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.CAT_NAME}; + private static final _Fields optionals[] = {_Fields.CAT_NAME,_Fields.TO_EVENT_ID,_Fields.LIMIT}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -129,6 +141,10 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.CAT_NAME, new org.apache.thrift.meta_data.FieldMetaData("catName", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.TO_EVENT_ID, new org.apache.thrift.meta_data.FieldMetaData("toEventId", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.LIMIT, new org.apache.thrift.meta_data.FieldMetaData("limit", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(NotificationEventsCountRequest.class, metaDataMap); } @@ -158,6 +174,8 @@ public NotificationEventsCountRequest(NotificationEventsCountRequest other) { if (other.isSetCatName()) { this.catName = other.catName; } + this.toEventId = other.toEventId; + this.limit = other.limit; } public NotificationEventsCountRequest deepCopy() { @@ -170,6 +188,10 @@ public void clear() { this.fromEventId = 0; this.dbName = null; this.catName = null; + setToEventIdIsSet(false); + this.toEventId = 0; + setLimitIsSet(false); + this.limit = 0; } public long getFromEventId() { @@ -240,6 +262,50 @@ public void setCatNameIsSet(boolean value) { } } + public long getToEventId() { + return this.toEventId; + } + + public void setToEventId(long toEventId) { + this.toEventId = toEventId; + setToEventIdIsSet(true); + } + + public void unsetToEventId() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TOEVENTID_ISSET_ID); + } + + /** Returns true if field toEventId is set (has been assigned a value) and false otherwise */ + public boolean isSetToEventId() { + return EncodingUtils.testBit(__isset_bitfield, __TOEVENTID_ISSET_ID); + } + + public void setToEventIdIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TOEVENTID_ISSET_ID, value); + } + + public long getLimit() { + return this.limit; + } + + public void setLimit(long limit) { + this.limit = limit; + setLimitIsSet(true); + } + + public void unsetLimit() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __LIMIT_ISSET_ID); + } + + /** Returns true if field limit is set (has been assigned a value) and false otherwise */ + public boolean isSetLimit() { + return EncodingUtils.testBit(__isset_bitfield, __LIMIT_ISSET_ID); + } + + public void setLimitIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __LIMIT_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case FROM_EVENT_ID: @@ -266,6 +332,22 @@ public void setFieldValue(_Fields field, Object value) { } break; + case TO_EVENT_ID: + if (value == null) { + unsetToEventId(); + } else { + setToEventId((Long)value); + } + break; + + case LIMIT: + if (value == null) { + unsetLimit(); + } else { + setLimit((Long)value); + } + break; + } } @@ -280,6 +362,12 @@ public Object getFieldValue(_Fields field) { case CAT_NAME: return getCatName(); + case TO_EVENT_ID: + return getToEventId(); + + case LIMIT: + return getLimit(); + } throw new IllegalStateException(); } @@ -297,6 +385,10 @@ public boolean isSet(_Fields field) { return isSetDbName(); case CAT_NAME: return isSetCatName(); + case TO_EVENT_ID: + return isSetToEventId(); + case LIMIT: + return isSetLimit(); } throw new IllegalStateException(); } @@ -341,6 +433,24 @@ public boolean equals(NotificationEventsCountRequest that) { return false; } + boolean this_present_toEventId = true && this.isSetToEventId(); + boolean that_present_toEventId = true && that.isSetToEventId(); + if (this_present_toEventId || that_present_toEventId) { + if (!(this_present_toEventId && that_present_toEventId)) + return false; + if (this.toEventId != that.toEventId) + return false; + } + + boolean this_present_limit = true && this.isSetLimit(); + boolean that_present_limit = true && that.isSetLimit(); + if (this_present_limit || that_present_limit) { + if (!(this_present_limit && that_present_limit)) + return false; + if (this.limit != that.limit) + return false; + } + return true; } @@ -363,6 +473,16 @@ public int hashCode() { if (present_catName) list.add(catName); + boolean present_toEventId = true && (isSetToEventId()); + list.add(present_toEventId); + if (present_toEventId) + list.add(toEventId); + + boolean present_limit = true && (isSetLimit()); + list.add(present_limit); + if (present_limit) + list.add(limit); + return list.hashCode(); } @@ -404,6 +524,26 @@ public int compareTo(NotificationEventsCountRequest other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetToEventId()).compareTo(other.isSetToEventId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetToEventId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.toEventId, other.toEventId); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetLimit()).compareTo(other.isSetLimit()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetLimit()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.limit, other.limit); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -445,6 +585,18 @@ public String toString() { } first = false; } + if (isSetToEventId()) { + if (!first) sb.append(", "); + sb.append("toEventId:"); + sb.append(this.toEventId); + first = false; + } + if (isSetLimit()) { + if (!first) sb.append(", "); + sb.append("limit:"); + sb.append(this.limit); + first = false; + } sb.append(")"); return sb.toString(); } @@ -522,6 +674,22 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, NotificationEventsC org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 4: // TO_EVENT_ID + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.toEventId = iprot.readI64(); + struct.setToEventIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 5: // LIMIT + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.limit = iprot.readI64(); + struct.setLimitIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -550,6 +718,16 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, NotificationEvents oprot.writeFieldEnd(); } } + if (struct.isSetToEventId()) { + oprot.writeFieldBegin(TO_EVENT_ID_FIELD_DESC); + oprot.writeI64(struct.toEventId); + oprot.writeFieldEnd(); + } + if (struct.isSetLimit()) { + oprot.writeFieldBegin(LIMIT_FIELD_DESC); + oprot.writeI64(struct.limit); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -573,10 +751,22 @@ public void write(org.apache.thrift.protocol.TProtocol prot, NotificationEventsC if (struct.isSetCatName()) { optionals.set(0); } - oprot.writeBitSet(optionals, 1); + if (struct.isSetToEventId()) { + optionals.set(1); + } + if (struct.isSetLimit()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); if (struct.isSetCatName()) { oprot.writeString(struct.catName); } + if (struct.isSetToEventId()) { + oprot.writeI64(struct.toEventId); + } + if (struct.isSetLimit()) { + oprot.writeI64(struct.limit); + } } @Override @@ -586,11 +776,19 @@ public void read(org.apache.thrift.protocol.TProtocol prot, NotificationEventsCo struct.setFromEventIdIsSet(true); struct.dbName = iprot.readString(); struct.setDbNameIsSet(true); - BitSet incoming = iprot.readBitSet(1); + BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { struct.catName = iprot.readString(); struct.setCatNameIsSet(true); } + if (incoming.get(1)) { + struct.toEventId = iprot.readI64(); + struct.setToEventIdIsSet(true); + } + if (incoming.get(2)) { + struct.limit = iprot.readI64(); + struct.setLimitIsSet(true); + } } } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php index 775c9d97a9b3..5fd5d782ea7a 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php @@ -22619,6 +22619,14 @@ class NotificationEventsCountRequest { * @var string */ public $catName = null; + /** + * @var int + */ + public $toEventId = null; + /** + * @var int + */ + public $limit = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -22635,6 +22643,14 @@ public function __construct($vals=null) { 'var' => 'catName', 'type' => TType::STRING, ), + 4 => array( + 'var' => 'toEventId', + 'type' => TType::I64, + ), + 5 => array( + 'var' => 'limit', + 'type' => TType::I64, + ), ); } if (is_array($vals)) { @@ -22647,6 +22663,12 @@ public function __construct($vals=null) { if (isset($vals['catName'])) { $this->catName = $vals['catName']; } + if (isset($vals['toEventId'])) { + $this->toEventId = $vals['toEventId']; + } + if (isset($vals['limit'])) { + $this->limit = $vals['limit']; + } } } @@ -22690,6 +22712,20 @@ public function read($input) $xfer += $input->skip($ftype); } break; + case 4: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->toEventId); + } else { + $xfer += $input->skip($ftype); + } + break; + case 5: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->limit); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -22718,6 +22754,16 @@ public function write($output) { $xfer += $output->writeString($this->catName); $xfer += $output->writeFieldEnd(); } + if ($this->toEventId !== null) { + $xfer += $output->writeFieldBegin('toEventId', TType::I64, 4); + $xfer += $output->writeI64($this->toEventId); + $xfer += $output->writeFieldEnd(); + } + if ($this->limit !== null) { + $xfer += $output->writeFieldBegin('limit', TType::I64, 5); + $xfer += $output->writeI64($this->limit); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 9d3885cf2e99..03c2a4ee95a3 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -15814,6 +15814,8 @@ class NotificationEventsCountRequest: - fromEventId - dbName - catName + - toEventId + - limit """ thrift_spec = ( @@ -15821,12 +15823,16 @@ class NotificationEventsCountRequest: (1, TType.I64, 'fromEventId', None, None, ), # 1 (2, TType.STRING, 'dbName', None, None, ), # 2 (3, TType.STRING, 'catName', None, None, ), # 3 + (4, TType.I64, 'toEventId', None, None, ), # 4 + (5, TType.I64, 'limit', None, None, ), # 5 ) - def __init__(self, fromEventId=None, dbName=None, catName=None,): + def __init__(self, fromEventId=None, dbName=None, catName=None, toEventId=None, limit=None,): self.fromEventId = fromEventId self.dbName = dbName self.catName = catName + self.toEventId = toEventId + self.limit = limit def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -15852,6 +15858,16 @@ def read(self, iprot): self.catName = iprot.readString() else: iprot.skip(ftype) + elif fid == 4: + if ftype == TType.I64: + self.toEventId = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.I64: + self.limit = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -15874,6 +15890,14 @@ def write(self, oprot): oprot.writeFieldBegin('catName', TType.STRING, 3) oprot.writeString(self.catName) oprot.writeFieldEnd() + if self.toEventId is not None: + oprot.writeFieldBegin('toEventId', TType.I64, 4) + oprot.writeI64(self.toEventId) + oprot.writeFieldEnd() + if self.limit is not None: + oprot.writeFieldBegin('limit', TType.I64, 5) + oprot.writeI64(self.limit) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -15890,6 +15914,8 @@ def __hash__(self): value = (value * 31) ^ hash(self.fromEventId) value = (value * 31) ^ hash(self.dbName) value = (value * 31) ^ hash(self.catName) + value = (value * 31) ^ hash(self.toEventId) + value = (value * 31) ^ hash(self.limit) return value def __repr__(self): diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index 26b89c0fef33..2eea181d85e9 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -3518,11 +3518,15 @@ class NotificationEventsCountRequest FROMEVENTID = 1 DBNAME = 2 CATNAME = 3 + TOEVENTID = 4 + LIMIT = 5 FIELDS = { FROMEVENTID => {:type => ::Thrift::Types::I64, :name => 'fromEventId'}, DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'}, - CATNAME => {:type => ::Thrift::Types::STRING, :name => 'catName', :optional => true} + CATNAME => {:type => ::Thrift::Types::STRING, :name => 'catName', :optional => true}, + TOEVENTID => {:type => ::Thrift::Types::I64, :name => 'toEventId', :optional => true}, + LIMIT => {:type => ::Thrift::Types::I64, :name => 'limit', :optional => true} } def struct_fields; FIELDS; end diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index a2a6740452e4..4b7b61520a2d 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -1147,7 +1147,9 @@ struct CurrentNotificationEventId { struct NotificationEventsCountRequest { 1: required i64 fromEventId, 2: required string dbName, - 3: optional string catName + 3: optional string catName, + 4: optional i64 toEventId, + 5: optional i64 limit } struct NotificationEventsCountResponse { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index ddd64e769019..9c158040497c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -10260,14 +10260,60 @@ public NotificationEventsCountResponse getNotificationEventsCount(NotificationEv long fromEventId = rqst.getFromEventId(); String inputDbName = rqst.getDbName(); String catName = rqst.isSetCatName() ? rqst.getCatName() : getDefaultCatalog(conf); - String queryStr = "select count(eventId) from " + MNotificationLog.class.getName() - + " where eventId > fromEventId && dbName == inputDbName && catalogName == catName"; + long toEventId; + String paramSpecs; + List paramVals = new ArrayList(); + + // We store a catalog name in lower case in metastore and also use the same way everywhere in + // hive. + assert catName.equals(catName.toLowerCase()); + + // Build the query to count events, part by part + String queryStr = "select count(eventId) from " + MNotificationLog.class.getName(); + // count fromEventId onwards events + queryStr = queryStr + " where eventId > fromEventId"; + paramSpecs = "java.lang.Long fromEventId"; + paramVals.add(Long.valueOf(fromEventId)); + + // Input database name can be a database name or a *. In the first case we add a filter + // condition on dbName column, but not in the second case, since a * means all the + // databases. In case we support more elaborate database name patterns in future, we will + // have to apply a method similar to getNextNotification() method of MetaStoreClient. + if (!inputDbName.equals("*")) { + // dbName could be NULL in case of transaction related events, which also need to be + // counted. + queryStr = queryStr + " && (dbName == inputDbName || dbName == null)"; + paramSpecs = paramSpecs + ", java.lang.String inputDbName"; + // We store a database name in lower case in metastore. + paramVals.add(inputDbName.toLowerCase()); + } + + // catName could be NULL in case of transaction related events, which also need to be + // counted. + queryStr = queryStr + " && (catalogName == catName || catalogName == null)"; + paramSpecs = paramSpecs +", java.lang.String catName"; + paramVals.add(catName); + + // count events upto toEventId if specified + if (rqst.isSetToEventId()) { + toEventId = rqst.getToEventId(); + queryStr = queryStr + " && eventId <= toEventId"; + paramSpecs = paramSpecs + ", java.lang.Long toEventId"; + paramVals.add(Long.valueOf(toEventId)); + } + query = pm.newQuery(queryStr); - query.declareParameters("java.lang.Long fromEventId, java.lang.String inputDbName," + - " java.lang.String catName"); - result = (Long) query.execute(fromEventId, inputDbName, catName); + query.declareParameters(paramSpecs); + result = (Long) query.executeWithArray(paramVals.toArray()); commited = commitTransaction(); - return new NotificationEventsCountResponse(result.longValue()); + + // Cap the event count by limit if specified. + long eventCount = result.longValue(); + if (rqst.isSetLimit() && eventCount > rqst.getLimit()) { + eventCount = rqst.getLimit(); + } + + return new NotificationEventsCountResponse(eventCount); } finally { rollbackAndCleanup(commited, query); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 351fafd933e0..0bb739fc2229 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -1652,7 +1652,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds if (transactionalListeners != null) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, EventMessage.EventType.ALLOC_WRITE_ID, - new AllocWriteIdEvent(txnToWriteIds, rqst.getDbName(), rqst.getTableName(), null), + new AllocWriteIdEvent(txnToWriteIds, dbName, tblName, null), dbConn, sqlGenerator); }