Skip to content

Commit

Permalink
Re-integrates XML, JSON, and adds Excel outputs. Closes Add static da…
Browse files Browse the repository at this point in the history
…ta all of the attribute types, etc, to the constants.py file #115, JSON, XML outputs broken #88, Add XML output #73, Modify the MFTParser class to make use of the new constants.py data #114
  • Loading branch information
rowingdude committed Sep 4, 2024
1 parent 3580835 commit 2ee4d83
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 22 deletions.
29 changes: 22 additions & 7 deletions src/analyzeMFT/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,42 @@
from .mft_analyzer import MftAnalyzer
from .constants import VERSION

async def main() -> None:
parser = OptionParser(usage="usage: %prog -f <mft_file> -o <output.csv> [-d] [-H]",
async def main():
parser = OptionParser(usage="usage: %prog -f <mft_file> -o <output_file> [options]",
version=f"%prog {VERSION}")
parser.add_option("-f", "--file", dest="filename",
help="MFT file to analyze", metavar="FILE")
parser.add_option("-o", "--output", dest="csvfile",
help="Output CSV file", metavar="FILE")
parser.add_option("-o", "--output", dest="output_file",
help="Output file", metavar="FILE")

export_group = OptionGroup(parser, "Export Options")
export_group.add_option("--csv", action="store_const", const="csv", dest="export_format",
help="Export as CSV (default)")
export_group.add_option("--json", action="store_const", const="json", dest="export_format",
help="Export as JSON")
export_group.add_option("--xml", action="store_const", const="xml", dest="export_format",
help="Export as XML")
export_group.add_option("--excel", action="store_const", const="excel", dest="export_format",
help="Export as Excel")
parser.add_option_group(export_group)

parser.add_option("-d", "--debug", action="store_true", dest="debug",
help="Enable debug output", default=False)
parser.add_option("-H", "--hash", action="store_true", dest="compute_hashes",
help="Compute hashes (MD5, SHA256, SHA512, CRC32)", default=False)

(options, args) = parser.parse_args()

if not options.filename or not options.csvfile:
if not options.filename or not options.output_file:
parser.print_help()
sys.exit(1)

analyzer = MftAnalyzer(options.filename, options.csvfile, options.debug, options.compute_hashes)
if not options.export_format:
options.export_format = "csv"

analyzer = MftAnalyzer(options.filename, options.output_file, options.debug, options.compute_hashes, options.export_format)
await analyzer.analyze()
print(f"Analysis complete. Results written to {options.csvfile}")
print(f"Analysis complete. Results written to {options.output_file}")

if __name__ == "__main__":
asyncio.run(main())
50 changes: 50 additions & 0 deletions src/analyzeMFT/file_writers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import csv
import json
import xml.etree.ElementTree as ET
import asyncio
from typing import List, Dict, Any
from .mft_record import MftRecord

class FileWriters:
@staticmethod
async def write_csv(records: List[MftRecord], output_file: str) -> None:
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(CSV_HEADER)
for record in records:
writer.writerow(record.to_csv())
await asyncio.sleep(0)

@staticmethod
async def write_json(records: List[MftRecord], output_file: str) -> None:
json_data = [record.__dict__ for record in records]
with open(output_file, 'w', encoding='utf-8') as jsonfile:
json.dump(json_data, jsonfile, indent=2, default=str)
await asyncio.sleep(0)

@staticmethod
async def write_xml(records: List[MftRecord], output_file: str) -> None:
root = ET.Element("mft_records")
for record in records:
record_elem = ET.SubElement(root, "record")
for key, value in record.__dict__.items():
ET.SubElement(record_elem, key).text = str(value)
tree = ET.ElementTree(root)
tree.write(output_file, encoding='utf-8', xml_declaration=True)
await asyncio.sleep(0)

@staticmethod
async def write_excel(records: List[MftRecord], output_file: str) -> None:
try:
import openpyxl
except ImportError:
print("openpyxl is not installed. Please install it to use Excel export.")
return

wb = openpyxl.Workbook()
ws = wb.active
ws.append(CSV_HEADER)
for record in records:
ws.append(record.to_csv())
wb.save(output_file)
await asyncio.sleep(0)
32 changes: 17 additions & 15 deletions src/analyzeMFT/mft_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@

class MftAnalyzer:

def __init__(self, mft_file: str, output_file: str, debug: bool = False, compute_hashes: bool = False) -> None:
def __init__(self, mft_file: str, output_file: str, debug: bool = False, compute_hashes: bool = False, export_format: str = "csv") -> None:
self.mft_file = mft_file
self.output_file = output_file
self.debug = debug
self.compute_hashes = compute_hashes
self.mft_records = {}
self.export_format = export_format
self.mft_records = []
self.interrupt_flag = asyncio.Event()
self.csv_writer = None
self.csvfile = None
Expand All @@ -35,26 +36,15 @@ def __init__(self, mft_file: str, output_file: str, debug: bool = False, compute

async def analyze(self) -> None:
try:
self.csvfile = io.open(self.output_file, 'w', newline='', encoding='utf-8')
self.csv_writer = csv.writer(self.csvfile)
header = CSV_HEADER.copy()
if self.compute_hashes:
header.extend(['MD5', 'SHA256', 'SHA512', 'CRC32'])
self.csv_writer.writerow(header)

self.handle_interrupt()
await self.process_mft()

await self.write_output()
except Exception as e:
print(f"An unexpected error occurred: {e}")
if self.debug:
traceback.print_exc()
finally:
await self.write_remaining_records()
self.print_statistics()
if self.csvfile:
self.csvfile.close()
print(f"Analysis complete. Results written to {self.output_file}")


async def process_mft(self) -> None:
try:
Expand Down Expand Up @@ -192,3 +182,15 @@ def print_statistics(self) -> None:
print(f"Unique SHA512 hashes: {len(self.stats['unique_sha512'])}")
print(f"Unique CRC32 hashes: {len(self.stats['unique_crc32'])}")


async def write_output(self) -> None:
if self.export_format == "csv":
await FileWriters.write_csv(self.mft_records, self.output_file)
elif self.export_format == "json":
await FileWriters.write_json(self.mft_records, self.output_file)
elif self.export_format == "xml":
await FileWriters.write_xml(self.mft_records, self.output_file)
elif self.export_format == "excel":
await FileWriters.write_excel(self.mft_records, self.output_file)
else:
print(f"Unsupported export format: {self.export_format}")

0 comments on commit 2ee4d83

Please sign in to comment.