Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating prox worker #63

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions configs/pill/prox/pill_prox.dev.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
metrics_enabled: false
debug: true
graphite:
host: carbon.hostedgraphite.com
api_key: 7509c0ff-4db5-4cae-91ee-6e78ff13b336
reporting_interval_in_seconds: 15
include_metrics:
- com.hello

common_db:
# the name of your JDBC driver
driverClass: org.postgresql.Driver

# the username
user: ingress_user

# the password
password: hello ingress user

# the JDBC URL
url: jdbc:postgresql://localhost:5432/dump_2015_02_09

# any properties specific to your JDBC driver:
properties:
charSet: UTF-8

# the maximum amount of time to wait on an empty pool before throwing an exception
maxWaitForConnection: 1s

# the SQL query to run when validating a connection's liveness
validationQuery: "/* MyService Health Check */ SELECT 1"

# the minimum number of connections to keep open
minSize: 8

# the maximum number of connections to keep open
maxSize: 32

# whether or not idle connections should be validated
checkConnectionWhileIdle: false

kinesis:
endpoint : https://kinesis.us-east-1.amazonaws.com
streams :
batch_pill_data : dev_batch_pill_data

app_name: PillProxConsumerLocal

max_records: 1000

# Logging settings.
logging:
level: INFO
loggers:
com.hello.suripu.workers: DEBUG
com.hello.suripu.core: DEBUG
appenders:
- type: console
# The timezone used to format dates. HINT: USE THE DEFAULT, UTC.
timeZone: UTC

dynamodb:
region: us-east-1
tables:
features: features
pill_key_store : pill_key_store
endpoints:
features : https://dynamodb.us-east-1.amazonaws.com
pill_key_store : https://dynamodb.us-east-1.amazonaws.com

prox_table_prefix: prefix
2 changes: 2 additions & 0 deletions src/main/java/com/hello/suripu/workers/HelloWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.hello.suripu.workers.logs.timeline.TimelineLogCommand;
import com.hello.suripu.workers.notifications.PushNotificationsWorkerCommand;
import com.hello.suripu.workers.pill.PillWorkerCommand;
import com.hello.suripu.workers.pill.prox.PillProxWorkerCommand;
import com.hello.suripu.workers.sense.SenseSaveWorkerCommand;
import com.hello.suripu.workers.sense.lastSeen.SenseLastSeenWorkerCommand;
import io.dropwizard.Application;
Expand Down Expand Up @@ -39,6 +40,7 @@ public void initialize(Bootstrap<WorkerConfiguration> bootstrap) {
bootstrap.addCommand(new PushNotificationsWorkerCommand("push", "send push notifications"));
bootstrap.addCommand(new TimelineLogCommand("timeline_log", "timeline log"));
bootstrap.addCommand(new SenseStreamFanoutCommand("sense_stream_fanout", "fanout sense stream"));
bootstrap.addCommand(new PillProxWorkerCommand("prox", "pill prox"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.hello.suripu.workers.notifications.PushNotificationsProcessor;
import com.hello.suripu.workers.pill.S3RecordProcessor;
import com.hello.suripu.workers.pill.SavePillDataProcessor;
import com.hello.suripu.workers.pill.prox.PillProxDataProcessor;
import com.hello.suripu.workers.sense.SenseSaveDDBProcessor;
import com.hello.suripu.workers.sense.SenseSaveProcessor;
import com.hello.suripu.workers.sense.lastSeen.SenseLastSeenProcessor;
Expand All @@ -38,7 +39,8 @@
TimelineLogProcessor.class,
SenseLastSeenProcessor.class,
SenseStreamFanout.class,
SenseStructuredLogIndexer.class
SenseStructuredLogIndexer.class,
PillProxDataProcessor.class
})
public class WorkerRolloutModule {
private final FeatureStore featureStore;
Expand Down
59 changes: 59 additions & 0 deletions src/main/java/com/hello/suripu/workers/pill/prox/PillProxData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.hello.suripu.workers.pill.prox;

import com.google.common.io.LittleEndianDataInputStream;
import com.hello.suripu.core.models.TrackerMotion;
import org.joda.time.DateTime;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;

public class PillProxData {

public final Long accountId;
public final String pillId;
public final Integer proxValue;
public final Integer proxValue4;
public final Integer reserved;

public final DateTime ts;
public final Integer offsetMillis = 0; // TODO: LOL

private PillProxData(final Long accountId, final String pillId, final Integer proxValue, final Integer proxValue4, final Integer reserved, final DateTime ts) {
this.accountId = accountId;
this.pillId = pillId;
this.proxValue = proxValue;
this.proxValue4 = proxValue4;
this.ts = ts;
this.reserved = reserved;
}

public static PillProxData create(final Long accountId, final String pillId, final Integer proxValue, final Integer proxValue4, final DateTime ts) {
return new PillProxData(accountId, pillId, proxValue, proxValue4, 0, ts);
}

public static PillProxData fromEncryptedData(final String pillId, final byte[] encryptedProxData, final byte[] key, final DateTime sampleTime) {
final byte[] decryptedProxData = decryptProxData(key, encryptedProxData);
try (final LittleEndianDataInputStream littleEndianDataInputStream = new LittleEndianDataInputStream(new ByteArrayInputStream(decryptedProxData))) {
final int proxValue = littleEndianDataInputStream.readInt();
final int proxValue4 = littleEndianDataInputStream.readInt();
final int reserved = littleEndianDataInputStream.readInt();
return new PillProxData(0L, pillId, proxValue, proxValue4, reserved, sampleTime);
} catch (IOException e) {
throw new IllegalArgumentException("server can't parse prox data");
}

}

public static byte[] decryptProxData(final byte[] key, final byte[] encryptedMotionData) {
final byte[] nonce = Arrays.copyOfRange(encryptedMotionData, 0, 8);

//final byte[] crc = Arrays.copyOfRange(encryptedMotionData, encryptedMotionData.length - 1 - 2, encryptedMotionData.length); // Not used yet
final byte[] encryptedRawMotion = Arrays.copyOfRange(encryptedMotionData, 8, encryptedMotionData.length);

final byte[] decryptedRawMotion = TrackerMotion.Utils.counterModeDecrypt(key, nonce, encryptedRawMotion);
return decryptedRawMotion;
}
}


Loading