Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Available Services API call #80

Closed
wants to merge 16 commits into from
Closed
14 changes: 9 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,17 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<version>${auto-value.version}</version>
<groupId>org.graylog.autovalue</groupId>
<artifactId>auto-value-javabean</artifactId>
<version>${auto-value-javabean.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<version>${auto-service.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand All @@ -113,6 +111,12 @@
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
16 changes: 14 additions & 2 deletions src/main/java/org/graylog/integrations/IntegrationsModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
*/
package org.graylog.integrations;

import org.graylog.integrations.aws.resources.AWSResource;
import org.graylog.integrations.inputs.paloalto.PaloAltoCodec;
import org.graylog.integrations.inputs.paloalto.PaloAltoTCPInput;
import org.graylog2.plugin.PluginConfigBean;
import org.graylog2.plugin.PluginModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;

import java.util.Collections;
import java.util.Set;
Expand All @@ -30,7 +36,8 @@
* Extend the PluginModule abstract class here to add you plugin to the system.
*/
public class IntegrationsModule extends PluginModule {
private static final Logger LOG = LoggerFactory.getLogger(IntegrationsModule.class);

private static final Logger LOG = LoggerFactory.getLogger(IntegrationsModule.class);
/**
* Returns all configuration beans required by this plugin.
*
Expand Down Expand Up @@ -65,5 +72,10 @@ protected void configure() {
LOG.debug("Registering message input: {}", PaloAltoTCPInput.NAME);
addMessageInput(PaloAltoTCPInput.class);
addCodec(PaloAltoCodec.NAME, PaloAltoCodec.class);

addRestResource(AWSResource.class);

bind(CloudWatchLogsClientBuilder.class).toProvider(CloudWatchLogsClient::builder);
bind(KinesisClientBuilder.class).toProvider(KinesisClient::builder);
}
}
}
8 changes: 0 additions & 8 deletions src/main/java/org/graylog/integrations/aws/AWSService.java

This file was deleted.

42 changes: 42 additions & 0 deletions src/main/java/org/graylog/integrations/aws/CloudWatchService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.graylog.integrations.aws;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClientBuilder;
import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsResponse;
import software.amazon.awssdk.services.cloudwatchlogs.paginators.DescribeLogGroupsIterable;

import javax.inject.Inject;
import java.util.ArrayList;

public class CloudWatchService {

private CloudWatchLogsClientBuilder logsClientBuilder;

@Inject
public CloudWatchService(CloudWatchLogsClientBuilder logsClientBuilder) {
this.logsClientBuilder = logsClientBuilder;
}

/**
* Returns a list of log groups that exist in CloudWatch.
*
* @param region The AWS region
* @return A list of log groups in alphabetical order.
*/
public ArrayList<String> getLogGroupNames(String region) {

final CloudWatchLogsClient cloudWatchLogsClient = logsClientBuilder.region(Region.of(region)).build();
final DescribeLogGroupsRequest describeLogGroupsRequest = DescribeLogGroupsRequest.builder().build();
final DescribeLogGroupsIterable responses = cloudWatchLogsClient.describeLogGroupsPaginator(describeLogGroupsRequest);

final ArrayList<String> groupNameList = new ArrayList<>();
for (DescribeLogGroupsResponse response : responses) {
for (int c = 0; c < response.logGroups().size(); c++) {
groupNameList.add(response.logGroups().get(c).logGroupName());
}
}
return groupNameList;
}
}
108 changes: 108 additions & 0 deletions src/main/java/org/graylog/integrations/aws/KinesisDTO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.graylog.integrations.aws;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableSet;
import org.graylog.autovalue.WithBeanGetter;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.mongojack.Id;
import org.mongojack.ObjectId;

import javax.annotation.Nullable;
import javax.validation.constraints.NotBlank;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
* TODO: This class has not been used yet. We will delete it if we decide to store data directly in the input.
* I am currently feeling like we will likely store data in the input.
*/
@AutoValue
@JsonDeserialize(builder = KinesisDTO.Builder.class)
@WithBeanGetter
public abstract class KinesisDTO {


public static final String FIELD_ID = "id";

// TODO: Do we really need a title, summary, and description
public static final String FIELD_TITLE = "title";
public static final String FIELD_SUMMARY = "summary";
public static final String FIELD_DESCRIPTION = "description";

public static final String FIELD_AWS_ACCESS_KEY_ID = "aws_access_key_id";
public static final String FIELD_AWS_SECRET_ACCESS_KEY = "aws_secret_access_key";
public static final String FIELD_AWS_KINESIS_STREAM_NAME = "aws_kinesis_stream_name";
public static final String FIELD_AWS_LOG_TYPE = "aws_log_type";
public static final String FIELD_AWS_KINESIS_SUBSCRIPTION_ID = "kinesis_subscription_id";
public static final String FIELD_CREATED_AT = "created_at";

public static final ImmutableSet<String> SORT_FIELDS = ImmutableSet.of(FIELD_ID, FIELD_TITLE, FIELD_CREATED_AT );

@ObjectId
@Id
@Nullable
@JsonProperty(FIELD_ID)
public abstract String id();

@JsonProperty(FIELD_TITLE)
@NotBlank
public abstract String title();

// A short, one sentence description of the integration
@JsonProperty(FIELD_SUMMARY)
public abstract String summary();

// A longer description of the integration
@JsonProperty(FIELD_DESCRIPTION)
public abstract String description();

// Must be encrypted before being stored.
@JsonProperty(FIELD_AWS_ACCESS_KEY_ID)
public abstract String awsAccessKeyId();

// Must be encrypted before being stored.
@JsonProperty(FIELD_AWS_SECRET_ACCESS_KEY)
public abstract String awsSecretAccessKey();

@JsonProperty(FIELD_AWS_KINESIS_STREAM_NAME)
public abstract String awsKinesisStreamName();

@JsonProperty(FIELD_AWS_LOG_TYPE)
public abstract String awsLogType();

@JsonProperty(FIELD_AWS_KINESIS_SUBSCRIPTION_ID)
public abstract String awsKinesisSubscriptionId();

@JsonProperty(FIELD_CREATED_AT)
public abstract DateTime createdAt();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder id(String id);

public abstract Builder title(@NotBlank String title);

public abstract Builder summary(String summary);

public abstract Builder description(String description);

public abstract Builder awsAccessKeyId(String awsAccessKeyId);

public abstract Builder awsSecretAccessKey(String awsSecretAccessKey);

public abstract Builder awsKinesisStreamName(String awsKinesisStreamName);

public abstract Builder awsLogType(String awsLogType);

public abstract Builder awsKinesisSubscriptionId(String awsKinesisSubscriptionId);

public abstract Builder createdAt(DateTime createdAt);

public abstract KinesisDTO build();
}
}
122 changes: 122 additions & 0 deletions src/main/java/org/graylog/integrations/aws/KinesisService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package org.graylog.integrations.aws;

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import org.apache.commons.lang3.StringUtils;
import org.graylog.integrations.aws.resources.requests.KinesisHealthCheckRequest;
import org.graylog.integrations.aws.resources.responses.KinesisHealthCheckResponse;
import org.graylog.integrations.aws.service.AWSLogMessage;
import org.graylog.integrations.aws.service.AWSService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

public class KinesisService {

private static final Logger LOG = LoggerFactory.getLogger(AWSService.class);
private static final int KINESIS_LIST_STREAMS_MAX_ATTEMPTS = 1000;
private static final int KINESIS_LIST_STREAMS_LIMIT = 30;

private final KinesisClientBuilder kinesisClientBuilder;

@Inject
public KinesisService(KinesisClientBuilder kinesisClientBuilder) {

this.kinesisClientBuilder = kinesisClientBuilder;
}

public KinesisHealthCheckResponse healthCheck(KinesisHealthCheckRequest heathCheckRequest) {

// TODO: Read a log message from Kinesis.

// Detect the log message format

// TODO: Replace with actual log message received from Kinesis stream.
String message = "2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK";
AWSLogMessage awsLogMessage = new AWSLogMessage(message);

return KinesisHealthCheckResponse.create(true,
awsLogMessage.messageType().toString(),
"Success! The message is an AWS FlowLog!");
}

/**
* Get a list of Kinesis stream names. All available streams will be returned.
*
* @param regionName The AWS region to query Kinesis stream names from.
* @return A list of all available Kinesis streams in the supplied region.
*/
public List<String> getKinesisStreams(String regionName, String accessKeyId, String secretAccessKey) throws ExecutionException {

LOG.debug("List Kinesis streams for region [{}]", regionName);

// Only explicitly provide credentials if key/secret are provided.
// TODO: Remove this IF check and always provided the string credentials. This will prevent the AWS SDK
// from reading credentials from environment variables, which we definitely do not want to do.
if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretAccessKey)) {
StaticCredentialsProvider credentialsProvider =
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
kinesisClientBuilder.credentialsProvider(credentialsProvider);
}

final KinesisClient kinesisClient =
kinesisClientBuilder.region(Region.of(regionName)).build();

// KinesisClient.listStreams() is paginated. Use a retryer to loop and stream names (while ListStreamsResponse.hasMoreStreams() is true).
// The stopAfterAttempt retryer option is an emergency brake to prevent infinite loops
// if AWS API always returns true for hasMoreStreamNames.
final Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(b -> Objects.equals(b, Boolean.TRUE))
.withStopStrategy(StopStrategies.stopAfterAttempt(KINESIS_LIST_STREAMS_MAX_ATTEMPTS))
.build();

ListStreamsRequest streamsRequest = ListStreamsRequest.builder().limit(KINESIS_LIST_STREAMS_LIMIT).build();
final ListStreamsResponse listStreamsResponse = kinesisClient.listStreams(streamsRequest);
final List<String> streamNames = new ArrayList<>(listStreamsResponse.streamNames());

if (listStreamsResponse.hasMoreStreams()) {
try {
retryer.call(() -> {
final String lastStreamName = streamNames.get(streamNames.size() - 1);
final ListStreamsRequest moreStreamsRequest = ListStreamsRequest.builder()
.exclusiveStartStreamName(lastStreamName)
.limit(KINESIS_LIST_STREAMS_LIMIT).build();
final ListStreamsResponse moreSteamsResponse = kinesisClient.listStreams(moreStreamsRequest);
streamNames.addAll(moreSteamsResponse.streamNames());

// If more streams, then this will execute again.
return moreSteamsResponse.hasMoreStreams();
});
// Only catch the RetryException, which occurs after too many attempts. When this happens, we still want
// to the return the response with any streams obtained.
// All other exceptions will be bubbled up to the client caller.
} catch (RetryException e) {
LOG.error("Failed to get all stream names after {} attempts. Proceeding to return currently obtained streams.", KINESIS_LIST_STREAMS_MAX_ATTEMPTS);
}
}

LOG.debug("Kinesis streams queried: [{}]", streamNames);

return streamNames;
}

// TODO Create Kinesis Stream

// TODO Subscribe to Kinesis Stream

// TODO getRecord
}
Loading