Skip to content
This repository has been archived by the owner on Jan 19, 2024. It is now read-only.

Event distributor static taps #53

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 123 additions & 53 deletions src/main/java/com/proofpoint/event/collector/EventTapWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,20 @@
package com.proofpoint.event.collector;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Table;
import com.google.common.collect.Table.Cell;
import com.proofpoint.discovery.client.ServiceDescriptor;
import com.proofpoint.discovery.client.ServiceSelector;
import com.proofpoint.discovery.client.ServiceType;
import com.proofpoint.event.collector.EventTapWriter.EventTypePolicy.FlowPolicy;
import com.proofpoint.event.collector.EventTapWriter.FlowInfo.Builder;
import com.proofpoint.event.collector.StaticEventTapConfig.FlowKey;
import com.proofpoint.log.Logger;
import com.proofpoint.units.Duration;
import org.weakref.jmx.Managed;
Expand All @@ -31,7 +39,6 @@
import javax.inject.Inject;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -44,12 +51,24 @@
import static com.google.common.base.Objects.firstNonNull;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.proofpoint.event.collector.QosDelivery.BEST_EFFORT;
import static com.proofpoint.event.collector.QosDelivery.RETRY;
import static java.lang.String.format;

public class EventTapWriter implements EventWriter
{
@VisibleForTesting
static final String FLOW_ID_PROPERTY_NAME = "flowId";

@VisibleForTesting
static final String HTTP_PROPERTY_NAME = "http";

@VisibleForTesting
static final String EVENT_TYPE_PROPERTY_NAME = "eventType";

private static final String HTTPS_PROPERTY_NAME = "https";
private static final String QOS_DELIVERY_PROPERTY_NAME = "qos.delivery";

private static final Logger log = Logger.get(EventTapWriter.class);
private static final EventTypePolicy NULL_EVENT_TYPE_POLICY = new EventTypePolicy.Builder().build();
private final ServiceSelector selector;
Expand All @@ -60,7 +79,8 @@ public class EventTapWriter implements EventWriter

private final AtomicReference<Map<String, EventTypePolicy>> eventTypePolicies = new AtomicReference<Map<String, EventTypePolicy>>(
ImmutableMap.<String, EventTypePolicy>of());
private Map<String, Map<String, FlowInfo>> flows = ImmutableMap.of();
private final List<TapSpec> staticTapSpecs;
private Table<String, String, FlowInfo> flows = ImmutableTable.of();

private ScheduledFuture<?> refreshJob;
private final Duration flowRefreshDuration;
Expand All @@ -70,8 +90,10 @@ public EventTapWriter(@ServiceType("eventTap") ServiceSelector selector,
@EventTap ScheduledExecutorService executorService,
BatchProcessorFactory batchProcessorFactory,
EventTapFlowFactory eventTapFlowFactory,
EventTapConfig config)
EventTapConfig config,
StaticEventTapConfig staticEventTapConfig)
{
this.staticTapSpecs = createTapSpecFromConfig(checkNotNull(staticEventTapConfig, "staticEventTapConfig is null"));
this.selector = checkNotNull(selector, "selector is null");
this.executorService = checkNotNull(executorService, "executorService is null");
this.flowRefreshDuration = checkNotNull(config, "config is null").getEventTapRefreshDuration();
Expand Down Expand Up @@ -125,20 +147,18 @@ public void refreshFlows()
{
try {
Map<String, EventTypePolicy> existingPolicies = eventTypePolicies.get();
Map<String, Map<String, FlowInfo>> existingFlows = flows;
Table<String, String, FlowInfo> existingFlows = flows;
ImmutableMap.Builder<String, EventTypePolicy> policiesBuilder = ImmutableMap.builder();

Map<String, Map<String, FlowInfo>> newFlows = constructFlowInfoFromDiscovery();
Table<String, String, FlowInfo> newFlows = constructFlowInfoFromTapSpec(Iterables.concat(staticTapSpecs, createTapSpecFromDiscovery(selector.selectAllServices())));
if (existingFlows.equals(newFlows)) {
return;
}

for (Map.Entry<String, Map<String, FlowInfo>> flowsForEventTypeEntry : newFlows.entrySet()) {
String eventType = flowsForEventTypeEntry.getKey();
Map<String, FlowInfo> flowsForEventType = flowsForEventTypeEntry.getValue();
for (Map.Entry<String, Map<String, FlowInfo>> entry : newFlows.rowMap().entrySet()) {
String eventType = entry.getKey();
EventTypePolicy existingPolicy = firstNonNull(existingPolicies.get(eventType), NULL_EVENT_TYPE_POLICY);

policiesBuilder.put(eventType, constructPolicyForFlows(existingPolicy, eventType, flowsForEventType));
policiesBuilder.put(eventType, constructPolicyForFlows(existingPolicy, eventType, entry.getValue()));
}

Map<String, EventTypePolicy> newPolicies = policiesBuilder.build();
Expand All @@ -150,7 +170,6 @@ public void refreshFlows()
catch (Exception e) {
log.error(e, "Couldn't refresh flows");
}

}

@Override
Expand All @@ -168,65 +187,39 @@ public void distribute(Event event)
write(event);
}

private Map<String, Map<String, FlowInfo>> constructFlowInfoFromDiscovery()
private Table<String, String, FlowInfo> constructFlowInfoFromTapSpec(Iterable<TapSpec> tapSpecs)
{
List<ServiceDescriptor> descriptors = selector.selectAllServices();
// First level is EventType, second is flowId
Map<String, Map<String, FlowInfo.Builder>> flows = new HashMap<>();

for (ServiceDescriptor descriptor : descriptors) {
Map<String, String> properties = descriptor.getProperties();
String eventType = properties.get("eventType");
String flowId = properties.get(FLOW_ID_PROPERTY_NAME);
Table<String, String, FlowInfo.Builder> flows = HashBasedTable.create();

URI uri = safeUriFromString(properties.get("https"));
if (uri == null && allowHttpConsumers) {
uri = safeUriFromString(properties.get("http"));
}
for (TapSpec tapSpec : tapSpecs) {
String eventType = tapSpec.getEventType();
String flowId = tapSpec.getFlowId();

String qosDelivery = properties.get("qos.delivery");
URI uri = tapSpec.getUri();

if (isNullOrEmpty(eventType) || isNullOrEmpty(flowId) || uri == null) {
continue;
}

Map<String, FlowInfo.Builder> flowsForEventType = flows.get(eventType);
if (flowsForEventType == null) {
flowsForEventType = new HashMap<>();
flows.put(eventType, flowsForEventType);
}

FlowInfo.Builder flowBuilder = flowsForEventType.get(flowId);
FlowInfo.Builder flowBuilder = flows.get(eventType, flowId);
if (flowBuilder == null) {
flowBuilder = FlowInfo.builder();
flowsForEventType.put(flowId, flowBuilder);
flows.put(eventType, flowId, flowBuilder);
}

if ("retry".equalsIgnoreCase(qosDelivery)) {
QosDelivery qosDelivery = tapSpec.getQosDelivery();
if (RETRY.equals(qosDelivery)) {
flowBuilder.setQosEnabled(true);
}

flowBuilder.addDestination(uri);
}

return constructFlowsFromBuilderMap(flows);
return constructFlowsFromTable(flows);
}

private Map<String, Map<String, FlowInfo>> constructFlowsFromBuilderMap(Map<String, Map<String, FlowInfo.Builder>> flows)
private Table<String, String, FlowInfo> constructFlowsFromTable(Table<String, String, FlowInfo.Builder> flows)
{
ImmutableMap.Builder<String, Map<String, FlowInfo>> flowsBuilder = ImmutableMap.builder();
for (Entry<String, Map<String, FlowInfo.Builder>> flowsEntry : flows.entrySet()) {
String eventType = flowsEntry.getKey();
Map<String, FlowInfo.Builder> flowsForEventType = flowsEntry.getValue();

ImmutableMap.Builder<String, FlowInfo> flowsBuilderForEventType = ImmutableMap.builder();
for (Entry<String, FlowInfo.Builder> flowsForEventTypeEntry : flowsForEventType.entrySet()) {
String flowId = flowsForEventTypeEntry.getKey();
FlowInfo.Builder flowBuilder = flowsForEventTypeEntry.getValue();
flowsBuilderForEventType.put(flowId, flowBuilder.build());
}
ImmutableTable.Builder<String, String, FlowInfo> flowsBuilder = ImmutableTable.builder();

flowsBuilder.put(eventType, flowsBuilderForEventType.build());
for (Cell<String, String, Builder> cell : flows.cellSet()) {
flowsBuilder.put(cell.getRowKey(), cell.getColumnKey(), cell.getValue().build());
}

return flowsBuilder.build();
Expand Down Expand Up @@ -310,6 +303,47 @@ private void stopBatchProcessor(String eventType, String flowId, BatchProcessor<
processor.stop();
}

private List<TapSpec> createTapSpecFromDiscovery(Iterable<ServiceDescriptor> descriptors)
{
ImmutableList.Builder<TapSpec> tapSpecBuilder = ImmutableList.builder();
for (ServiceDescriptor descriptor : descriptors) {

Map<String, String> properties = descriptor.getProperties();
String eventType = properties.get(EVENT_TYPE_PROPERTY_NAME);
String flowId = properties.get(FLOW_ID_PROPERTY_NAME);

URI uri = safeUriFromString(properties.get(HTTPS_PROPERTY_NAME));
if (uri == null && allowHttpConsumers) {
uri = safeUriFromString(properties.get(HTTP_PROPERTY_NAME));
}

if (isNullOrEmpty(eventType) || isNullOrEmpty(flowId) || uri == null) {
continue;
}

String qosDeliveryString = firstNonNull(properties.get(QOS_DELIVERY_PROPERTY_NAME), BEST_EFFORT.toString());
TapSpec tapSpec = new TapSpec(eventType, flowId, uri, QosDelivery.fromString(qosDeliveryString));

tapSpecBuilder.add(tapSpec);
}

return tapSpecBuilder.build();
}

private static List<TapSpec> createTapSpecFromConfig(StaticEventTapConfig staticEventTapConfig)
{
ImmutableList.Builder<TapSpec> tapSpecBuilder = ImmutableList.builder();
for (Entry<FlowKey, PerFlowStaticEventTapConfig> entry : staticEventTapConfig.getStaticTaps().entrySet()) {
FlowKey flowKey = entry.getKey();
PerFlowStaticEventTapConfig config = entry.getValue();
for (String uri : config.getUris()) {
tapSpecBuilder.add(new TapSpec(flowKey.getEventType(), flowKey.getFlowId(), safeUriFromString(uri), config.getQosDelivery()));
}
}

return tapSpecBuilder.build();
}

private static URI safeUriFromString(String uri)
{
try {
Expand All @@ -322,7 +356,7 @@ private static URI safeUriFromString(String uri)

private static String createBatchProcessorName(String eventType, String flowId)
{
return String.format("%s{%s}", eventType, flowId);
return format("%s{%s}", eventType, flowId);
}

static class FlowInfo
Expand Down Expand Up @@ -415,4 +449,40 @@ public EventTypePolicy build()
}
}
}

private static class TapSpec
{
private String eventType;
private String flowId;
private URI uri;
private QosDelivery qosDelivery;

public TapSpec(String eventType, String flowId, URI uri, QosDelivery qosDelivery)
{
this.eventType = checkNotNull(eventType, "eventType is null");
this.flowId = checkNotNull(flowId, "flowId is null");
this.uri = checkNotNull(uri, "uri is null");
this.qosDelivery = checkNotNull(qosDelivery, "qosDelivery is null");
}

public String getEventType()
{
return eventType;
}

public String getFlowId()
{
return flowId;
}

public URI getUri()
{
return uri;
}

public QosDelivery getQosDelivery()
{
return qosDelivery;
}
}
}
4 changes: 3 additions & 1 deletion src/main/java/com/proofpoint/event/collector/MainModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public void configure(Binder binder)
binder.bind(CombineObjectMetadataStore.class).to(S3CombineObjectMetadataStore.class).in(Scopes.SINGLETON);

bindConfig(binder).to(ServerConfig.class);
bindConfig(binder).to(StaticEventTapConfig.class);

eventBinder(binder).bindEventClient(CombineCompleted.class);

Expand Down Expand Up @@ -185,7 +186,8 @@ private ScheduledExecutorService createScheduledCombinerLowPriorityExecutor(Serv

@Provides
@Singleton
private Ticker providesTicker() {
private Ticker providesTicker()
{
return Ticker.systemTicker();
}
}
Loading