From fca14f32eba1055eb1684f14fd114926733cee26 Mon Sep 17 00:00:00 2001 From: phillip-stephens Date: Thu, 7 Nov 2024 18:23:21 +0100 Subject: [PATCH 01/12] added status handler, needs testing --- src/cli/iohandlers/status_handler.go | 96 ++++++++++++++++++++++++++++ src/cli/worker_manager.go | 20 ++++-- 2 files changed, 110 insertions(+), 6 deletions(-) create mode 100644 src/cli/iohandlers/status_handler.go diff --git a/src/cli/iohandlers/status_handler.go b/src/cli/iohandlers/status_handler.go new file mode 100644 index 00000000..8f17dfac --- /dev/null +++ b/src/cli/iohandlers/status_handler.go @@ -0,0 +1,96 @@ +/* + * ZDNS Copyright 2024 Regents of the University of Michigan + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy + * of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package iohandlers + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/zmap/zdns/src/zdns" +) + +// Notes +// time elapsed, domains scanned total, avg. per second, per status entry, ordered largest to smallest + +type scanStats struct { + scanStartTime time.Time + domainsScanned int + statusCardinality map[zdns.Status]int +} + +// statusHandler prints a per-second update to the user scan progress and per-status statistics +func StatusHandler(statusChan <-chan zdns.Status, wg *sync.WaitGroup) { + stats := scanStats{} + stats.scanStartTime = time.Now() + timer := time.Tick(time.Second) +statusLoop: + for { + select { + case <-timer: + // print per-second summary + scanDuration := time.Since(stats.scanStartTime) + fmt.Printf("%s; %d domains scanned; %f domains/sec.; %s\n", + scanDuration, + stats.domainsScanned, + float64(stats.domainsScanned)/scanDuration.Seconds(), + getStatusOccuranceString(stats.statusCardinality)) + case status, ok := <-statusChan: + if !ok { + // TODO check that this syntax is valid + // status chan closed, exiting + break statusLoop + } + stats.domainsScanned += 1 + incrementStatus(stats, status) + } + } + fmt.Printf("%s; Scan Complete, no more input. %d domains scanned; %f domains/sec.; %s\n", + time.Since(stats.scanStartTime), + stats.domainsScanned, + float64(stats.domainsScanned)/time.Since(stats.scanStartTime).Seconds(), + getStatusOccuranceString(stats.statusCardinality)) + wg.Done() +} + +func incrementStatus(stats scanStats, status zdns.Status) { + if _, ok := stats.statusCardinality[status]; !ok { + stats.statusCardinality[status] = 0 + } + stats.statusCardinality[status] += 1 +} + +func getStatusOccuranceString(statusOccurances map[zdns.Status]int) string { + type statusAndCardinality struct { + status zdns.Status + occurance int + } + statusesAndCards := make([]statusAndCardinality, 0, len(statusOccurances)) + for status, occurance := range statusOccurances { + statusesAndCards = append(statusesAndCards, statusAndCardinality{ + status: status, + occurance: occurance, + }) + } + // TODO Sort these by occurance, largest to smallest + returnStr := "" + for _, statusAndOccurance := range statusesAndCards { + returnStr += fmt.Sprintf("%s: %d, ", statusAndOccurance.status, statusAndOccurance.occurance) + } + // remove trailing comma + returnStr = strings.TrimSuffix(returnStr, ", ") + return returnStr +} diff --git a/src/cli/worker_manager.go b/src/cli/worker_manager.go index cd20cac1..e1ec9187 100644 --- a/src/cli/worker_manager.go +++ b/src/cli/worker_manager.go @@ -490,6 +490,7 @@ func Run(gc CLIConf) { inChan := make(chan string) outChan := make(chan string) metaChan := make(chan routineMetadata, gc.Threads) + statusChan := make(chan zdns.Status) var routineWG sync.WaitGroup inHandler := gc.InputHandler @@ -509,13 +510,18 @@ func Run(gc CLIConf) { log.Fatal(fmt.Sprintf("could not feed input channel: %v", inErr)) } }() + go func() { outErr := outHandler.WriteResults(outChan, &routineWG) if outErr != nil { log.Fatal(fmt.Sprintf("could not write output results from output channel: %v", outErr)) } }() - routineWG.Add(2) + + go func() { + iohandlers.StatusHandler(statusChan, &routineWG) + }() + routineWG.Add(3) // create pool of worker goroutines var lookupWG sync.WaitGroup @@ -525,7 +531,7 @@ func Run(gc CLIConf) { for i := 0; i < gc.Threads; i++ { i := i go func(threadID int) { - initWorkerErr := doLookupWorker(&gc, resolverConfig, inChan, outChan, metaChan, &lookupWG) + initWorkerErr := doLookupWorker(&gc, resolverConfig, inChan, outChan, metaChan, statusChan, &lookupWG) if initWorkerErr != nil { log.Fatalf("could not start lookup worker #%d: %v", i, initWorkerErr) } @@ -534,6 +540,7 @@ func Run(gc CLIConf) { lookupWG.Wait() close(outChan) close(metaChan) + close(statusChan) routineWG.Wait() if gc.MetadataFilePath != "" { // we're done processing data. aggregate all the data from individual routines @@ -580,7 +587,7 @@ func Run(gc CLIConf) { } // doLookupWorker is a single worker thread that processes lookups from the input channel. It calls wg.Done when it is finished. -func doLookupWorker(gc *CLIConf, rc *zdns.ResolverConfig, inputChan <-chan string, output chan<- string, metaChan chan<- routineMetadata, wg *sync.WaitGroup) error { +func doLookupWorker(gc *CLIConf, rc *zdns.ResolverConfig, inputChan <-chan string, outputChan chan<- string, metaChan chan<- routineMetadata, statusChan chan<- zdns.Status, wg *sync.WaitGroup) error { defer wg.Done() resolver, err := zdns.InitResolver(rc) if err != nil { @@ -590,7 +597,7 @@ func doLookupWorker(gc *CLIConf, rc *zdns.ResolverConfig, inputChan <-chan strin metadata.Status = make(map[zdns.Status]int) for line := range inputChan { - handleWorkerInput(gc, rc, line, resolver, &metadata, output) + handleWorkerInput(gc, rc, line, resolver, &metadata, outputChan, statusChan) } // close the resolver, freeing up resources resolver.Close() @@ -598,7 +605,7 @@ func doLookupWorker(gc *CLIConf, rc *zdns.ResolverConfig, inputChan <-chan strin return nil } -func handleWorkerInput(gc *CLIConf, rc *zdns.ResolverConfig, line string, resolver *zdns.Resolver, metadata *routineMetadata, output chan<- string) { +func handleWorkerInput(gc *CLIConf, rc *zdns.ResolverConfig, line string, resolver *zdns.Resolver, metadata *routineMetadata, outputChan chan<- string, statusChan chan<- zdns.Status) { // we'll process each module sequentially, parallelism is per-domain res := zdns.Result{Results: make(map[string]zdns.SingleModuleResult, len(gc.ActiveModules))} // get the fields that won't change for each lookup module @@ -669,6 +676,7 @@ func handleWorkerInput(gc *CLIConf, rc *zdns.ResolverConfig, line string, resolv lookupRes.Error = err.Error() } res.Results[moduleName] = lookupRes + statusChan <- status } metadata.Status[status]++ metadata.Lookups++ @@ -689,7 +697,7 @@ func handleWorkerInput(gc *CLIConf, rc *zdns.ResolverConfig, line string, resolv if err != nil { log.Fatalf("unable to marshal JSON result: %v", err) } - output <- string(jsonRes) + outputChan <- string(jsonRes) } metadata.Names++ } From 6f10adaf2f3ddd7c6798bf65a368fe1d94218630 Mon Sep 17 00:00:00 2001 From: phillip-stephens Date: Mon, 11 Nov 2024 15:06:17 -0500 Subject: [PATCH 02/12] time formatting --- src/cli/iohandlers/status_handler.go | 51 +++++++++++++++++----------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/src/cli/iohandlers/status_handler.go b/src/cli/iohandlers/status_handler.go index 8f17dfac..eb6e9a50 100644 --- a/src/cli/iohandlers/status_handler.go +++ b/src/cli/iohandlers/status_handler.go @@ -16,6 +16,7 @@ package iohandlers import ( "fmt" + "sort" "strings" "sync" "time" @@ -27,14 +28,16 @@ import ( // time elapsed, domains scanned total, avg. per second, per status entry, ordered largest to smallest type scanStats struct { - scanStartTime time.Time - domainsScanned int - statusCardinality map[zdns.Status]int + scanStartTime time.Time + domainsScanned int + statusOccurance map[zdns.Status]int } // statusHandler prints a per-second update to the user scan progress and per-status statistics func StatusHandler(statusChan <-chan zdns.Status, wg *sync.WaitGroup) { - stats := scanStats{} + stats := scanStats{ + statusOccurance: make(map[zdns.Status]int), + } stats.scanStartTime = time.Now() timer := time.Tick(time.Second) statusLoop: @@ -42,12 +45,14 @@ statusLoop: select { case <-timer: // print per-second summary - scanDuration := time.Since(stats.scanStartTime) - fmt.Printf("%s; %d domains scanned; %f domains/sec.; %s\n", - scanDuration, + timeSinceStart := time.Since(stats.scanStartTime) + fmt.Printf("%02dh:%02dm:%02ds; %d domains scanned; %.02f domains/sec.; %s\n", + int(timeSinceStart.Hours()), + int(timeSinceStart.Minutes())%60, + int(timeSinceStart.Seconds())%60, stats.domainsScanned, - float64(stats.domainsScanned)/scanDuration.Seconds(), - getStatusOccuranceString(stats.statusCardinality)) + float64(stats.domainsScanned)/timeSinceStart.Seconds(), + getStatusOccuranceString(stats.statusOccurance)) case status, ok := <-statusChan: if !ok { // TODO check that this syntax is valid @@ -58,36 +63,42 @@ statusLoop: incrementStatus(stats, status) } } - fmt.Printf("%s; Scan Complete, no more input. %d domains scanned; %f domains/sec.; %s\n", - time.Since(stats.scanStartTime), + timeSinceStart := time.Since(stats.scanStartTime) + fmt.Printf("%02dh:%02dm:%02ds; Scan Complete, no more input. %d domains scanned; %.02f domains/sec.; %s\n", + int(timeSinceStart.Hours()), + int(timeSinceStart.Minutes())%60, + int(timeSinceStart.Seconds())%60, stats.domainsScanned, float64(stats.domainsScanned)/time.Since(stats.scanStartTime).Seconds(), - getStatusOccuranceString(stats.statusCardinality)) + getStatusOccuranceString(stats.statusOccurance)) wg.Done() } func incrementStatus(stats scanStats, status zdns.Status) { - if _, ok := stats.statusCardinality[status]; !ok { - stats.statusCardinality[status] = 0 + if _, ok := stats.statusOccurance[status]; !ok { + stats.statusOccurance[status] = 0 } - stats.statusCardinality[status] += 1 + stats.statusOccurance[status] += 1 } func getStatusOccuranceString(statusOccurances map[zdns.Status]int) string { - type statusAndCardinality struct { + type statusAndOccurance struct { status zdns.Status occurance int } - statusesAndCards := make([]statusAndCardinality, 0, len(statusOccurances)) + statusesAndOccurances := make([]statusAndOccurance, 0, len(statusOccurances)) for status, occurance := range statusOccurances { - statusesAndCards = append(statusesAndCards, statusAndCardinality{ + statusesAndOccurances = append(statusesAndOccurances, statusAndOccurance{ status: status, occurance: occurance, }) } - // TODO Sort these by occurance, largest to smallest + // sort by occurance + sort.Slice(statusesAndOccurances, func(i, j int) bool { + return statusesAndOccurances[i].occurance > statusesAndOccurances[j].occurance + }) returnStr := "" - for _, statusAndOccurance := range statusesAndCards { + for _, statusAndOccurance := range statusesAndOccurances { returnStr += fmt.Sprintf("%s: %d, ", statusAndOccurance.status, statusAndOccurance.occurance) } // remove trailing comma From 877bc4ede9054ae80cf305d44e84ab32205b0660 Mon Sep 17 00:00:00 2001 From: phillip-stephens Date: Mon, 11 Nov 2024 15:14:34 -0500 Subject: [PATCH 03/12] cleanup --- src/cli/iohandlers/status_handler.go | 7 +++---- src/cli/worker_manager.go | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/cli/iohandlers/status_handler.go b/src/cli/iohandlers/status_handler.go index eb6e9a50..1644e322 100644 --- a/src/cli/iohandlers/status_handler.go +++ b/src/cli/iohandlers/status_handler.go @@ -37,8 +37,8 @@ type scanStats struct { func StatusHandler(statusChan <-chan zdns.Status, wg *sync.WaitGroup) { stats := scanStats{ statusOccurance: make(map[zdns.Status]int), + scanStartTime: time.Now(), } - stats.scanStartTime = time.Now() timer := time.Tick(time.Second) statusLoop: for { @@ -55,7 +55,6 @@ statusLoop: getStatusOccuranceString(stats.statusOccurance)) case status, ok := <-statusChan: if !ok { - // TODO check that this syntax is valid // status chan closed, exiting break statusLoop } @@ -98,8 +97,8 @@ func getStatusOccuranceString(statusOccurances map[zdns.Status]int) string { return statusesAndOccurances[i].occurance > statusesAndOccurances[j].occurance }) returnStr := "" - for _, statusAndOccurance := range statusesAndOccurances { - returnStr += fmt.Sprintf("%s: %d, ", statusAndOccurance.status, statusAndOccurance.occurance) + for _, statusOccurance := range statusesAndOccurances { + returnStr += fmt.Sprintf("%s: %d, ", statusOccurance.status, statusOccurance.occurance) } // remove trailing comma returnStr = strings.TrimSuffix(returnStr, ", ") diff --git a/src/cli/worker_manager.go b/src/cli/worker_manager.go index e1ec9187..b7b940fc 100644 --- a/src/cli/worker_manager.go +++ b/src/cli/worker_manager.go @@ -521,7 +521,7 @@ func Run(gc CLIConf) { go func() { iohandlers.StatusHandler(statusChan, &routineWG) }() - routineWG.Add(3) + routineWG.Add(3) // input, output, and status handlers // create pool of worker goroutines var lookupWG sync.WaitGroup From ee47d3e6c829a973dc83fbf4eaa7cd4ae87e20ee Mon Sep 17 00:00:00 2001 From: phillip-stephens Date: Tue, 12 Nov 2024 10:10:32 -0500 Subject: [PATCH 04/12] fixed leaking ticker found with linter --- src/cli/iohandlers/status_handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cli/iohandlers/status_handler.go b/src/cli/iohandlers/status_handler.go index 1644e322..ef1673bb 100644 --- a/src/cli/iohandlers/status_handler.go +++ b/src/cli/iohandlers/status_handler.go @@ -39,11 +39,11 @@ func StatusHandler(statusChan <-chan zdns.Status, wg *sync.WaitGroup) { statusOccurance: make(map[zdns.Status]int), scanStartTime: time.Now(), } - timer := time.Tick(time.Second) + ticker := time.NewTicker(time.Second) statusLoop: for { select { - case <-timer: + case <-ticker.C: // print per-second summary timeSinceStart := time.Since(stats.scanStartTime) fmt.Printf("%02dh:%02dm:%02ds; %d domains scanned; %.02f domains/sec.; %s\n", From 6c3e0da0c65dd594573a9aa5ef512fa088dc3642 Mon Sep 17 00:00:00 2001 From: phillip-stephens Date: Tue, 12 Nov 2024 10:22:47 -0500 Subject: [PATCH 05/12] sanitize out logs for integration tests --- testing/integration_tests.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/testing/integration_tests.py b/testing/integration_tests.py index 222d79ee..39857d67 100755 --- a/testing/integration_tests.py +++ b/testing/integration_tests.py @@ -45,14 +45,18 @@ class Tests(unittest.TestCase): def run_zdns_check_failure(self, flags, name, expected_err, executable=ZDNS_EXECUTABLE): flags = flags + self.ADDITIONAL_FLAGS c = f"echo '{name}' | {executable} {flags}; exit 0" - o = subprocess.check_output(c, shell=True, stderr=subprocess.STDOUT) - self.assertEqual(expected_err in o.decode(), True) + o = subprocess.check_output(c, shell=True, stderr=subprocess.STDOUT).decode('utf-8').rstrip() + # remove per-second logs/output, only keep json-lines or fatal errors + filtered_output = "\n".join(line for line in o.splitlines() if line.startswith('{') and line.endswith('}') or "fatal" in line) + self.assertEqual(expected_err in filtered_output, True) def run_zdns(self, flags, name, executable=ZDNS_EXECUTABLE): flags = flags + self.ADDITIONAL_FLAGS c = f"echo '{name}' | {executable} {flags}" - o = subprocess.check_output(c, shell=True) - return c, json.loads(o.rstrip()) + o = subprocess.check_output(c, shell=True).decode('utf-8').rstrip() + # remove per-second logs/output, only keep json-lines + filtered_output = "\n".join(line for line in o.splitlines() if line.startswith('{') and line.endswith('}')) + return c, json.loads(filtered_output) # Runs zdns with a given name(s) input and flags, returns the command and JSON objects from the piped JSON-Lines output # Used when running a ZDNS command that should return multiple lines of output, and you want those in a list @@ -60,8 +64,10 @@ def run_zdns_multiline_output(self, flags, name, executable=ZDNS_EXECUTABLE, app if append_flags: flags = flags + self.ADDITIONAL_FLAGS c = f"echo '{name}' | {executable} {flags}" - o = subprocess.check_output(c, shell=True) - output_lines = o.decode('utf-8').strip().splitlines() + o = subprocess.check_output(c, shell=True).decode('utf-8').rstrip() + # remove per-second logs/output, only keep json-lines + filtered_output = "\n".join(line for line in o.splitlines() if line.startswith('{') and line.endswith('}')) + output_lines = filtered_output.strip().splitlines() json_objects = [json.loads(line.rstrip()) for line in output_lines] return c, json_objects From c1f3561af4c806145a19f234a602529ddb62b2bb Mon Sep 17 00:00:00 2001 From: phillip-stephens Date: Tue, 12 Nov 2024 10:29:35 -0500 Subject: [PATCH 06/12] remove note --- src/cli/iohandlers/status_handler.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/cli/iohandlers/status_handler.go b/src/cli/iohandlers/status_handler.go index ef1673bb..ff0224e2 100644 --- a/src/cli/iohandlers/status_handler.go +++ b/src/cli/iohandlers/status_handler.go @@ -24,9 +24,6 @@ import ( "github.com/zmap/zdns/src/zdns" ) -// Notes -// time elapsed, domains scanned total, avg. per second, per status entry, ordered largest to smallest - type scanStats struct { scanStartTime time.Time domainsScanned int From 9cb0ee2941d25077c6bd9d057fee04a341e54151 Mon Sep 17 00:00:00 2001 From: phillip-stephens Date: Tue, 12 Nov 2024 13:46:24 -0500 Subject: [PATCH 07/12] add --quiet and --status-updates-file to cli --- src/cli/cli.go | 6 +++ src/cli/iohandlers/file_handlers.go | 8 ++-- src/cli/iohandlers/status_handler.go | 57 +++++++++++++++++++++++++--- src/cli/worker_manager.go | 31 ++++++++++----- 4 files changed, 84 insertions(+), 18 deletions(-) diff --git a/src/cli/cli.go b/src/cli/cli.go index 8f7fca7e..09034b57 100644 --- a/src/cli/cli.go +++ b/src/cli/cli.go @@ -37,6 +37,9 @@ type InputHandler interface { type OutputHandler interface { WriteResults(results <-chan string, wg *sync.WaitGroup) error } +type StatusHandler interface { + LogPeriodicUpdates(statusChan <-chan zdns.Status, wg *sync.WaitGroup) error +} // GeneralOptions core options for all ZDNS modules // Order here is the order they'll be printed to the user, so preserve alphabetical order @@ -96,9 +99,11 @@ type InputOutputOptions struct { MetadataFilePath string `long:"metadata-file" description:"where should JSON metadata be saved, defaults to no metadata output. Use '-' for stderr."` MetadataFormat bool `long:"metadata-passthrough" description:"if input records have the form 'name,METADATA', METADATA will be propagated to the output"` OutputFilePath string `short:"o" long:"output-file" default:"-" description:"where should JSON output be saved, defaults to stdout"` + Quiet bool `short:"q" long:"quiet" description:"do not print status updates"` NameOverride string `long:"override-name" description:"name overrides all passed in names. Commonly used with --name-server-mode."` NamePrefix string `long:"prefix" description:"name to be prepended to what's passed in (e.g., www.)"` ResultVerbosity string `long:"result-verbosity" default:"normal" description:"Sets verbosity of each output record. Options: short, normal, long, trace"` + StatusUpdatesFilePath string `short:"u" long:"status-updates-file" default:"-" description:"file to write scan progress to, defaults to stderr"` Verbosity int `long:"verbosity" default:"3" description:"log verbosity: 1 (lowest)--5 (highest)"` } @@ -116,6 +121,7 @@ type CLIConf struct { ClientSubnet *dns.EDNS0_SUBNET InputHandler InputHandler OutputHandler OutputHandler + StatusHandler StatusHandler CLIModule string // the module name as passed in by the user ActiveModuleNames []string // names of modules that are active in this invocation of zdns. Mostly used with MULTIPLE ActiveModules map[string]LookupModule // map of module names to modules diff --git a/src/cli/iohandlers/file_handlers.go b/src/cli/iohandlers/file_handlers.go index d89ede3e..d7970b7a 100644 --- a/src/cli/iohandlers/file_handlers.go +++ b/src/cli/iohandlers/file_handlers.go @@ -60,17 +60,17 @@ func (h *FileInputHandler) FeedChannel(in chan<- string, wg *sync.WaitGroup) err return nil } -type FileOutputHandler struct { +type St struct { filepath string } -func NewFileOutputHandler(filepath string) *FileOutputHandler { - return &FileOutputHandler{ +func NewFileOutputHandler(filepath string) *St { + return &St{ filepath: filepath, } } -func (h *FileOutputHandler) WriteResults(results <-chan string, wg *sync.WaitGroup) error { +func (h *St) WriteResults(results <-chan string, wg *sync.WaitGroup) error { defer (*wg).Done() var f *os.File diff --git a/src/cli/iohandlers/status_handler.go b/src/cli/iohandlers/status_handler.go index ff0224e2..6e5b8768 100644 --- a/src/cli/iohandlers/status_handler.go +++ b/src/cli/iohandlers/status_handler.go @@ -16,22 +16,63 @@ package iohandlers import ( "fmt" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "os" "sort" "strings" "sync" "time" + "github.com/zmap/zdns/src/internal/util" "github.com/zmap/zdns/src/zdns" ) +type StatusHandler struct { + filePath string +} + type scanStats struct { scanStartTime time.Time domainsScanned int statusOccurance map[zdns.Status]int } -// statusHandler prints a per-second update to the user scan progress and per-status statistics -func StatusHandler(statusChan <-chan zdns.Status, wg *sync.WaitGroup) { +func NewStatusHandler(filePath string) *StatusHandler { + return &StatusHandler{ + filePath: filePath, + } +} + +// LogPeriodicUpdates prints a per-second update to the user scan progress and per-status statistics +func (h *StatusHandler) LogPeriodicUpdates(statusChan <-chan zdns.Status, wg *sync.WaitGroup) error { + defer wg.Done() + // open file for writing + var f *os.File + if h.filePath == "" || h.filePath == "-" { + f = os.Stderr + } else { + // open file for writing + var err error + f, err = os.OpenFile(h.filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, util.DefaultFilePermissions) + if err != nil { + return errors.Wrap(err, "unable to open status file") + } + defer func(f *os.File) { + if err := f.Close(); err != nil { + log.Errorf("unable to close status file: %v", err) + } + }(f) + } + if err := h.statusLoop(statusChan, f); err != nil { + return errors.Wrap(err, "error encountered in status loop") + } + return nil +} + +// statusLoop will print a per-second summary of the scan progress and per-status statistics +func (h *StatusHandler) statusLoop(statusChan <-chan zdns.Status, statusFile *os.File) error { + // initialize stats stats := scanStats{ statusOccurance: make(map[zdns.Status]int), scanStartTime: time.Now(), @@ -43,13 +84,16 @@ statusLoop: case <-ticker.C: // print per-second summary timeSinceStart := time.Since(stats.scanStartTime) - fmt.Printf("%02dh:%02dm:%02ds; %d domains scanned; %.02f domains/sec.; %s\n", + s := fmt.Sprintf("%02dh:%02dm:%02ds; %d domains scanned; %.02f domains/sec.; %s\n", int(timeSinceStart.Hours()), int(timeSinceStart.Minutes())%60, int(timeSinceStart.Seconds())%60, stats.domainsScanned, float64(stats.domainsScanned)/timeSinceStart.Seconds(), getStatusOccuranceString(stats.statusOccurance)) + if _, err := statusFile.WriteString(s); err != nil { + return errors.Wrap(err, "unable to write periodic status update") + } case status, ok := <-statusChan: if !ok { // status chan closed, exiting @@ -60,14 +104,17 @@ statusLoop: } } timeSinceStart := time.Since(stats.scanStartTime) - fmt.Printf("%02dh:%02dm:%02ds; Scan Complete, no more input. %d domains scanned; %.02f domains/sec.; %s\n", + s := fmt.Sprintf("%02dh:%02dm:%02ds; Scan Complete, no more input. %d domains scanned; %.02f domains/sec.; %s\n", int(timeSinceStart.Hours()), int(timeSinceStart.Minutes())%60, int(timeSinceStart.Seconds())%60, stats.domainsScanned, float64(stats.domainsScanned)/time.Since(stats.scanStartTime).Seconds(), getStatusOccuranceString(stats.statusOccurance)) - wg.Done() + if _, err := statusFile.WriteString(s); err != nil { + return errors.Wrap(err, "unable to write final status update") + } + return nil } func incrementStatus(stats scanStats, status zdns.Status) { diff --git a/src/cli/worker_manager.go b/src/cli/worker_manager.go index b7b940fc..6cd0e1be 100644 --- a/src/cli/worker_manager.go +++ b/src/cli/worker_manager.go @@ -162,6 +162,9 @@ func populateCLIConfig(gc *CLIConf) *CLIConf { if gc.OutputHandler == nil { gc.OutputHandler = iohandlers.NewFileOutputHandler(gc.OutputFilePath) } + if gc.StatusHandler == nil { + gc.StatusHandler = iohandlers.NewStatusHandler(gc.StatusUpdatesFilePath) + } return gc } @@ -503,25 +506,33 @@ func Run(gc CLIConf) { log.Fatal("Output handler is nil") } + statusHandler := gc.StatusHandler + if statusHandler == nil { + log.Fatal("Status handler is nil") + } + // Use handlers to populate the input and output/results channel go func() { - inErr := inHandler.FeedChannel(inChan, &routineWG) - if inErr != nil { + if inErr := inHandler.FeedChannel(inChan, &routineWG); inErr != nil { log.Fatal(fmt.Sprintf("could not feed input channel: %v", inErr)) } }() go func() { - outErr := outHandler.WriteResults(outChan, &routineWG) - if outErr != nil { + if outErr := outHandler.WriteResults(outChan, &routineWG); outErr != nil { log.Fatal(fmt.Sprintf("could not write output results from output channel: %v", outErr)) } }() + routineWG.Add(2) // input and output handlers - go func() { - iohandlers.StatusHandler(statusChan, &routineWG) - }() - routineWG.Add(3) // input, output, and status handlers + if !gc.Quiet { + go func() { + if statusErr := statusHandler.LogPeriodicUpdates(statusChan, &routineWG); statusErr != nil { + log.Fatal(fmt.Sprintf("could not log periodic status updates: %v", statusErr)) + } + }() + routineWG.Add(1) // status handler + } // create pool of worker goroutines var lookupWG sync.WaitGroup @@ -676,7 +687,9 @@ func handleWorkerInput(gc *CLIConf, rc *zdns.ResolverConfig, line string, resolv lookupRes.Error = err.Error() } res.Results[moduleName] = lookupRes - statusChan <- status + if !gc.Quiet { + statusChan <- status + } } metadata.Status[status]++ metadata.Lookups++ From 43f20403da7ff9343c0e7b550c4fa500d4b57c5d Mon Sep 17 00:00:00 2001 From: phillip-stephens Date: Tue, 12 Nov 2024 14:13:37 -0500 Subject: [PATCH 08/12] revert testing change, just use --quiet flag --- testing/integration_tests.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/testing/integration_tests.py b/testing/integration_tests.py index 39857d67..07ef85a2 100755 --- a/testing/integration_tests.py +++ b/testing/integration_tests.py @@ -40,23 +40,19 @@ def dictSort(d): class Tests(unittest.TestCase): maxDiff = None ZDNS_EXECUTABLE = "./zdns" - ADDITIONAL_FLAGS = " --threads=10" # flags used with every test + ADDITIONAL_FLAGS = " --threads=10 --quiet" # flags used with every test def run_zdns_check_failure(self, flags, name, expected_err, executable=ZDNS_EXECUTABLE): flags = flags + self.ADDITIONAL_FLAGS c = f"echo '{name}' | {executable} {flags}; exit 0" - o = subprocess.check_output(c, shell=True, stderr=subprocess.STDOUT).decode('utf-8').rstrip() - # remove per-second logs/output, only keep json-lines or fatal errors - filtered_output = "\n".join(line for line in o.splitlines() if line.startswith('{') and line.endswith('}') or "fatal" in line) - self.assertEqual(expected_err in filtered_output, True) + o = subprocess.check_output(c, shell=True, stderr=subprocess.STDOUT) + self.assertEqual(expected_err in o.decode(), True) def run_zdns(self, flags, name, executable=ZDNS_EXECUTABLE): flags = flags + self.ADDITIONAL_FLAGS c = f"echo '{name}' | {executable} {flags}" - o = subprocess.check_output(c, shell=True).decode('utf-8').rstrip() - # remove per-second logs/output, only keep json-lines - filtered_output = "\n".join(line for line in o.splitlines() if line.startswith('{') and line.endswith('}')) - return c, json.loads(filtered_output) + o = subprocess.check_output(c, shell=True) + return c, json.loads(o.rstrip()) # Runs zdns with a given name(s) input and flags, returns the command and JSON objects from the piped JSON-Lines output # Used when running a ZDNS command that should return multiple lines of output, and you want those in a list @@ -64,10 +60,8 @@ def run_zdns_multiline_output(self, flags, name, executable=ZDNS_EXECUTABLE, app if append_flags: flags = flags + self.ADDITIONAL_FLAGS c = f"echo '{name}' | {executable} {flags}" - o = subprocess.check_output(c, shell=True).decode('utf-8').rstrip() - # remove per-second logs/output, only keep json-lines - filtered_output = "\n".join(line for line in o.splitlines() if line.startswith('{') and line.endswith('}')) - output_lines = filtered_output.strip().splitlines() + o = subprocess.check_output(c, shell=True) + output_lines = o.decode('utf-8').strip().splitlines() json_objects = [json.loads(line.rstrip()) for line in output_lines] return c, json_objects From 83d7fa46d2934a83914dbc54673b708083e97002 Mon Sep 17 00:00:00 2001 From: phillip-stephens Date: Tue, 12 Nov 2024 14:22:48 -0500 Subject: [PATCH 09/12] add success rate to status handler --- src/cli/iohandlers/status_handler.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/src/cli/iohandlers/status_handler.go b/src/cli/iohandlers/status_handler.go index 6e5b8768..608512db 100644 --- a/src/cli/iohandlers/status_handler.go +++ b/src/cli/iohandlers/status_handler.go @@ -16,14 +16,15 @@ package iohandlers import ( "fmt" - "github.com/pkg/errors" - log "github.com/sirupsen/logrus" "os" "sort" "strings" "sync" "time" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/zmap/zdns/src/internal/util" "github.com/zmap/zdns/src/zdns" ) @@ -35,6 +36,7 @@ type StatusHandler struct { type scanStats struct { scanStartTime time.Time domainsScanned int + domainsSuccess int // number of domains that returned either NXDOMAIN or NOERROR statusOccurance map[zdns.Status]int } @@ -84,12 +86,13 @@ statusLoop: case <-ticker.C: // print per-second summary timeSinceStart := time.Since(stats.scanStartTime) - s := fmt.Sprintf("%02dh:%02dm:%02ds; %d domains scanned; %.02f domains/sec.; %s\n", + s := fmt.Sprintf("%02dh:%02dm:%02ds; %d domains scanned; %.02f domains/sec.; %.01f%% success rate; %s\n", int(timeSinceStart.Hours()), int(timeSinceStart.Minutes())%60, int(timeSinceStart.Seconds())%60, stats.domainsScanned, float64(stats.domainsScanned)/timeSinceStart.Seconds(), + float64(stats.domainsSuccess*100)/float64(stats.domainsScanned), getStatusOccuranceString(stats.statusOccurance)) if _, err := statusFile.WriteString(s); err != nil { return errors.Wrap(err, "unable to write periodic status update") @@ -100,16 +103,24 @@ statusLoop: break statusLoop } stats.domainsScanned += 1 - incrementStatus(stats, status) + if status == zdns.StatusNoError || status == zdns.StatusNXDomain { + stats.domainsSuccess += 1 + } + if _, ok = stats.statusOccurance[status]; !ok { + // initialize status if not seen before + stats.statusOccurance[status] = 0 + } + stats.statusOccurance[status] += 1 } } timeSinceStart := time.Since(stats.scanStartTime) - s := fmt.Sprintf("%02dh:%02dm:%02ds; Scan Complete, no more input. %d domains scanned; %.02f domains/sec.; %s\n", + s := fmt.Sprintf("%02dh:%02dm:%02ds; Scan Complete, no more input. %d domains scanned; %.02f domains/sec.; %.01f%% success rate; %s\n", int(timeSinceStart.Hours()), int(timeSinceStart.Minutes())%60, int(timeSinceStart.Seconds())%60, stats.domainsScanned, float64(stats.domainsScanned)/time.Since(stats.scanStartTime).Seconds(), + float64(stats.domainsSuccess*100)/float64(stats.domainsScanned), getStatusOccuranceString(stats.statusOccurance)) if _, err := statusFile.WriteString(s); err != nil { return errors.Wrap(err, "unable to write final status update") @@ -117,13 +128,6 @@ statusLoop: return nil } -func incrementStatus(stats scanStats, status zdns.Status) { - if _, ok := stats.statusOccurance[status]; !ok { - stats.statusOccurance[status] = 0 - } - stats.statusOccurance[status] += 1 -} - func getStatusOccuranceString(statusOccurances map[zdns.Status]int) string { type statusAndOccurance struct { status zdns.Status From 2de3e780753d5f1b9005e10511571f7c6400f71a Mon Sep 17 00:00:00 2001 From: phillip-stephens Date: Tue, 12 Nov 2024 14:30:46 -0500 Subject: [PATCH 10/12] pr review, remove rename of file handler --- src/cli/iohandlers/file_handlers.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cli/iohandlers/file_handlers.go b/src/cli/iohandlers/file_handlers.go index d7970b7a..d89ede3e 100644 --- a/src/cli/iohandlers/file_handlers.go +++ b/src/cli/iohandlers/file_handlers.go @@ -60,17 +60,17 @@ func (h *FileInputHandler) FeedChannel(in chan<- string, wg *sync.WaitGroup) err return nil } -type St struct { +type FileOutputHandler struct { filepath string } -func NewFileOutputHandler(filepath string) *St { - return &St{ +func NewFileOutputHandler(filepath string) *FileOutputHandler { + return &FileOutputHandler{ filepath: filepath, } } -func (h *St) WriteResults(results <-chan string, wg *sync.WaitGroup) error { +func (h *FileOutputHandler) WriteResults(results <-chan string, wg *sync.WaitGroup) error { defer (*wg).Done() var f *os.File From f2aa104b14a08ca46c23d278949d325a49c9381e Mon Sep 17 00:00:00 2001 From: phillip-stephens Date: Tue, 12 Nov 2024 14:31:05 -0500 Subject: [PATCH 11/12] rename Quiet to QuietStatus, more expressive --- src/cli/cli.go | 2 +- src/cli/worker_manager.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cli/cli.go b/src/cli/cli.go index 09034b57..1bf5b58e 100644 --- a/src/cli/cli.go +++ b/src/cli/cli.go @@ -99,7 +99,7 @@ type InputOutputOptions struct { MetadataFilePath string `long:"metadata-file" description:"where should JSON metadata be saved, defaults to no metadata output. Use '-' for stderr."` MetadataFormat bool `long:"metadata-passthrough" description:"if input records have the form 'name,METADATA', METADATA will be propagated to the output"` OutputFilePath string `short:"o" long:"output-file" default:"-" description:"where should JSON output be saved, defaults to stdout"` - Quiet bool `short:"q" long:"quiet" description:"do not print status updates"` + QuietStatusUpdates bool `short:"q" long:"quiet" description:"do not print status updates"` NameOverride string `long:"override-name" description:"name overrides all passed in names. Commonly used with --name-server-mode."` NamePrefix string `long:"prefix" description:"name to be prepended to what's passed in (e.g., www.)"` ResultVerbosity string `long:"result-verbosity" default:"normal" description:"Sets verbosity of each output record. Options: short, normal, long, trace"` diff --git a/src/cli/worker_manager.go b/src/cli/worker_manager.go index 6cd0e1be..5bb0c54c 100644 --- a/src/cli/worker_manager.go +++ b/src/cli/worker_manager.go @@ -525,7 +525,7 @@ func Run(gc CLIConf) { }() routineWG.Add(2) // input and output handlers - if !gc.Quiet { + if !gc.QuietStatusUpdates { go func() { if statusErr := statusHandler.LogPeriodicUpdates(statusChan, &routineWG); statusErr != nil { log.Fatal(fmt.Sprintf("could not log periodic status updates: %v", statusErr)) @@ -687,7 +687,7 @@ func handleWorkerInput(gc *CLIConf, rc *zdns.ResolverConfig, line string, resolv lookupRes.Error = err.Error() } res.Results[moduleName] = lookupRes - if !gc.Quiet { + if !gc.QuietStatusUpdates { statusChan <- status } } From b0a64fd92dca98e0b3810156bbadd35304181c96 Mon Sep 17 00:00:00 2001 From: phillip-stephens Date: Fri, 22 Nov 2024 12:49:12 -0500 Subject: [PATCH 12/12] PR feedback, cleanup output --- src/cli/iohandlers/status_handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cli/iohandlers/status_handler.go b/src/cli/iohandlers/status_handler.go index 608512db..81719ffe 100644 --- a/src/cli/iohandlers/status_handler.go +++ b/src/cli/iohandlers/status_handler.go @@ -86,7 +86,7 @@ statusLoop: case <-ticker.C: // print per-second summary timeSinceStart := time.Since(stats.scanStartTime) - s := fmt.Sprintf("%02dh:%02dm:%02ds; %d domains scanned; %.02f domains/sec.; %.01f%% success rate; %s\n", + s := fmt.Sprintf("%02dh:%02dm:%02ds; %d names scanned; %.02f names/sec; %.01f%% success rate; %s\n", int(timeSinceStart.Hours()), int(timeSinceStart.Minutes())%60, int(timeSinceStart.Seconds())%60, @@ -114,7 +114,7 @@ statusLoop: } } timeSinceStart := time.Since(stats.scanStartTime) - s := fmt.Sprintf("%02dh:%02dm:%02ds; Scan Complete, no more input. %d domains scanned; %.02f domains/sec.; %.01f%% success rate; %s\n", + s := fmt.Sprintf("%02dh:%02dm:%02ds; Scan Complete; %d names scanned; %.02f names/sec; %.01f%% success rate; %s\n", int(timeSinceStart.Hours()), int(timeSinceStart.Minutes())%60, int(timeSinceStart.Seconds())%60,