log_to_dask_df.py
import dask.dataframe as dd
from zat import zeek_log_reader


class LogToDaskDataFrame(object):
"""LogToDaskDataFrame: Converts a Zeek log to a Dask DataFrame
Notes:
This class has recently been overhauled from a simple loader to a more
complex class that should in theory:
- Select better types for each column
- Should be faster
- Produce smaller memory footprint dataframes
If you have any issues/problems with this class please submit a GitHub issue.
More Info: https://supercowpowers.github.io/zat/large_dataframes.html
"""

    def __init__(self):
        """Initialize the LogToDaskDataFrame class"""

        # First Level Type Mapping
        # This map defines the types used when first reading the Zeek log into 'chunk' dataframes.
        # Types (like time and interval) will be read as one type at first and will then
        # undergo further processing to produce the correct types with the correct values.
        # See: https://stackoverflow.com/questions/29245848/what-are-all-the-dtypes-that-pandas-recognizes
        # for more info on supported types.
        self.type_map = {'bool': 'category',   # 'bool' can't hold NaN values, so we use 'category'
                         'count': 'UInt64',
                         'int': 'Int32',
                         'double': 'float',
                         'time': 'float',      # Secondary processing into datetime
                         'interval': 'float',  # Secondary processing into timedelta
                         'port': 'UInt16'
                         }
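
        # Illustrative example (field names hypothetical): Zeek '#types' of
        # ['time', 'string', 'count'] for fields ['ts', 'uid', 'orig_bytes']
        # would map via pd_column_types() below to
        # {'ts': 'float', 'uid': 'object', 'orig_bytes': 'UInt64'}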

    def _get_field_info(self, log_filename):
        """Internal Method: Use ZAT log reader to read header for names and types"""
        _zeek_reader = zeek_log_reader.ZeekLogReader(log_filename)
        _, field_names, field_types, _ = _zeek_reader._parse_zeek_header(log_filename)
        return field_names, field_types
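
    # For reference, a Zeek ASCII log stores its schema in tab-separated '#fields'
    # and '#types' header lines, which is what the reader above parses, e.g.:
    #   #fields  ts    uid     id.orig_h  ...
    #   #types   time  string  addr       ...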

    def _create_initial_df(self, log_filename, all_fields, usecols, dtypes):
        """Internal Method: Create the initial dataframe using Dask read_csv (primary types correct)"""
        return dd.read_csv(log_filename, sep='\t', names=all_fields, usecols=usecols,
                           dtype=dtypes, comment="#", na_values='-', blocksize="64MB")

    def create_dataframe(self, log_filename, ts_index=True, aggressive_category=True, usecols=None):
        """Create a Dask dataframe from a Bro/Zeek log file

        Args:
            log_filename (string): The full path to the Zeek log
            ts_index (bool): Set the index to the 'ts' field (default = True)
            aggressive_category (bool): Convert unknown columns to category (default = True)
            usecols (list): A subset of columns to read in (minimizes memory usage) (default = None)
        """

        # Grab the field information
        field_names, field_types = self._get_field_info(log_filename)
        all_fields = field_names  # We need ALL the fields for later

        # If usecols is set then we'll subset the fields and types
        if usecols:
            # usecols needs to include ts
            if 'ts' not in usecols:
                usecols.append('ts')
            field_types = [t for t, field in zip(field_types, field_names) if field in usecols]
            field_names = [field for field in field_names if field in usecols]

        # Get the appropriate types for the Dask DataFrame
        pandas_types = self.pd_column_types(field_names, field_types, aggressive_category)

        # Now actually read in the initial dataframe
        df = self._create_initial_df(log_filename, all_fields, usecols, pandas_types)

        # Now we convert 'time' and 'interval' fields to datetime and timedelta respectively
        for name, zeek_type in zip(field_names, field_types):
            if zeek_type == 'time':
                df[name] = dd.to_datetime(df[name], unit='s')
            if zeek_type == 'interval':
                df[name] = dd.to_timedelta(df[name], unit='s')

        # Set the index
        if ts_index:
            try:
                df = df.set_index('ts')
            except KeyError:
                print('Could not find ts/timestamp for index...')

        return df

    def pd_column_types(self, column_names, column_types, aggressive_category=True, verbose=False):
        """Given a set of names and types, construct a dictionary to be used
           as the Dask read_csv dtypes argument"""

        # Aggressive category means that types not in the current type_map are
        # mapped to 'category'; if aggressive_category is False they are
        # mapped to an 'object' type instead.
        unknown_type = 'category' if aggressive_category else 'object'

        pandas_types = {}
        for name, zeek_type in zip(column_names, column_types):

            # Grab the type
            item_type = self.type_map.get(zeek_type)

            # Sanity Check
            if not item_type:
                # UID/FUID/GUID always gets mapped to object
                if 'uid' in name:
                    item_type = 'object'
                else:
                    if verbose:
                        print('Could not find type for {:s} using {:s}...'.format(zeek_type, unknown_type))
                    item_type = unknown_type

            # Set the pandas type
            pandas_types[name] = item_type

        # Return the dictionary of name: type
        return pandas_types
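

# A minimal usage sketch (an illustrative addition, not part of the class above).
# The log path is hypothetical; point it at any Zeek ASCII log to try it.
if __name__ == '__main__':
    log_to_df = LogToDaskDataFrame()
    conn_df = log_to_df.create_dataframe('/path/to/conn.log')

    # Dask is lazy: dtypes is available immediately, but head() triggers
    # a small computation on the first partition
    print(conn_df.dtypes)
    print(conn_df.head())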