Merge pull request #34 from nv-morpheus/branch-23.01
Forward-merge branch-23.01 to branch-23.03
GPUtester authored Jan 31, 2023
2 parents 142a274 + 94c9e51 commit cb91cf4
Showing 6 changed files with 320 additions and 225 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,6 @@
# morpheus-experimental 23.01.00 (27 Jan 2023)

## 🚀 New Features

- Created Docker build process based on the process ([#28](https://github.com/nv-morpheus/morpheus-experimental/pull/28)) [@shawn-davis](https://github.com/shawn-davis)
- merge asset_clust ([#26](https://github.com/nv-morpheus/morpheus-experimental/pull/26)) [@avinashvem](https://github.com/avinashvem)
3 changes: 3 additions & 0 deletions README.md
@@ -69,6 +69,9 @@ This technique syntactically groups system log messages and finds group represen
## [Detection of Anomalous authentication using Relational Graph Neural Network (RGCN)](/anomalous-auth-detection)
This model shows an application of a graph neural network for anomalous authentication detection in an Azure-AD signon heterogeneous graph. The Azure-AD signon dataset includes four types of nodes (authentication, user, device and service application), all of which are used for modeling. A relational graph neural network (RGCN) is used to identify anomalous authentications from the Azure-AD signon input.
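As a hedged illustration of the RGCN idea described above (a separate learnable weight per relation type, aggregated per node), a minimal layer can be sketched in plain PyTorch. This is not the prototype's implementation under /anomalous-auth-detection; the relation names and the dense adjacency format are assumptions chosen for brevity.

import torch
import torch.nn as nn


class RelationalGraphConv(nn.Module):
    """One RGCN-style layer: a distinct weight matrix per relation, summed per node."""

    def __init__(self, in_dim, out_dim, relations=('authenticates', 'uses_device', 'accesses_app')):
        super().__init__()
        self.rel_weights = nn.ModuleDict({r: nn.Linear(in_dim, out_dim, bias=False) for r in relations})
        self.self_loop = nn.Linear(in_dim, out_dim)

    def forward(self, h, adj_by_rel):
        # h: [num_nodes, in_dim] node features
        # adj_by_rel: dict mapping relation name -> row-normalized [num_nodes, num_nodes] adjacency
        out = self.self_loop(h)
        for rel, adj in adj_by_rel.items():
            out = out + adj @ self.rel_weights[rel](h)
        return torch.relu(out)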

## [Asset Clustering using Windows Event Logs](/asset-clustering)
This model is a clustering algorithm that assigns each host in the dataset to a cluster, based on features aggregated and derived from that host's Windows Event Logs.
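As a rough sketch of the clustering idea described above, per-host EventID counts can be pivoted into a feature matrix and clustered. This is a CPU stand-in using pandas and scikit-learn rather than the prototype's cudf/cuml pipeline shown in the files below, and the column names are assumptions.

import pandas as pd
from sklearn.cluster import KMeans


def cluster_hosts(events: pd.DataFrame, n_clusters: int = 16) -> pd.Series:
    # events: one row per Windows Event Log record, with 'LogHost' and 'EventID' columns.
    counts = events.groupby(['LogHost', 'EventID']).size().unstack(fill_value=0)
    labels = KMeans(n_clusters=n_clusters).fit_predict(counts)
    return pd.Series(labels, index=counts.index, name='cluster')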

# Repo Structure
Each prototype has its own directory that contains everything belonging to the specific prototype. Directories can include the following subfolders and documentation:

115 changes: 62 additions & 53 deletions asset-clustering/training-tuning-inference/data_preprocessing.py
@@ -1,40 +1,53 @@
import cudf
# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import bz2
import logging
import time
import click
import numpy as np
from utils import *
from collections import defaultdict
from itertools import chain

import click
import numpy as np
from utils import compute_diff_source_logon_cnt
from utils import compute_eventid_cnt
from utils import compute_eventid_cnt_source
from utils import compute_logins_with_loghostuname
from utils import compute_username_cnt
from utils import compute_username_domain_cnt
from utils import get_fnames
from utils import logon_types
from utils import read_wls

import cudf

VALID_LOGON_TYPES = {0, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12}

# List of tuples (EventID, feature name), where a feature name denotes
# frequency of corresp. EventID, by asset appearing in LogHost field.
EVENTID_CNTFEAT = [
(4624, 'total_logins_cnt'),
(4625, 'accnt_fail_logon_cnt'),
(4634, 'total_logoff_cnt'),
(4647, 'total_user_initi_logoff_cnt'),
(4648, 'logon_explicit_cred_frm_cnt'),
(4672, 'spl_pvlgs'),
(4776, 'domain_ctr_validate_cnt'),
(4802, 'scrnsaver_invok_cnt'),
(4803, 'scrnsaver_dismiss_cnt')]
EVENTID_CNTFEAT = [(4624, 'total_logins_cnt'), (4625, 'accnt_fail_logon_cnt'), (4634, 'total_logoff_cnt'),
(4647, 'total_user_initi_logoff_cnt'), (4648, 'logon_explicit_cred_frm_cnt'), (4672, 'spl_pvlgs'),
(4776, 'domain_ctr_validate_cnt'), (4802, 'scrnsaver_invok_cnt'), (4803, 'scrnsaver_dismiss_cnt')]
# (4768, 'TGT_req_cnt'), (4769, 'TGS_req_cnt')
# 4768 & 4769 not used since 100% of LogHost for 4768,4769 is ActiveDirectory

# EVENTIDFORSOURCE_CNTFEAT & EVENTIDFORDEST_CNTFEAT are similar to EVENTID_CNTFEAT
# except that they corresp. to frequency of an EventID, by asset, appearing in
# Source & Destination fields resp.
EVENTIDFORSOURCE_CNTFEAT = [
(4624, 'total_logins_src_cnt'),
(4625, 'accnt_fail_logon_src_cnt'),
(4768, 'TGT_req_src_cnt'),
(4769, 'TGS_req_src_cnt'),
(4776, 'domain_ctr_validate_src_cnt')
]
EVENTIDFORSOURCE_CNTFEAT = [(4624, 'total_logins_src_cnt'), (4625, 'accnt_fail_logon_src_cnt'),
(4768, 'TGT_req_src_cnt'), (4769, 'TGS_req_src_cnt'), (4776, 'domain_ctr_validate_src_cnt')]
EVENTIDFORDEST_CNTFEAT = [(4648, 'logon_explicit_cred_to_cnt')]
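The compute_eventid_cnt and compute_eventid_cnt_source helpers used with these tuples are imported from utils and are not part of this diff. As a hedged sketch only, the per-asset counting that the comments above describe could look roughly like this, assuming df and host are cudf DataFrames as elsewhere in this file:

def eventid_count_sketch(df, evid, feature_name, host):
    # Count rows with the given EventID per asset appearing in the LogHost field,
    # then left-join the counts onto the per-host frame. Hosts with no matching
    # events come back as NaN and are filled with 0 later (see the fillna on count_cols).
    cnt = df.loc[df['EventID'] == evid].groupby('LogHost').size()
    return host.join(cnt.to_frame(feature_name), how='left')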


@@ -61,7 +74,7 @@ def host_aggr(df, host, uniq_values_dict, count_cols):
newhosts = newhosts - set(host.index.to_pandas())
newhosts.discard(None)

frac_cols = ['uname_other_compacnt_login_frac','uname_that_compacnt_login_frac']
frac_cols = ['uname_other_compacnt_login_frac', 'uname_that_compacnt_login_frac']
newhost = cudf.DataFrame({'LogHost': newhosts}).set_index('LogHost')
newhost[count_cols] = 0
newhost[frac_cols] = 0.0
@@ -77,26 +90,28 @@ def host_aggr(df, host, uniq_values_dict, count_cols):
df = df.loc[(df['Source'].isna()) | (df['Destination'].isna())]
if numrows < df.shape[0]:
logging.debug("Filtering Rows if SOURCE & DESTINATION neq NA")
logging.debug("Removed {} ROWS".format(numrows-df.shape[0]))
logging.debug("Removed {} ROWS".format(numrows - df.shape[0]))

host = compute_logins_with_loghostuname(df, host, login_eventids=[4624,])
host = compute_logins_with_loghostuname(df, host, login_eventids=[
4624,
])
host = logon_types(df, host, VALID_LOGON_TYPES)
host, uniq_values_dict = compute_diff_source_logon_cnt(df, host, uniq_values_dict)
host, uniq_values_dict = compute_username_cnt(df, host, uniq_values_dict)
host, uniq_values_dict = compute_username_domain_cnt(df, host, uniq_values_dict)

for evtuple in EVENTID_CNTFEAT:
evid, ev_str = evtuple
host = compute_eventid_cnt(df , evid, ev_str, host)
host = compute_eventid_cnt(df, evid, ev_str, host)

for evtuple in EVENTIDFORSOURCE_CNTFEAT:
evid, ev_str = evtuple
host = compute_eventid_cnt_source(df , evid, ev_str, host)
host = compute_eventid_cnt_source(df, evid, ev_str, host)
host[count_cols] = host[count_cols].fillna(value=0, inplace=False)
host['uname_other_compacnt_login_frac'] = host['uname_other_compacnt_login_cnt']/host['total_logins_cnt']
host['uname_other_compacnt_login_frac'] = host['uname_other_compacnt_login_cnt'] / host['total_logins_cnt']
host['uname_other_compacnt_login_frac'] = host['uname_other_compacnt_login_frac'].replace(np.inf, -1.)

host['uname_that_compacnt_login_frac'] = host['uname_that_compacnt_login_cnt']/host['total_logins_cnt']
host['uname_that_compacnt_login_frac'] = host['uname_that_compacnt_login_cnt'] / host['total_logins_cnt']
host['uname_that_compacnt_login_frac'] = host['uname_that_compacnt_login_frac'].replace(np.inf, -1.)

return host, uniq_values_dict
@@ -125,11 +140,7 @@ def initialize_hostdf():
count_cols += ['uname_other_compacnt_login_cnt', 'uname_that_compacnt_login_cnt']
host = cudf.DataFrame(columns=['LogHost']).set_index('LogHost')

uniq_values_dict = {
'Sources': defaultdict(set),
'Unames': defaultdict(set),
'UserDomains': defaultdict(set)
}
uniq_values_dict = {'Sources': defaultdict(set), 'Unames': defaultdict(set), 'UserDomains': defaultdict(set)}
return host, uniq_values_dict, count_cols


@@ -164,41 +175,40 @@ def read_process_data(wls_files, readsize=1000000, max_lines=1e15):
df_wls = read_wls(current_block, file_path=False)
host_df, uniq_vals_dict = host_aggr(df_wls, host_df, uniq_vals_dict, count_cols)

total_lines += len(current_block)/1000000
total_lines += len(current_block) / 1000000
iter_ += 1

if iter_ % 10000 == 0:
proc_speed = 1000.0*total_lines / (time.time() - t0)
logging.info(
'{:.3f}M Lines, {:.2f}K/sec'.format(total_lines, proc_speed))
logging.debug('host shape:{}'.format(hostdf.shape))
if total_lines*1e6 > max_lines:
logging.info("Breaking for loop. total_lines={}>{}".format(total_lines, max_lines))
break
proc_speed = 1000.0 * total_lines / (time.time() - t0)
logging.info('{:.3f}M Lines, {:.2f}K/sec'.format(total_lines, proc_speed))
logging.debug('host shape:{}'.format(host_df.shape))
if total_lines * 1e6 > max_lines:
logging.info("Breaking for loop. total_lines={}>{}".format(total_lines, max_lines))
break
fi.close()
return hostdf
return host_df


@click.command()
@click.option('--debug', is_flag=True)
@click.option('--data_range', default='day-01-day-01',
help='Range of dates for which wls files need to be read and preprocessed. '\
'For example, data_range=day-01-day_03 reads wls_day-01.bz2, wls_day-02.bz2'\
'and wls_day-03.bz2, preprocess them and prepare a combined dataset.')
@click.option('--data_range',
default='day-01-day-01',
help=('Range of dates for which wls files need to be read and preprocessed. '
'For example, data_range=day-01-day-03 reads wls_day-01.bz2, wls_day-02.bz2 '
'and wls_day-03.bz2, preprocesses them and prepares a combined dataset.'))
def run(**kwargs):
global dataset_path
debug_mode = kwargs['debug']
logging.basicConfig(level=logging.DEBUG, datefmt='%m%d-%H%M',
format='%(asctime)s: %(message)s')
logging.basicConfig(level=logging.DEBUG, datefmt='%m%d-%H%M', format='%(asctime)s: %(message)s')
dataset_path = '../datasets/'
ipfile_suffix = kwargs['data_range']
if debug_mode:
max_lines = 5e6
readsize = 32768*32
readsize = 32768 * 32
opfile_suffix = '_{:d}Mlines'.format(int(max_lines / 1e6))
else:
max_lines = 1e15
readsize = 32768*32*30
readsize = 32768 * 32 * 30
opfile_suffix = '_' + ipfile_suffix
logger_fname = 'logs/dataprocess_{}.log'.format(ipfile_suffix)
fh = logging.FileHandler(filename=logger_fname, mode='a')
@@ -207,8 +217,7 @@ def run(**kwargs):
logging.getLogger().addHandler(fh)
print("Logging in {}".format(logger_fname))

logging.info("DataProcess for WLS files {}. Read Size:{}MB\n\n".format(
ipfile_suffix, readsize//2**20))
logging.info("DataProcess for WLS files {}. Read Size:{}MB\n\n".format(ipfile_suffix, readsize // 2**20))
wls_files = get_fnames(dataset_path, ipfile_suffix)
host_df = read_process_data(wls_files, readsize, max_lines)
logging.debug("Number of hosts:{}".format(host_df.shape[0]))
Expand All @@ -217,4 +226,4 @@ def run(**kwargs):

if __name__ == '__main__':

run()
run()
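Going by the click options defined in this file (--debug and --data_range), one hedged way to exercise the preprocessing entry point from Python is via click's test runner. This sketch assumes ../datasets/ holds the wls_day-*.bz2 files and that a logs/ directory exists, as the script expects.

from click.testing import CliRunner

from data_preprocessing import run

runner = CliRunner()
result = runner.invoke(run, ['--data_range', 'day-01-day-03', '--debug'])
print(result.output)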
69 changes: 47 additions & 22 deletions asset-clustering/training-tuning-inference/inference.py
@@ -1,25 +1,48 @@
# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import logging
import pickle

import click
from utils import compute_chars
from utils import normalize_host_data


@click.command()
@click.option('--model', default='dbscan', help='Clustering method to use.'\
' Valid choices are \'kmeans\' or \'dbscan\'. Default is \'dbscan\'.'\
'The corresponding model pickle file will be read from the relative'\
'path \'../models/ \'.')
@click.option('--data_fname', default='host_agg_data_day-11_day-15.csv',\
help='Name of the Preprocessed csv dataset to perofrm inference. The given'\
'file name will be read from the relative path \'../datasets/ \'')
@click.option('--num_days', default=5.0, help='Number of days worth of data used'\
'in preparing the dataset. Used to normalize the features.')
@click.option('--compute_cluster_chars', is_flag=True, help='Boolean flag. If '\
'not provided, script just performs inference and output the cluster sizes.'\
'If provided, additionally analyzes for the top salient features of each cluster'\
'and prints the analysis to stdout.')
@click.option('--model',
default='dbscan',
help=('Clustering method to use. '
'Valid choices are \'kmeans\' or \'dbscan\'. Default is \'dbscan\'. '
'The corresponding model pickle file will be read from the relative '
'path \'../models/ \'.'))
@click.option('--data_fname',
default='host_agg_data_day-11_day-15.csv',
help=('Name of the preprocessed csv dataset to perform inference on. The given '
'file name will be read from the relative path \'../datasets/ \''))
@click.option('--num_days',
default=5.0,
help=('Number of days worth of data used '
'in preparing the dataset. Used to normalize the features.'))
@click.option('--compute_cluster_chars',
is_flag=True,
help=('Boolean flag. If '
'not provided, the script just performs inference and outputs the cluster sizes. '
'If provided, it additionally analyzes the top salient features of each cluster '
'and prints the analysis to stdout.'))
def run(**kwargs):
dataset_path = '../datasets/'
model_path = '../models/'
@@ -30,38 +53,40 @@ def run(**kwargs):
assert model in ['kmeans', 'dbscan'], \
"Valid choices for model are kmeans or dbscan"

data_path = dataset_path + kwargs['data_fname']
data_path = dataset_path + kwargs['data_fname']
df, df_norm = normalize_host_data(data_path)

if model=='dbscan':
if model == 'dbscan':
fname = model_path + 'dbscan_eps0.0005.pkl'
clust = "cluster_dbscan_eps0.0005_minkp1"

dbsc_model, pca, pca_dims = pickle.load(open(fname, "rb"))
df_pca = pca.transform(df_norm).iloc[:,:pca_dims]
df_pca = pca.transform(df_norm).iloc[:, :pca_dims]
df[clust] = dbsc_model.fit_predict(df_pca)

elif model=='kmeans':
elif model == 'kmeans':
fname = model_path + 'kmeans_16clusts.pkl'
clust = "cluster_KM_16"

kmeans_model, pca, pca_dims = pickle.load(open(fname, "rb"))
df_pca = pca.transform(df_norm).iloc[:,:pca_dims]
df_pca = pca.transform(df_norm).iloc[:, :pca_dims]
df[clust] = kmeans_model.predict(df_pca)

print("Cluster Size:\n{}".format(df[clust].value_counts()))

if compute_cluster_chars:
cluster_chars = compute_chars(df, clust, cluster_id=0, num_days=num_days)
compute_chars(df, clust, cluster_id=0, num_days=num_days)

return


if __name__ == '__main__':
dt = datetime.date.today()
logger_fname = 'logs/inference.log'.format(dt.strftime('%d%m%y'))
logger_fname = 'logs/inference.log'
print("Logging in {}".format(logger_fname))
logging.basicConfig(level=logging.DEBUG, filename=logger_fname,
filemode='a', format='%(asctime)s: %(message)s',
logging.basicConfig(level=logging.DEBUG,
filename=logger_fname,
filemode='a',
format='%(asctime)s: %(message)s',
datefmt='%m%d-%H%M')
run()
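The inference path above unpickles a (model, pca, pca_dims) triple from ../models/ and slices pca.transform(...) with .iloc, which points to a RAPIDS (cuml) estimator operating on cudf frames. A hedged sketch of how such an artifact could be produced at training time follows; the pca_dims value and min_samples=1 are assumptions read off the 'minkp1' cluster name and are not taken from the repository.

import pickle

from cuml import DBSCAN, PCA


def save_dbscan_artifact(df_norm, pca_dims=4, eps=0.0005, out_path='../models/dbscan_eps0.0005.pkl'):
    # df_norm: normalized per-host feature frame, e.g. as returned by normalize_host_data.
    pca = PCA(n_components=pca_dims).fit(df_norm)
    df_pca = pca.transform(df_norm).iloc[:, :pca_dims]
    model = DBSCAN(eps=eps, min_samples=1).fit(df_pca)  # min_samples=1 is an assumption
    with open(out_path, 'wb') as f:
        pickle.dump((model, pca, pca_dims), f)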