Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JS for Threat intel feeds - changed extension #675

Merged
merged 9 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ opensearchplugin {
name 'opensearch-security-analytics'
description 'OpenSearch Security Analytics plugin'
classname 'org.opensearch.securityanalytics.SecurityAnalyticsPlugin'
// extendedPlugins = ['opensearch-job-scheduler']
extendedPlugins = ['opensearch-job-scheduler']
}

javaRestTest {
Expand Down Expand Up @@ -143,12 +143,6 @@ repositories {
sourceSets.main.java.srcDirs = ['src/main/generated','src/main/java']
configurations {
zipArchive

all {
resolutionStrategy {
force "com.google.guava:guava:32.0.1-jre"
}
}
}

dependencies {
Expand All @@ -159,20 +153,14 @@ dependencies {
api "org.opensearch:common-utils:${common_utils_version}@jar"
api "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
implementation "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
implementation "org.apache.commons:commons-csv:1.10.0"

// Needed for integ tests
zipArchive group: 'org.opensearch.plugin', name:'alerting', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-notifications-core', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'notifications', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}"

//spotless
implementation('com.google.googlejavaformat:google-java-format:1.17.0') {
exclude group: 'com.google.guava'
}
implementation 'com.google.guava:guava:32.0.1-jre'
}

// RPM & Debian build
Expand Down Expand Up @@ -303,6 +291,12 @@ testClusters.integTest {
}
}
}))
nodes.each { node ->
def plugins = node.plugins
def firstPlugin = plugins.get(0)
plugins.remove(0)
plugins.add(firstPlugin)
}
}

run {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
*/
package org.opensearch.securityanalytics;

import java.util.*;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.action.ActionListener;
Expand All @@ -32,6 +36,9 @@
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.mapper.Mapper;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.plugins.*;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
Expand All @@ -54,6 +61,7 @@
import org.opensearch.securityanalytics.threatIntel.action.*;
import org.opensearch.securityanalytics.threatIntel.common.TIFLockService;
import org.opensearch.securityanalytics.threatIntel.feedMetadata.BuiltInTIFMetadataLoader;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameter;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameterService;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobRunner;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobUpdateService;
Expand All @@ -68,13 +76,12 @@
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.RuleIndices;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import static org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameter.THREAT_INTEL_DATA_INDEX_NAME_PREFIX;

public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, MapperPlugin, SearchPlugin, EnginePlugin, ClusterPlugin, SystemIndexPlugin {
public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, MapperPlugin, SearchPlugin, EnginePlugin, ClusterPlugin, SystemIndexPlugin, JobSchedulerExtension {

private static final Logger log = LogManager.getLogger(SecurityAnalyticsPlugin.class);

Expand All @@ -90,6 +97,8 @@ public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, Map
public static final String CORRELATION_RULES_BASE_URI = PLUGINS_BASE_URI + "/correlation/rules";

public static final String CUSTOM_LOG_TYPE_URI = PLUGINS_BASE_URI + "/logtype";
public static final String JOB_INDEX_NAME = ".opensearch-sap-threatintel-job";
public static final Map<String, Object> TIF_JOB_INDEX_SETTING = Map.of("index.number_of_shards", 1, "index.auto_expand_replicas", "0-all", "index.hidden", true);

private CorrelationRuleIndices correlationRuleIndices;

Expand Down Expand Up @@ -117,6 +126,8 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
return List.of(new SystemIndexDescriptor(THREAT_INTEL_DATA_INDEX_NAME_PREFIX, "System index used for threat intel data"));
}



@Override
public Collection<Object> createComponents(Client client,
ClusterService clusterService,
Expand Down Expand Up @@ -147,7 +158,7 @@ public Collection<Object> createComponents(Client client,
TIFJobUpdateService tifJobUpdateService = new TIFJobUpdateService(clusterService, tifJobParameterService, threatIntelFeedDataService, builtInTIFMetadataLoader);
TIFLockService threatIntelLockService = new TIFLockService(clusterService, client);

TIFJobRunner.getJobRunnerInstance().initialize(clusterService,tifJobUpdateService, tifJobParameterService, threatIntelLockService, threadPool, detectorThreatIntelService);
TIFJobRunner.getJobRunnerInstance().initialize(clusterService, tifJobUpdateService, tifJobParameterService, threatIntelLockService, threadPool, detectorThreatIntelService);

return List.of(
detectorIndices, correlationIndices, correlationRuleIndices, ruleTopicIndices, customLogTypeIndices, ruleIndices,
Expand Down Expand Up @@ -192,10 +203,31 @@ public List<RestHandler> getRestHandlers(Settings settings,
new RestSearchCorrelationRuleAction(),
new RestIndexCustomLogTypeAction(),
new RestSearchCustomLogTypeAction(),
new RestDeleteCustomLogTypeAction()
new RestDeleteCustomLogTypeAction(),
new RestPutTIFJobHandler(clusterSettings)
);
}

@Override
public String getJobType() {
return "opensearch_sap_threatintel_job";
}

@Override
public String getJobIndex() {
return JOB_INDEX_NAME;
}

@Override
public ScheduledJobRunner getJobRunner() {
return TIFJobRunner.getJobRunnerInstance();
}

@Override
public ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> TIFJobParameter.PARSER.parse(parser, null);
}

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ public class SecurityAnalyticsSettings {
// threat intel settings
public static final Setting<TimeValue> TIF_UPDATE_INTERVAL = Setting.timeSetting(
"plugins.security_analytics.threatintel.tifjob.update_interval",
TimeValue.timeValueHours(24),
TimeValue.timeValueHours(1),
TimeValue.timeValueMinutes(1),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameter.THREAT_INTEL_DATA_INDEX_NAME_PREFIX;
Expand Down Expand Up @@ -103,7 +105,7 @@ public void getThreatIntelFeedData(
) {
try {
//if index not exists
if(IndexUtils.getNewIndexByCreationDate(
if (IndexUtils.getNewIndexByCreationDate(
this.clusterService.state(),
this.indexNameExpressionResolver,
".opensearch-sap-threatintel*"
Expand All @@ -129,7 +131,7 @@ public void getThreatIntelFeedData(
listener.onFailure(e);
}
}

private void createThreatIntelFeedData() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
client.execute(PutTIFJobAction.INSTANCE, new PutTIFJobRequest("feed_updater", clusterSettings.get(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL))).actionGet();
Expand All @@ -138,7 +140,7 @@ private void createThreatIntelFeedData() throws InterruptedException {

/**
* Create an index for a threat intel feed
*
* <p>
* Index setting start with single shard, zero replica, no refresh interval, and hidden.
* Once the threat intel feed is indexed, do refresh and force merge.
* Then, change the index setting to expand replica to all nodes, and read only allow delete.
Expand Down Expand Up @@ -174,7 +176,7 @@ private String getIndexMapping() {
* Puts threat intel feed from CSVRecord iterator into a given index in bulk
*
* @param indexName Index name to save the threat intel feed
* @param iterator TIF data to insert
* @param iterator TIF data to insert
* @param renewLock Runnable to renew lock
*/
public void parseAndSaveThreatIntelFeedDataCSV(
Expand All @@ -197,6 +199,10 @@ public void parseAndSaveThreatIntelFeedDataCSV(
String iocType = tifMetadata.getIocType(); //todo make generic in upcoming versions
Integer colNum = tifMetadata.getIocCol();
String iocValue = record.values()[colNum].split(" ")[0];
if (iocType.equals("ip") && !isValidIp(iocValue)) {
log.info("Invalid IP address, skipping this ioc record.");
continue;
}
String feedId = tifMetadata.getFeedId();
Instant timestamp = Instant.now();
ThreatIntelFeedData threatIntelFeedData = new ThreatIntelFeedData(iocType, iocValue, feedId, timestamp);
Expand All @@ -218,8 +224,14 @@ public void parseAndSaveThreatIntelFeedDataCSV(
freezeIndex(indexName);
}

public void saveTifds(BulkRequest bulkRequest, TimeValue timeout) {
public static boolean isValidIp(String ip) {
String ipPattern = "^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$";
Pattern pattern = Pattern.compile(ipPattern);
Matcher matcher = pattern.matcher(ip);
return matcher.matches();
}

public void saveTifds(BulkRequest bulkRequest, TimeValue timeout) {
try {
BulkResponse response = StashedThreadContext.run(client, () -> {
return client.bulk(bulkRequest).actionGet(timeout);
Expand Down Expand Up @@ -252,10 +264,6 @@ private void freezeIndex(final String indexName) {
});
}

public void deleteThreatIntelDataIndex(final String index) {
deleteThreatIntelDataIndex(Arrays.asList(index));
}

public void deleteThreatIntelDataIndex(final List<String> indices) {
if (indices == null || indices.isEmpty()) {
return;
Expand Down Expand Up @@ -286,10 +294,4 @@ public void deleteThreatIntelDataIndex(final List<String> indices) {
throw new OpenSearchException("failed to delete data[{}]", String.join(",", indices));
}
}
public static class ThreatIntelFeedUpdateHandler implements Runnable {

@Override
public void run() {

}
}}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ public TimeValue getUpdateInterval() {
return this.updateInterval;
}

public void setUpdateInterval(TimeValue timeValue) {
this.updateInterval = timeValue;
}

/**
* Default constructor
* @param name name of a tif job
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.securityanalytics.threatIntel.action;

import org.opensearch.client.node.NodeClient;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.opensearch.rest.RestRequest.Method.GET;
import static org.opensearch.rest.RestRequest.Method.PUT;

/**
* Rest handler for threat intel TIFjob creation
*
* This handler handles a request of
* PUT /_plugins/security_analytics/threatintel/tifjob/{id}
* {
* "id": {id},
* "name": {name},
* "update_interval_in_days": 1
* }
*
* When request is received, it will create a TIFjob
* After the creation of TIFjob is completed, it will schedule the next update task after update_interval_in_days.
*
*/
public class RestPutTIFJobHandler extends BaseRestHandler {
private static final String ACTION_NAME = "threatintel_tifjob_put";
private final ClusterSettings clusterSettings;

public RestPutTIFJobHandler(final ClusterSettings clusterSettings) {
this.clusterSettings = clusterSettings;
}

@Override
public String getName() {
return ACTION_NAME;
}

@Override
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
final PutTIFJobRequest putTIFJobRequest = new PutTIFJobRequest("jobname",
new TimeValue(1, TimeUnit.MINUTES));

return channel -> client.executeLocally(PutTIFJobAction.INSTANCE, putTIFJobRequest, new RestToXContentListener<>(channel));
}

@Override
public List<Route> routes() {
String path = "/_p/_s";
return List.of(new Route(GET, path));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ protected void doExecute(final Task task, final DeleteTIFJobRequest request, fin
return;
}
try {
// TODO: makes every sub-methods as async call to avoid using a thread in generic pool
threadPool.generic().submit(() -> {
try {
deleteTIFJob(request.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

package org.opensearch.securityanalytics.threatIntel.common;

import static org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobExtension.JOB_INDEX_NAME;
import static org.opensearch.securityanalytics.SecurityAnalyticsPlugin.JOB_INDEX_NAME;


import java.time.Instant;
import java.util.Optional;
Expand Down
Loading