diff --git a/README.md b/README.md index 1a102e8..be2896f 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,8 @@ This project contains: * A Python library for retrieving and working with VPC Flow logs The tools support reading Flow Logs from both [CloudWatch Logs](https://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/flow-logs-cwl.html) and [S3](https://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/flow-logs-s3.html). -For S3 destinations, [version 3](https://aws.amazon.com/blogs/aws/learn-from-your-vpc-flow-logs-with-additional-meta-data/) custom log formats are supported. +It can also read from local files pulled from S3. +For S3 and file locations, [version 3](https://aws.amazon.com/blogs/aws/learn-from-your-vpc-flow-logs-with-additional-meta-data/) custom log formats are supported. The library builds on [boto3](https://github.com/boto/boto3) and should work on the [supported versions](https://devguide.python.org/#status-of-python-branches) of Python 3. @@ -45,9 +46,18 @@ __Location types__ `flowlogs_reader` has one required argument, `location`. By default that is interpreted as a CloudWatch Logs group. -To use an S3 location, specify `--location-type='s3'`: +To use an S3 location, specify `--location-type="s3"`: -* `flowlogs_reader --location-type="s3" "bucket-name/optional-prefix"` +``` +flowlogs_reader --location-type="s3" "bucket-name/optional-prefix" +``` + +To use a file location, specify `--location-type="file"`. If the location is a directory, all the `.log.gz` files in it will be read. + +``` +flowlogs_reader --location-type="file" "/home/sherlock/Downloads/aws_flow_logs/sample-file.log.gz" +flowlogs_reader --location-type="file" "/home/sherlock/Downloads/aws_flow_logs/" +``` __Printing flows__ diff --git a/flowlogs_reader/__init__.py b/flowlogs_reader/__init__.py index 0b6bb75..fe7b3c7 100644 --- a/flowlogs_reader/__init__.py +++ b/flowlogs_reader/__init__.py @@ -16,6 +16,7 @@ from .flowlogs_reader import ( FlowRecord, FlowLogsReader, + LocalFileReader, S3FlowLogsReader, ) @@ -23,5 +24,6 @@ 'aggregated_records', 'FlowRecord', 'FlowLogsReader', + 'LocalFileReader', 'S3FlowLogsReader', ] diff --git a/flowlogs_reader/__main__.py b/flowlogs_reader/__main__.py index 5392a2f..86fcb02 100644 --- a/flowlogs_reader/__main__.py +++ b/flowlogs_reader/__main__.py @@ -23,6 +23,7 @@ from .aggregation import aggregated_records from .flowlogs_reader import ( FlowLogsReader, + LocalFileReader, NODATA, S3FlowLogsReader, SKIPDATA, @@ -104,6 +105,8 @@ def get_reader(args): elif args.location_type == 's3': cls = S3FlowLogsReader client_type = 's3' + elif args.location_type == 'file': + cls = LocalFileReader if args.region: kwargs['region_name'] = args.region @@ -179,7 +182,7 @@ def main(argv=None): '--location-type', type=str, help='location type (CloudWatch Logs or S3), default is cwl', - choices=['cwl', 's3'], + choices=['cwl', 's3', 'file'], default='cwl', ) parser.add_argument( diff --git a/flowlogs_reader/flowlogs_reader.py b/flowlogs_reader/flowlogs_reader.py index accad21..ea54f13 100644 --- a/flowlogs_reader/flowlogs_reader.py +++ b/flowlogs_reader/flowlogs_reader.py @@ -18,7 +18,7 @@ from datetime import datetime, timedelta from gzip import open as gz_open from os.path import basename -from parquet import DictReader as parquet_dict_reader +from pathlib import Path from threading import Lock import boto3 @@ -26,6 +26,7 @@ from botocore.exceptions import PaginationError from dateutil.rrule import rrule, DAILY +from parquet import DictReader as parquet_dict_reader DEFAULT_FIELDS = ( 'version', @@ -244,20 +245,12 @@ def from_cwl_event(cls, cwl_event, fields=DEFAULT_FIELDS): class BaseReader: def __init__( self, - client_type, region_name=None, start_time=None, end_time=None, boto_client=None, raise_on_error=False, ): - self.region_name = region_name - if boto_client is not None: - self.boto_client = boto_client - else: - kwargs = {'region_name': region_name} if region_name else {} - self.boto_client = boto3.client(client_type, **kwargs) - # If no time filters are given use the last hour now = datetime.utcnow() self.start_time = start_time or now - timedelta(hours=1) @@ -276,7 +269,21 @@ def __next__(self): return next(self.iterator) -class FlowLogsReader(BaseReader): +class AWSReader(BaseReader): + def __init__( + self, client_type, region_name=None, boto_client=None, **kwargs + ): + self.region_name = region_name + if boto_client is not None: + self.boto_client = boto_client + else: + client_kwargs = {'region_name': region_name} if region_name else {} + self.boto_client = boto3.client(client_type, **client_kwargs) + + super().__init__(**kwargs) + + +class FlowLogsReader(AWSReader): """ Returns an object that will yield VPC Flow Log records as Python objects. * `log_group_name` is the name of the CloudWatch Logs group that stores @@ -408,7 +415,7 @@ def _reader(self): raise -class S3FlowLogsReader(BaseReader): +class S3FlowLogsReader(AWSReader): def __init__( self, location, @@ -547,3 +554,31 @@ def _reader(self): self.skipped_records += 1 if self.raise_on_error: raise + + +class LocalFileReader(BaseReader): + def __init__(self, location, **kwargs): + self.location = location + super().__init__(**kwargs) + + def _read_file(self, file_path): + with gz_open(file_path, mode='rt') as gz_f: + reader = csv_dict_reader(gz_f, delimiter=' ') + reader.fieldnames = [ + f.replace('-', '_') for f in reader.fieldnames + ] + yield from reader + with THREAD_LOCK: + self.bytes_processed += gz_f.tell() + + def _reader(self): + path = Path(self.location) + all_files = path.glob('*.log.gz') if path.is_dir() else [path] + for file_path in all_files: + for event_data in self._read_file(file_path): + try: + yield FlowRecord(event_data) + except Exception: + self.skipped_records += 1 + if self.raise_on_error: + raise diff --git a/tests/test_flowlogs_reader.py b/tests/test_flowlogs_reader.py index a018051..72bb976 100644 --- a/tests/test_flowlogs_reader.py +++ b/tests/test_flowlogs_reader.py @@ -13,8 +13,10 @@ # limitations under the License. from datetime import datetime -from gzip import compress +from gzip import compress, open as gz_open from io import BytesIO +from os.path import join +from tempfile import TemporaryDirectory from unittest import TestCase from unittest.mock import MagicMock, patch @@ -27,6 +29,7 @@ aggregated_records, FlowRecord, FlowLogsReader, + LocalFileReader, S3FlowLogsReader, ) from flowlogs_reader.flowlogs_reader import ( @@ -99,6 +102,65 @@ 'subnet-0123456789abcdef 7 7 IPv4 5 vpc-04456ab739938ee3f\n' 'bogus line\n' ) + +V5_DICTS = [ + { + 'account_id': '999999999999', + 'action': 'ACCEPT', + 'az_id': 'use2-az2', + 'bytes': 4895, + 'dstaddr': '192.0.2.156', + 'dstport': 50318, + 'end': datetime(2021, 3, 4, 14, 1, 51), + 'flow_direction': 'ingress', + 'instance_id': 'i-00123456789abcdef', + 'interface_id': 'eni-00123456789abcdef', + 'log_status': 'OK', + 'packets': 15, + 'pkt_dstaddr': '192.0.2.156', + 'pkt_src_aws_service': 'S3', + 'pkt_srcaddr': '198.51.100.6', + 'protocol': 6, + 'region': 'us-east-2', + 'srcaddr': '198.51.100.7', + 'srcport': 443, + 'start': datetime(2021, 3, 4, 14, 1, 33), + 'subnet_id': 'subnet-0123456789abcdef', + 'tcp_flags': 19, + 'type': 'IPv4', + 'version': 5, + 'vpc_id': 'vpc-04456ab739938ee3f', + }, + { + 'account_id': '999999999999', + 'action': 'ACCEPT', + 'az_id': 'use2-az2', + 'bytes': 3015, + 'dstaddr': '198.51.100.6', + 'dstport': 443, + 'end': datetime(2021, 3, 4, 14, 1, 51), + 'flow_direction': 'egress', + 'instance_id': 'i-00123456789abcdef', + 'interface_id': 'eni-00123456789abcdef', + 'log_status': 'OK', + 'packets': 16, + 'pkt_dst_aws_service': 'S3', + 'pkt_dstaddr': '198.51.100.7', + 'pkt_srcaddr': '192.0.2.156', + 'protocol': 6, + 'region': 'us-east-2', + 'srcaddr': '192.0.2.156', + 'srcport': 50318, + 'start': datetime(2021, 3, 4, 14, 1, 33), + 'subnet_id': 'subnet-0123456789abcdef', + 'tcp_flags': 7, + 'traffic_path': 7, + 'type': 'IPv4', + 'version': 5, + 'vpc_id': 'vpc-04456ab739938ee3f', + }, +] + V6_FILE = ( 'resource-type tgw-id tgw-attachment-id tgw-src-vpc-account-id ' 'tgw-dst-vpc-account-id tgw-src-vpc-id tgw-dst-vpc-id tgw-src-subnet-id ' @@ -709,64 +771,7 @@ def test_serial_v4(self): ) def test_serial_v5(self): - expected = [ - { - 'account_id': '999999999999', - 'action': 'ACCEPT', - 'az_id': 'use2-az2', - 'bytes': 4895, - 'dstaddr': '192.0.2.156', - 'dstport': 50318, - 'end': datetime(2021, 3, 4, 14, 1, 51), - 'flow_direction': 'ingress', - 'instance_id': 'i-00123456789abcdef', - 'interface_id': 'eni-00123456789abcdef', - 'log_status': 'OK', - 'packets': 15, - 'pkt_dstaddr': '192.0.2.156', - 'pkt_src_aws_service': 'S3', - 'pkt_srcaddr': '198.51.100.6', - 'protocol': 6, - 'region': 'us-east-2', - 'srcaddr': '198.51.100.7', - 'srcport': 443, - 'start': datetime(2021, 3, 4, 14, 1, 33), - 'subnet_id': 'subnet-0123456789abcdef', - 'tcp_flags': 19, - 'type': 'IPv4', - 'version': 5, - 'vpc_id': 'vpc-04456ab739938ee3f', - }, - { - 'account_id': '999999999999', - 'action': 'ACCEPT', - 'az_id': 'use2-az2', - 'bytes': 3015, - 'dstaddr': '198.51.100.6', - 'dstport': 443, - 'end': datetime(2021, 3, 4, 14, 1, 51), - 'flow_direction': 'egress', - 'instance_id': 'i-00123456789abcdef', - 'interface_id': 'eni-00123456789abcdef', - 'log_status': 'OK', - 'packets': 16, - 'pkt_dst_aws_service': 'S3', - 'pkt_dstaddr': '198.51.100.7', - 'pkt_srcaddr': '192.0.2.156', - 'protocol': 6, - 'region': 'us-east-2', - 'srcaddr': '192.0.2.156', - 'srcport': 50318, - 'start': datetime(2021, 3, 4, 14, 1, 33), - 'subnet_id': 'subnet-0123456789abcdef', - 'tcp_flags': 7, - 'traffic_path': 7, - 'type': 'IPv4', - 'version': 5, - 'vpc_id': 'vpc-04456ab739938ee3f', - }, - ] - reader = self._test_iteration(V5_FILE, expected) + reader = self._test_iteration(V5_FILE, V5_DICTS) self.assertEqual(reader.bytes_processed, len(V5_FILE.encode())) self.assertEqual( reader.compressed_bytes_processed, len(compress(V5_FILE.encode())) @@ -993,6 +998,34 @@ def test_threads(self): self._test_iteration(V3_FILE, expected) +class LocalFileReaderTests(TestCase): + def setUp(self): + self.temp_dir = TemporaryDirectory() + self.first_file = join(self.temp_dir.name, 'first.log.gz') + self.second_file = join(self.temp_dir.name, 'second.log.gz') + # Ignored: wrong extension + self.third_file = join(self.temp_dir.name, 'third.gz') + + for file_path in (self.first_file, self.second_file, self.third_file): + with gz_open(file_path, 'wt') as f: + f.write(V5_FILE) + + def tearDown(self): + self.temp_dir.cleanup() + + def test_file(self): + reader = LocalFileReader(self.first_file) + actual = [x.to_dict() for x in reader] + self.assertEqual(actual, V5_DICTS) + self.assertEqual(reader.bytes_processed, len(V5_FILE.encode())) + + def test_dir(self): + reader = LocalFileReader(self.temp_dir.name) + actual = [x.to_dict() for x in reader] + self.assertEqual(actual, V5_DICTS * 2) + self.assertEqual(reader.bytes_processed, len(V5_FILE.encode()) * 2) + + class AggregationTestCase(TestCase): def test_aggregated_records(self): # Aggregate by 5-tuple by default diff --git a/tests/test_main.py b/tests/test_main.py index 4527cc9..5ab6986 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -342,3 +342,15 @@ def test_s3_destination(self, mock_out, mock_reader): __, args, kwargs = call line = args[0] self.assertEqual(line, record) + + @patch('flowlogs_reader.__main__.LocalFileReader', autospec=True) + @patch('flowlogs_reader.__main__.print', create=True) + def test_file_destination(self, mock_out, mock_reader): + mock_out.stdout = io.BytesIO() + mock_reader.return_value = SAMPLE_RECORDS + main(['--location-type', 'file', '/tmp/test-file.csv.gz']) + mock_reader.assert_called_once_with(location='/tmp/test-file.csv.gz') + for call, record in zip_longest(mock_out.mock_calls, SAMPLE_INPUT): + __, args, kwargs = call + line = args[0] + self.assertEqual(line, record)