diff --git a/configs/pill/prox/pill_prox.dev.yml b/configs/pill/prox/pill_prox.dev.yml new file mode 100644 index 00000000..acbbcc75 --- /dev/null +++ b/configs/pill/prox/pill_prox.dev.yml @@ -0,0 +1,71 @@ +metrics_enabled: false +debug: true +graphite: + host: carbon.hostedgraphite.com + api_key: 7509c0ff-4db5-4cae-91ee-6e78ff13b336 + reporting_interval_in_seconds: 15 + include_metrics: + - com.hello + +common_db: + # the name of your JDBC driver + driverClass: org.postgresql.Driver + + # the username + user: ingress_user + + # the password + password: hello ingress user + + # the JDBC URL + url: jdbc:postgresql://localhost:5432/dump_2015_02_09 + + # any properties specific to your JDBC driver: + properties: + charSet: UTF-8 + + # the maximum amount of time to wait on an empty pool before throwing an exception + maxWaitForConnection: 1s + + # the SQL query to run when validating a connection's liveness + validationQuery: "/* MyService Health Check */ SELECT 1" + + # the minimum number of connections to keep open + minSize: 8 + + # the maximum number of connections to keep open + maxSize: 32 + + # whether or not idle connections should be validated + checkConnectionWhileIdle: false + +kinesis: + endpoint : https://kinesis.us-east-1.amazonaws.com + streams : + batch_pill_data : dev_batch_pill_data + +app_name: PillProxConsumerLocal + +max_records: 1000 + +# Logging settings. +logging: + level: INFO + loggers: + com.hello.suripu.workers: DEBUG + com.hello.suripu.core: DEBUG + appenders: + - type: console + # The timezone used to format dates. HINT: USE THE DEFAULT, UTC. + timeZone: UTC + +dynamodb: + region: us-east-1 + tables: + features: features + pill_key_store : pill_key_store + endpoints: + features : https://dynamodb.us-east-1.amazonaws.com + pill_key_store : https://dynamodb.us-east-1.amazonaws.com + +prox_table_prefix: prefix diff --git a/src/main/java/com/hello/suripu/workers/HelloWorker.java b/src/main/java/com/hello/suripu/workers/HelloWorker.java index c11cb3b4..b0a11b46 100644 --- a/src/main/java/com/hello/suripu/workers/HelloWorker.java +++ b/src/main/java/com/hello/suripu/workers/HelloWorker.java @@ -8,6 +8,7 @@ import com.hello.suripu.workers.logs.timeline.TimelineLogCommand; import com.hello.suripu.workers.notifications.PushNotificationsWorkerCommand; import com.hello.suripu.workers.pill.PillWorkerCommand; +import com.hello.suripu.workers.pill.prox.PillProxWorkerCommand; import com.hello.suripu.workers.sense.SenseSaveWorkerCommand; import com.hello.suripu.workers.sense.lastSeen.SenseLastSeenWorkerCommand; import io.dropwizard.Application; @@ -39,6 +40,7 @@ public void initialize(Bootstrap bootstrap) { bootstrap.addCommand(new PushNotificationsWorkerCommand("push", "send push notifications")); bootstrap.addCommand(new TimelineLogCommand("timeline_log", "timeline log")); bootstrap.addCommand(new SenseStreamFanoutCommand("sense_stream_fanout", "fanout sense stream")); + bootstrap.addCommand(new PillProxWorkerCommand("prox", "pill prox")); } @Override diff --git a/src/main/java/com/hello/suripu/workers/framework/WorkerRolloutModule.java b/src/main/java/com/hello/suripu/workers/framework/WorkerRolloutModule.java index 7486a484..6d9e095c 100644 --- a/src/main/java/com/hello/suripu/workers/framework/WorkerRolloutModule.java +++ b/src/main/java/com/hello/suripu/workers/framework/WorkerRolloutModule.java @@ -12,6 +12,7 @@ import com.hello.suripu.workers.notifications.PushNotificationsProcessor; import com.hello.suripu.workers.pill.S3RecordProcessor; import com.hello.suripu.workers.pill.SavePillDataProcessor; +import com.hello.suripu.workers.pill.prox.PillProxDataProcessor; import com.hello.suripu.workers.sense.SenseSaveDDBProcessor; import com.hello.suripu.workers.sense.SenseSaveProcessor; import com.hello.suripu.workers.sense.lastSeen.SenseLastSeenProcessor; @@ -38,7 +39,8 @@ TimelineLogProcessor.class, SenseLastSeenProcessor.class, SenseStreamFanout.class, - SenseStructuredLogIndexer.class + SenseStructuredLogIndexer.class, + PillProxDataProcessor.class }) public class WorkerRolloutModule { private final FeatureStore featureStore; diff --git a/src/main/java/com/hello/suripu/workers/pill/prox/PillProxData.java b/src/main/java/com/hello/suripu/workers/pill/prox/PillProxData.java new file mode 100644 index 00000000..f1fe9921 --- /dev/null +++ b/src/main/java/com/hello/suripu/workers/pill/prox/PillProxData.java @@ -0,0 +1,59 @@ +package com.hello.suripu.workers.pill.prox; + +import com.google.common.io.LittleEndianDataInputStream; +import com.hello.suripu.core.models.TrackerMotion; +import org.joda.time.DateTime; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Arrays; + +public class PillProxData { + + public final Long accountId; + public final String pillId; + public final Integer proxValue; + public final Integer proxValue4; + public final Integer reserved; + + public final DateTime ts; + public final Integer offsetMillis = 0; // TODO: LOL + + private PillProxData(final Long accountId, final String pillId, final Integer proxValue, final Integer proxValue4, final Integer reserved, final DateTime ts) { + this.accountId = accountId; + this.pillId = pillId; + this.proxValue = proxValue; + this.proxValue4 = proxValue4; + this.ts = ts; + this.reserved = reserved; + } + + public static PillProxData create(final Long accountId, final String pillId, final Integer proxValue, final Integer proxValue4, final DateTime ts) { + return new PillProxData(accountId, pillId, proxValue, proxValue4, 0, ts); + } + + public static PillProxData fromEncryptedData(final String pillId, final byte[] encryptedProxData, final byte[] key, final DateTime sampleTime) { + final byte[] decryptedProxData = decryptProxData(key, encryptedProxData); + try (final LittleEndianDataInputStream littleEndianDataInputStream = new LittleEndianDataInputStream(new ByteArrayInputStream(decryptedProxData))) { + final int proxValue = littleEndianDataInputStream.readInt(); + final int proxValue4 = littleEndianDataInputStream.readInt(); + final int reserved = littleEndianDataInputStream.readInt(); + return new PillProxData(0L, pillId, proxValue, proxValue4, reserved, sampleTime); + } catch (IOException e) { + throw new IllegalArgumentException("server can't parse prox data"); + } + + } + + public static byte[] decryptProxData(final byte[] key, final byte[] encryptedMotionData) { + final byte[] nonce = Arrays.copyOfRange(encryptedMotionData, 0, 8); + + //final byte[] crc = Arrays.copyOfRange(encryptedMotionData, encryptedMotionData.length - 1 - 2, encryptedMotionData.length); // Not used yet + final byte[] encryptedRawMotion = Arrays.copyOfRange(encryptedMotionData, 8, encryptedMotionData.length); + + final byte[] decryptedRawMotion = TrackerMotion.Utils.counterModeDecrypt(key, nonce, encryptedRawMotion); + return decryptedRawMotion; + } +} + + diff --git a/src/main/java/com/hello/suripu/workers/pill/prox/PillProxDataDAODynamoDB.java b/src/main/java/com/hello/suripu/workers/pill/prox/PillProxDataDAODynamoDB.java new file mode 100644 index 00000000..d2e5f62f --- /dev/null +++ b/src/main/java/com/hello/suripu/workers/pill/prox/PillProxDataDAODynamoDB.java @@ -0,0 +1,463 @@ +package com.hello.suripu.workers.pill.prox; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.CreateTableResult; +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.KeyType; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; +import com.amazonaws.services.dynamodbv2.model.TableDescription; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.hello.suripu.core.db.TimeSeriesDAODynamoDB; +import com.hello.suripu.core.db.dynamo.Attribute; +import com.hello.suripu.core.db.dynamo.Expressions; +import com.hello.suripu.core.db.dynamo.expressions.Expression; +import com.hello.suripu.core.db.responses.Response; +import com.hello.suripu.core.util.DateTimeUtil; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Created by kingshy on 11/10/15. + * + * Similar to prod_sense_data_*, store pill data in monthly DDB tables. + * Schema: + * aid (HK, account_id, N) + * ts|pill (RK, ts|pill_external_id, S) + * prox (prox, N) + * om (offset_millis, N) + * lutcts (local_utc_ts, S) + * see https://hello.hackpad.com/Pill-Data-Explained-wgXcyalTFcq + */ + +public class PillProxDataDAODynamoDB extends TimeSeriesDAODynamoDB { + private final static Logger LOGGER = LoggerFactory.getLogger(PillProxDataDAODynamoDB.class); + + public PillProxDataDAODynamoDB(final AmazonDynamoDB dynamoDBClient, final String tablePrefix) { + super(dynamoDBClient, tablePrefix); + } + + public enum PillDataAttribute implements Attribute { + ACCOUNT_ID ("aid", "N"), + TS_PILL_ID ("ts|pil", "S"), + PROX ("prox", "N"), + PROX4 ("prox4", "N"), + OFFSET_MILLIS ("om", "N"), + LOCAL_UTC_TS ("lutcts", "S"); + + private final String name; + private final String type; + + PillDataAttribute(final String name, final String type) { + this.name = name; + this.type = type; + } + + private Long getLong(final Map item) { + if (item.containsKey(this.name)) { + return Long.parseLong(item.get(this.name).getN()); + } + return 0L; + } + + private Integer getInteger(final Map item) { + if (item.containsKey(this.name)) { + return Integer.parseInt(item.get(this.name).getN()); + } + return 0; + } + + public String sanitizedName() { + return toString(); + } + public String shortName() { + return name; + } + + public String type() { + return "S"; + } + } + + private static final Set TARGET_ATTRIBUTES = new ImmutableSet.Builder() + .add(PillDataAttribute.ACCOUNT_ID) + .add(PillDataAttribute.TS_PILL_ID) + .add(PillDataAttribute.PROX) + .add(PillDataAttribute.PROX4) + .add(PillDataAttribute.OFFSET_MILLIS) + .add(PillDataAttribute.LOCAL_UTC_TS) + .build(); + + + // Store one datapoint per minute, ts can contain seconds value + private static final DateTimeFormatter DATE_TIME_READ_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssZ"); + private static final String DATE_TIME_STRING_TEMPLATE = "yyyy-MM-dd HH:mm:ss"; + private static final DateTimeFormatter DATE_TIME_WRITE_FORMATTER = DateTimeFormat.forPattern(DATE_TIME_STRING_TEMPLATE); + + //region Override abstract methods + @Override + protected Logger logger() { + return LOGGER; + } + + @Override + protected Integer maxQueryAttempts() { + return 5; + } + + @Override + protected Integer maxBatchWriteAttempts() { + return 5; + } + + @Override + protected String hashKeyName() { + return PillDataAttribute.ACCOUNT_ID.name; + } + + @Override + protected String rangeKeyName() { + return PillDataAttribute.TS_PILL_ID.name; + } + + @Override + protected String hashKeyType() { + return PillDataAttribute.ACCOUNT_ID.type; + } + + @Override + protected String rangeKeyType() { + return PillDataAttribute.TS_PILL_ID.type; + } + + @Override + protected String getHashKey(AttributeValue attributeValue) { + return attributeValue.getN(); + } + + @Override + protected String getRangeKey(AttributeValue attributeValue) { + return attributeValue.getS(); + } + + @Override + protected DateTime getTimestamp(PillProxData pillProxData) { + return new DateTime(pillProxData.ts, DateTimeZone.UTC).withMillisOfSecond(0); + } + + @Override + protected Map toAttributeMap(PillProxData pillProxData) { + final Map item = Maps.newHashMap(); + item.put(PillDataAttribute.ACCOUNT_ID.name, new AttributeValue().withN(String.valueOf(pillProxData.accountId))); + item.put(PillDataAttribute.TS_PILL_ID.name, getRangeKey(pillProxData.ts, pillProxData.pillId)); + item.put(PillDataAttribute.PROX.name, new AttributeValue().withN(String.valueOf(pillProxData.proxValue))); + item.put(PillDataAttribute.PROX4.name, new AttributeValue().withN(String.valueOf(pillProxData.proxValue4))); + item.put(PillDataAttribute.OFFSET_MILLIS.name, new AttributeValue().withN(String.valueOf(pillProxData.offsetMillis))); + + final DateTime localUTCDateTIme = new DateTime(pillProxData.ts, DateTimeZone.UTC).plusMillis(pillProxData.offsetMillis); + item.put(PillDataAttribute.LOCAL_UTC_TS.name, new AttributeValue().withS(localUTCDateTIme.toString(DATE_TIME_WRITE_FORMATTER))); + + return item; + } + + @Override + public String getTableName(final DateTime dateTime) { + final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy_MM"); + return tablePrefix + "_" + dateTime.toString(formatter); + } + + @Override + public List getTableNames(DateTime start, DateTime end) { + final LinkedList names = Lists.newLinkedList(); + for (final DateTime dateTime: DateTimeUtil.dateTimesForStartOfMonthBetweenDates(start, end)) { + final String tableName = getTableName(dateTime); + names.add(tableName); + } + return names; + } + //endregion + + private static AttributeValue getRangeKey(final DateTime dateTime, final String pillId) { + return new AttributeValue(dateTime.toString(DATE_TIME_WRITE_FORMATTER) + "|" + pillId); + } + + private static AttributeValue getRangeKey(final Long timestamp, final String pillId) { + final DateTime dateTime = new DateTime(timestamp, DateTimeZone.UTC).withMillisOfSecond(0); + return getRangeKey(dateTime, pillId); + } + + private static AttributeValue toAttributeValue(final Long value) { + return new AttributeValue().withN(String.valueOf(value)); + } + + private static AttributeValue dateTimeToAttributeValue(final DateTime dateTime) { + return new AttributeValue(dateTime.toString(DATE_TIME_WRITE_FORMATTER)); + } + + private String externalTrackerIdFromDDBItem(final Map item) { + return item.get(PillDataAttribute.TS_PILL_ID.name).getS().substring(DATE_TIME_STRING_TEMPLATE.length() + 1); + } + + private DateTime timestampFromDDBItem(final Map item) { + final String dateString = item.get(PillDataAttribute.TS_PILL_ID.name).getS() + .substring(0, DATE_TIME_STRING_TEMPLATE.length()); + return DateTime.parse(dateString + "Z", DATE_TIME_READ_FORMATTER).withZone(DateTimeZone.UTC); + } + + + private PillProxData fromDynamoDBItem(final Map item) { + return PillProxData.create(88L, "test", 99, 999, DateTime.now()); + } + + + + /** + * Insert a list of any size to DDB, if size > 25, will be partitioned + * @param pillProxDataList list of pill data to insert + * @return number of successful inserts + */ + public int batchInsert(final List pillProxDataList, int batchSize) { + final List> dataList = Lists.partition(pillProxDataList, MAX_PUT_ITEMS); + int numberInserted = 0; + + for (final List trackerMotions : dataList) { + try { + numberInserted += batchInsert(trackerMotions); + } catch (AmazonClientException e) { + LOGGER.error("Got exception while attempting to batchInsert to DynamoDB: {}", e); + } + } + return numberInserted; + } + + /** + * Get a single datapoint (used for tests for now) + * @param accountId hash key + * @param externalPillId range key pill id + * @param queryDateTimeUTC range key ts + * @return list of tracker motion + */ + public Optional getSinglePillData(final Long accountId, + final String externalPillId, + final DateTime queryDateTimeUTC) { + // add two minutes for query time upper bound. + final Response>> response = getItemsBetweenTS(accountId, queryDateTimeUTC, queryDateTimeUTC.plusMinutes(2), Optional.of(externalPillId)); + + final ImmutableList items = ImmutableList.copyOf(attributeMapsToPillDataList(response.data)); + + if (items.isEmpty()) { + return Optional.absent(); + } + + return Optional.of(items.get(0)); + } + + final List attributeMapsToPillDataList(final List> items) { + final List dataList = Lists.newArrayListWithCapacity(items.size()); + for (final Map item : items) { + dataList.add(fromDynamoDBItem(item)); + } + return dataList; + } + + private Response getItemsBetweenTS(final long accountId, + final DateTime startTimestampUTC, + final DateTime endTimestampUTC, + final Optional optionalExternalPillId) { + + // aid = accountId, ts >= start, ts <= end + // with pill_id == "", there is no need to minus 1 minute from the endTimestamp + String externalPillId = ""; + DateTime queryEndTimestampUTC = endTimestampUTC; + if (optionalExternalPillId.isPresent()) { + externalPillId = optionalExternalPillId.get(); + queryEndTimestampUTC = endTimestampUTC.minusMinutes(1); + } + + final Expression keyConditionExpression = Expressions.and( + Expressions.equals(PillDataAttribute.ACCOUNT_ID, toAttributeValue(accountId)), + Expressions.between(PillDataAttribute.TS_PILL_ID, + getRangeKey(startTimestampUTC, externalPillId), + getRangeKey(queryEndTimestampUTC, externalPillId))); + + final List> results = Lists.newArrayList(); + final Response>> response = queryTables( + getTableNames(startTimestampUTC, queryEndTimestampUTC), + keyConditionExpression, + TARGET_ATTRIBUTES); + + for (final Map result : response.data) { + results.add(result); + } + return new Response(results, response.status, response.exception); + } + + private Response getItemsBetweenLocalTS(final long accountId, + final DateTime startLocalTime, + final DateTime endLocalTime, + final Optional optionalExternalPillId) { + // aid = accountId, lutcts >= startLocal, lutcts <= endLocal (note, inclusive) + final DateTime startTimestampUTC = startLocalTime.minusDays(1).minusMinutes(1); + final DateTime endTimestampUTC = endLocalTime.plusDays(1).plusMinutes(1); + + String externalPillId = ""; + if (optionalExternalPillId.isPresent()) { + externalPillId = optionalExternalPillId.get(); + } + + final Expression keyConditionExpression = Expressions.and( + Expressions.equals(PillDataAttribute.ACCOUNT_ID, toAttributeValue(accountId)), + Expressions.between(PillDataAttribute.TS_PILL_ID, + getRangeKey(startTimestampUTC, externalPillId), + getRangeKey(endTimestampUTC, externalPillId))); + + final Expression filterExpression = Expressions.between( + PillDataAttribute.LOCAL_UTC_TS, + dateTimeToAttributeValue(startLocalTime), + dateTimeToAttributeValue(endLocalTime)); + + final Response>> response = queryTables(getTableNames(startTimestampUTC, endTimestampUTC), + keyConditionExpression, + filterExpression, + TARGET_ATTRIBUTES); + + final List> results = Lists.newArrayList(); + for (final Map result : response.data) { + results.add(result); + } + return new Response(results, response.status, response.exception); + } + + //region query methods mirroring TrackerMotionDAO + public ImmutableList getBetween(final long accountId, + final DateTime startTimestampUTC, + final DateTime endTimestampUTC) { + final Response>> response = getItemsBetweenTS(accountId, startTimestampUTC, endTimestampUTC, Optional.absent()); + return ImmutableList.copyOf(attributeMapsToPillDataList(response.data)); + } + + public ImmutableList getBetween(final long accountId, + final DateTime startTimestampUTC, + final DateTime endTimestampUTC, + final String ExternalPillId) { + final Response>> response = getItemsBetweenTS(accountId, startTimestampUTC, endTimestampUTC, Optional.of(ExternalPillId)); + return ImmutableList.copyOf(attributeMapsToPillDataList(response.data)); + } + + public ImmutableList getBetweenLocalUTC(final long accountId, + final DateTime startLocalTime, + final DateTime endLocalTime) { + final Response>> response = getItemsBetweenLocalTS(accountId, startLocalTime, endLocalTime, Optional.absent()); + return ImmutableList.copyOf(attributeMapsToPillDataList(response.data)); + } + + public Integer getDataCountBetweenLocalUTC(final long accountId, + final DateTime startLocalTime, + final DateTime endLocalTime) { + final Response>> response = getItemsBetweenLocalTS(accountId, startLocalTime, endLocalTime, Optional.absent()); + return response.data.size(); + } + + //region TODO + public ImmutableList getBetweenGrouped(final long accountId, + final DateTime startLocalTime, + final DateTime endLocalTime, + final Integer slotDuration) { + return ImmutableList.of(); + } + + // used by TrackerMotionDAOIT. not implementing. + public Integer deleteDataTrackerID(final Long trackerId) { + return 0; + } + + public ImmutableList getTrackerOffsetMillis(final long accountId, + final DateTime startDate, + final DateTime endDate) { + return ImmutableList.of(); + } + //endregion TODO + + //endregion + + public Optional getMostRecent(final String externalPillId, final Long accountId, final DateTime now) { + final DateTime startTime = now.minusDays(15); + final Expression keyConditionExpression = Expressions.and( + Expressions.equals(PillDataAttribute.ACCOUNT_ID, toAttributeValue(accountId)), + Expressions.between(PillDataAttribute.TS_PILL_ID, getRangeKey(startTime, externalPillId), getRangeKey(now, externalPillId))); + + final Optional> latest = getLatest(getTableName(now), keyConditionExpression, TARGET_ATTRIBUTES); + + if (latest.isPresent()) { + final PillProxData pillProxData = fromDynamoDBItem(latest.get()); + if (pillProxData.pillId.equals(externalPillId)) { + return Optional.of(pillProxData); + } + } + + // Getting the absolute most recent didn't work, so try querying relevant tables. + final Response>> response = queryTables(getTableNames(startTime, now), keyConditionExpression, TARGET_ATTRIBUTES); + + // Iterate through results in reverse order (most recent first) + for (final Map item: Lists.reverse(response.data)) { + final PillProxData pillProxData = fromDynamoDBItem(item); + if (pillProxData.pillId.equals(externalPillId)) { + return Optional.of(pillProxData); + } + } + + return Optional.absent(); + } + + + public Class name() { + return PillProxDataDAODynamoDB.class; + } + + public CreateTableResult createTable(final String tableName) { + if (!tableName.startsWith(this.tablePrefix)) { + final String status = String.format("Fail to create %s, wrong prefix!", tableName); + return new CreateTableResult().withTableDescription( + new TableDescription().withTableStatus(status)); + } + + final CreateTableRequest request = new CreateTableRequest().withTableName(tableName); + + request.withKeySchema( + new KeySchemaElement().withAttributeName(PillDataAttribute.ACCOUNT_ID.name).withKeyType(KeyType.HASH), + new KeySchemaElement().withAttributeName(PillDataAttribute.TS_PILL_ID.name).withKeyType(KeyType.RANGE) + ); + + request.withAttributeDefinitions( + new AttributeDefinition() + .withAttributeName(PillDataAttribute.ACCOUNT_ID.name) + .withAttributeType(PillDataAttribute.ACCOUNT_ID.type), + new AttributeDefinition() + .withAttributeName(PillDataAttribute.TS_PILL_ID.name) + .withAttributeType(PillDataAttribute.TS_PILL_ID.type) + ); + + request.setProvisionedThroughput(new ProvisionedThroughput() + .withReadCapacityUnits(1L) + .withWriteCapacityUnits(1L)); + + return dynamoDBClient.createTable(request); + } + +} diff --git a/src/main/java/com/hello/suripu/workers/pill/prox/PillProxDataProcessor.java b/src/main/java/com/hello/suripu/workers/pill/prox/PillProxDataProcessor.java new file mode 100644 index 00000000..4c548566 --- /dev/null +++ b/src/main/java/com/hello/suripu/workers/pill/prox/PillProxDataProcessor.java @@ -0,0 +1,140 @@ +package com.hello.suripu.workers.pill.prox; + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.model.Record; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.protobuf.InvalidProtocolBufferException; +import com.hello.suripu.api.ble.SenseCommandProtos; +import com.hello.suripu.core.db.DeviceDAO; +import com.hello.suripu.core.db.KeyStore; +import com.hello.suripu.workers.framework.HelloBaseRecordProcessor; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class PillProxDataProcessor extends HelloBaseRecordProcessor { + private final static Logger LOGGER = LoggerFactory.getLogger(PillProxDataProcessor.class); + private final PillProxDataDAODynamoDB pillProxDataDAODynamoDB; + private final int batchSize; + private final KeyStore pillKeyStore; + private final DeviceDAO deviceDAO; + + public PillProxDataProcessor(final PillProxDataDAODynamoDB pillProxDataDAODynamoDB, + final int batchSize, + final KeyStore pillKeyStore, + final DeviceDAO deviceDAO) { + this.batchSize = batchSize; + this.pillKeyStore = pillKeyStore; + this.pillProxDataDAODynamoDB = pillProxDataDAODynamoDB; + this.deviceDAO = deviceDAO; + } + + @Override + public void initialize(String s) { + } + + @Override + public void processRecords(final List records, final IRecordProcessorCheckpointer iRecordProcessorCheckpointer) { + + final Map> pillKeys = Maps.newHashMap(); + final Set pillIds = Sets.newHashSet(); + final List proxData = Lists.newArrayList(); + final List pillProxDataList = Lists.newArrayList(); + + for (final Record record : records) { + try { + final SenseCommandProtos.batched_pill_data batched_pill_data = SenseCommandProtos.batched_pill_data.parseFrom(record.getData().array()); + for(final SenseCommandProtos.pill_data data: batched_pill_data.getProxList()) { + pillIds.add(data.getDeviceId()); + proxData.add(data); + } + if(!batched_pill_data.getProxList().isEmpty()) { + LOGGER.info("sense_id={} action=has-motion", batched_pill_data.getDeviceId()); + } + + } catch (InvalidProtocolBufferException e) { + LOGGER.error("Failed to decode protobuf: {}", e.getMessage()); + } catch (IllegalArgumentException e) { + LOGGER.error("Failed to decrypted pill data {}, error: {}", record.getData().array(), e.getMessage()); + } + } + try { + // Fetch data from Dynamo and DB + if(pillIds.isEmpty()) { +// LOGGER.warn("No valid prox pills"); + return; + } + final Map> keys = pillKeyStore.getBatch(pillIds); + if(keys.isEmpty()) { + LOGGER.error("Failed to retrieve decryption keys. Can't proceed. Bailing"); + System.exit(1); + } + pillKeys.putAll(keys); + // The key should not be null + for (final SenseCommandProtos.pill_data proxPacket : proxData) { + final Optional decryptionKey = pillKeys.get(proxPacket.getDeviceId()); + if (!decryptionKey.isPresent()) { + LOGGER.error("Missing decryption key for pill: {}", proxPacket.getDeviceId()); + continue; + } + if (proxPacket.hasMotionDataEntrypted()) { + LOGGER.info("pill_id={} action=has-motion", proxPacket.getDeviceId()); + final Optional key = keys.get(proxPacket.getDeviceId()); + + final long ts = proxPacket.getTimestamp() * 1000L; + if (key.isPresent()) { + final PillProxData pillProxData = PillProxData.fromEncryptedData( + proxPacket.getDeviceId(), + proxPacket.getMotionDataEntrypted().toByteArray(), + key.get(), + new DateTime(ts, DateTimeZone.UTC) + ); + pillProxDataList.add(pillProxData); + } + } + } + } catch (Exception e) { + LOGGER.error("Failed processing pill: {}", e.getMessage()); + LOGGER.error("Failed processing pill: {}", e); + } + if(!pillProxDataList.isEmpty()) { + final int proxInsertedCount = pillProxDataDAODynamoDB.batchInsertAllPartitions(pillProxDataList); + LOGGER.info("action=insert-prox-data count={}", proxInsertedCount); + } + if(!pillProxDataList.isEmpty()) { + try { + iRecordProcessorCheckpointer.checkpoint(); + LOGGER.info("Successful checkpoint."); + } catch (InvalidStateException e) { + LOGGER.error("checkpoint {}", e.getMessage()); + } catch (ShutdownException e) { + LOGGER.error("Received shutdown command at checkpoint, bailing. {}", e.getMessage()); + } + } + } + + @Override + public void shutdown(final IRecordProcessorCheckpointer iRecordProcessorCheckpointer, final ShutdownReason shutdownReason) { + LOGGER.warn("SHUTDOWN: {}", shutdownReason.toString()); + if(shutdownReason == ShutdownReason.TERMINATE) { + LOGGER.warn("Got Terminate. Attempting to checkpoint."); + try { + iRecordProcessorCheckpointer.checkpoint(); + LOGGER.warn("Checkpoint successful."); + } catch (InvalidStateException | ShutdownException e) { + LOGGER.error(e.getMessage()); + } + } + } +} diff --git a/src/main/java/com/hello/suripu/workers/pill/prox/PillProxDataProcessorFactory.java b/src/main/java/com/hello/suripu/workers/pill/prox/PillProxDataProcessorFactory.java new file mode 100644 index 00000000..d4867be6 --- /dev/null +++ b/src/main/java/com/hello/suripu/workers/pill/prox/PillProxDataProcessorFactory.java @@ -0,0 +1,34 @@ +package com.hello.suripu.workers.pill.prox; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; +import com.hello.suripu.core.configuration.DynamoDBTableName; +import com.hello.suripu.core.db.DeviceDAO; +import com.hello.suripu.core.db.KeyStore; +import com.hello.suripu.core.db.KeyStoreDynamoDB; +import org.skife.jdbi.v2.DBI; + +public class PillProxDataProcessorFactory implements IRecordProcessorFactory { + + private final PillProxWorkerConfiguration configuration; + private final DBI dbi; + + + public PillProxDataProcessorFactory(final PillProxWorkerConfiguration configuration, final DBI dbi) { + this.configuration = configuration; + this.dbi = dbi; + } + + @Override + public IRecordProcessor createProcessor() { + final DeviceDAO deviceDAO = dbi.onDemand(DeviceDAO.class); + final AmazonDynamoDB ddbClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain()); + final PillProxDataDAODynamoDB pillProxDataDAODynamoDB = new PillProxDataDAODynamoDB(ddbClient, configuration.proxTablePrefix()); + final KeyStore pillKeyStore = new KeyStoreDynamoDB(ddbClient,configuration.dynamoDBConfiguration().tables().get(DynamoDBTableName.PILL_KEY_STORE), new byte[16], 120); + + return new PillProxDataProcessor(pillProxDataDAODynamoDB, configuration.getMaxRecords(), pillKeyStore, deviceDAO); + } +} diff --git a/src/main/java/com/hello/suripu/workers/pill/prox/PillProxWorkerCommand.java b/src/main/java/com/hello/suripu/workers/pill/prox/PillProxWorkerCommand.java new file mode 100644 index 00000000..7f16bcf3 --- /dev/null +++ b/src/main/java/com/hello/suripu/workers/pill/prox/PillProxWorkerCommand.java @@ -0,0 +1,131 @@ +package com.hello.suripu.workers.pill.prox; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; +import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; +import com.codahale.metrics.graphite.Graphite; +import com.codahale.metrics.graphite.GraphiteReporter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.hello.suripu.core.ObjectGraphRoot; +import com.hello.suripu.core.configuration.DynamoDBTableName; +import com.hello.suripu.core.configuration.QueueName; +import com.hello.suripu.core.db.DeviceDAO; +import com.hello.suripu.core.db.FeatureStore; +import com.hello.suripu.core.db.PillHeartBeatDAO; +import com.hello.suripu.core.db.util.JodaArgumentFactory; +import com.hello.suripu.coredw8.clients.AmazonDynamoDBClientFactory; +import com.hello.suripu.coredw8.metrics.RegexMetricFilter; +import com.hello.suripu.workers.framework.WorkerEnvironmentCommand; +import com.hello.suripu.workers.framework.WorkerRolloutModule; +import io.dropwizard.jdbi.DBIFactory; +import io.dropwizard.setup.Environment; +import net.sourceforge.argparse4j.inf.Namespace; +import org.skife.jdbi.v2.DBI; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; + +public final class PillProxWorkerCommand extends WorkerEnvironmentCommand { + + private final static Logger LOGGER = LoggerFactory.getLogger(PillProxWorkerCommand.class); + + private boolean useDynamoPillData = false; + + public PillProxWorkerCommand(String name, String description) { + super(name, description); + } + + public PillProxWorkerCommand(String name, String description, final boolean useDynamoPillData) { + this(name, description); + this.useDynamoPillData = useDynamoPillData; + } + + @Override + protected void run(Environment environment, Namespace namespace, PillProxWorkerConfiguration configuration) throws Exception { + final DBIFactory factory = new DBIFactory(); + final DBI commonDBI = factory.build(environment, configuration.getCommonDB(), "postgresql-common"); + + // Joda Argument factory is not supported by default by DW, needs to be added manually + commonDBI.registerArgumentFactory(new JodaArgumentFactory()); + + final DeviceDAO deviceDAO = commonDBI.onDemand(DeviceDAO.class); + final PillHeartBeatDAO heartBeatDAO = commonDBI.onDemand(PillHeartBeatDAO.class); + + final ImmutableMap queueNames = configuration.getQueues(); + + LOGGER.debug("{}", queueNames); + final String queueName = queueNames.get(QueueName.BATCH_PILL_DATA); + LOGGER.info("\n\n\n!!! This worker is using the following queue: {} !!!\n\n\n", queueName); + + + + if(configuration.getMetricsEnabled()) { + final String graphiteHostName = configuration.getGraphite().getHost(); + final String apiKey = configuration.getGraphite().getApiKey(); + final Integer interval = configuration.getGraphite().getReportingIntervalInSeconds(); + + final String env = (configuration.isDebug()) ? "dev" : "prod"; + final String prefix = String.format("%s.%s.suripu-workers-pill", apiKey, env); + + final ImmutableList metrics = ImmutableList.copyOf(configuration.getGraphite().getIncludeMetrics()); + final RegexMetricFilter metricFilter = new RegexMetricFilter(metrics); + + final Graphite graphite = new Graphite(new InetSocketAddress(graphiteHostName, 2003)); + + final GraphiteReporter reporter = GraphiteReporter.forRegistry(environment.metrics()) + .prefixedWith(prefix) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .filter(metricFilter) + .build(graphite); + reporter.start(interval, TimeUnit.SECONDS); + + LOGGER.info("Metrics enabled."); + } else { + LOGGER.warn("Metrics not enabled."); + } + + final AWSCredentialsProvider awsCredentialsProvider = new DefaultAWSCredentialsProviderChain(); + final String workerId = InetAddress.getLocalHost().getCanonicalHostName(); + String kinesisAppName = configuration.getAppName(); + final KinesisClientLibConfiguration kinesisConfig = new KinesisClientLibConfiguration( + kinesisAppName, + queueName, + awsCredentialsProvider, + workerId); + kinesisConfig.withMaxRecords(configuration.getMaxRecords()); + kinesisConfig.withKinesisEndpoint(configuration.getKinesisEndpoint()); + kinesisConfig.withInitialPositionInStream(InitialPositionInStream.LATEST); + + if(configuration.isDebug()) { + kinesisConfig.withMetricsLevel(MetricsLevel.NONE); + } + + final ImmutableMap tableNames = configuration.dynamoDBConfiguration().tables(); + final AmazonDynamoDBClientFactory amazonDynamoDBClientFactory = AmazonDynamoDBClientFactory.create(awsCredentialsProvider, configuration.dynamoDBConfiguration()); + + final AmazonDynamoDB featureDynamoDB = amazonDynamoDBClientFactory.getForTable(DynamoDBTableName.FEATURES); + final String featureNamespace = (configuration.isDebug()) ? "dev" : "prod"; + final FeatureStore featureStore = new FeatureStore(featureDynamoDB, tableNames.get(DynamoDBTableName.FEATURES), featureNamespace); + + final WorkerRolloutModule workerRolloutModule = new WorkerRolloutModule(featureStore, 30); + ObjectGraphRoot.getInstance().init(workerRolloutModule); + + final IRecordProcessorFactory processorFactory = new PillProxDataProcessorFactory( + configuration, + commonDBI + ); + + final Worker worker = new Worker(processorFactory, kinesisConfig); + worker.run(); + } +} diff --git a/src/main/java/com/hello/suripu/workers/pill/prox/PillProxWorkerConfiguration.java b/src/main/java/com/hello/suripu/workers/pill/prox/PillProxWorkerConfiguration.java new file mode 100644 index 00000000..1b62e7d2 --- /dev/null +++ b/src/main/java/com/hello/suripu/workers/pill/prox/PillProxWorkerConfiguration.java @@ -0,0 +1,37 @@ +package com.hello.suripu.workers.pill.prox; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.hello.suripu.coredw8.configuration.NewDynamoDBConfiguration; +import com.hello.suripu.workers.framework.WorkerConfiguration; +import io.dropwizard.db.DataSourceFactory; + +import javax.validation.Valid; +import javax.validation.constraints.NotNull; + +public class PillProxWorkerConfiguration extends WorkerConfiguration { + @JsonProperty("max_records") + private Integer maxRecords = 200; + public Integer getMaxRecords() { + return maxRecords; + } + @Valid + @NotNull + @JsonProperty("common_db") + private DataSourceFactory commonDB = new DataSourceFactory(); + public DataSourceFactory getCommonDB() { + return commonDB; + } + + @Valid + @NotNull + @JsonProperty("dynamodb") + private NewDynamoDBConfiguration dynamoDBConfiguration; + public NewDynamoDBConfiguration dynamoDBConfiguration(){ + return dynamoDBConfiguration; + } + @Valid + @NotNull + @JsonProperty("prox_table_prefix") + private String proxTablePrefix; + public String proxTablePrefix() { return proxTablePrefix;} +} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 00000000..f93c23c4 --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + \ No newline at end of file