diff --git a/pipeline/metadata/satellite.py b/pipeline/metadata/satellite.py
index c6655f11..73ad7a65 100644
--- a/pipeline/metadata/satellite.py
+++ b/pipeline/metadata/satellite.py
@@ -337,8 +337,10 @@ def _flat_rows_controls(key: Any, value: Row) -> Iterator[Tuple[Row, int]]: # p
   # PCollection[Tuple[Tuple[str, str], Row]], PCollection[Tuple[Tuple[str, str], Row]]
   rows, controls = (
       rows | 'key by dates and domains' >> beam.Map(lambda row: (
-          (row['date'], row['domain']), row)) | 'partition test and control' >>
-      beam.Partition(lambda row, p: int(row[1]['anomaly'] is None), 2))
+          (row['date'], row['domain']), row)) |
+      'partition test and control' >> beam.Partition(
+          lambda row, p: int(row[1]['is_control_ip'] or row[1]['anomaly'] is
+                             None), 2))
 
   # PCollection[Tuple[Tuple[str, str], int]]
   num_ctags = controls | 'calculate # control tags' >> beam.MapTuple(
@@ -551,7 +553,7 @@ def _calculate_confidence(scan: Dict[str, Any],
   scan['average_confidence'] = sum(scan['matches_confidence']) / len(
       scan['matches_confidence'])
   # Sanity check for untagged responses: do not claim interference
-  if scan['untagged_response'] or scan['untagged_controls']:
+  if scan['untagged_response']:
     scan['anomaly'] = False
   return scan
 
diff --git a/pipeline/metadata/test_satellite.py b/pipeline/metadata/test_satellite.py
index 87c701fc..0d3a5d15 100644
--- a/pipeline/metadata/test_satellite.py
+++ b/pipeline/metadata/test_satellite.py
@@ -744,6 +744,251 @@ def test_verify(self) -> None:
 
     self.assertListEqual(result, expected)
 
+  def test_postprocessing_satellite_v2_2(self) -> None:  # pylint: disable=no-self-use
+    """Test that postprocessing leaves tagged Satellite v2.2 rows unchanged."""
+    # yapf: disable
+    data = [
+      {
+        "domain": "1337x.to",
+        "is_control": False,
+        "category": "Media sharing",
+        "ip": "8.8.8.8",
+        "is_control_ip": True,
+        "country": "US",
+        "date": "2022-01-02",
+        "start_time": "2022-01-02T14:47:22.608859091-05:00",
+        "end_time": "2022-01-02T14:47:22.987814778-05:00",
+        "error": None,
+        "anomaly": False,
+        "success": True,
+        "source": "CP_Satellite-2022-01-02-12-00-01",
+        "controls_failed": False,
+        "rcode": [
+          "0"
+        ],
+        "average_confidence": 100,
+        "matches_confidence": [
+          100,
+          100
+        ],
+        "untagged_controls": False,
+        "untagged_response": False,
+        "excluded": False,
+        "exclude_reason": "",
+        "has_type_a": True,
+        "received": [
+          {
+            "ip": "104.31.16.11",
+            "http": "ecd1a8f3bd8db93d2d69e957cd3a114b43e8ba452d5cb2239f8eb6f6b92574ab",
+            "cert": "",
+            "asname": "CLOUDFLARENET",
+            "asnum": 13335,
+            "matches_control": "ip http asnum asname"
+          },
+          {
+            "ip": "104.31.16.118",
+            "http": "7255d6747fcfdc1c16a30c0da7f039571d8a1bdefe2f56fa0ca243fc684fbbb8",
+            "cert": "",
+            "asname": "CLOUDFLARENET",
+            "asnum": 13335,
+            "matches_control": "ip http asnum asname"
+          }
+        ],
+      },
+      {
+        "domain": "1337x.to",
+        "is_control": False,
+        "category": "Media sharing",
+        "ip": "8.8.4.4",
+        "is_control_ip": True,
+        "country": "US",
+        "date": "2022-01-02",
+        "start_time": "2022-01-02T14:47:22.609624461-05:00",
+        "end_time": "2022-01-02T14:47:22.98110208-05:00",
+        "error": None,
+        "anomaly": False,
+        "success": True,
+        "source": "CP_Satellite-2022-01-02-12-00-01",
+        "controls_failed": True,
+        "rcode": [
+          "-1",
+          "0",
+          "-1"
+        ],
+        "average_confidence": 0,
+        "matches_confidence": None,
+        "untagged_controls": False,
+        "untagged_response": False,
+        "excluded": False,
+        "exclude_reason": "",
+        "has_type_a": True,
+        "received": [
+          {
+            "ip": "104.31.16.11",
+            "http": "ecd1a8f3bd8db93d2d69e957cd3a114b43e8ba452d5cb2239f8eb6f6b92574ab",
+            "cert": "",
+            "asname": "CLOUDFLARENET",
+            "asnum": 13335,
+            "matches_control": ""
+          },
+          {
+            "ip": "104.31.16.118",
+            "http": "7255d6747fcfdc1c16a30c0da7f039571d8a1bdefe2f56fa0ca243fc684fbbb8",
+            "cert": "",
+            "asname": "CLOUDFLARENET",
+            "asnum": 13335,
+            "matches_control": ""
+          }
+        ],
+      },
+      {
+        "domain": "1337x.to",
+        "is_control": False,
+        "category": "Media sharing",
+        "ip": "64.6.64.6",
+        "is_control_ip": True,
+        "country": "US",
+        "date": "2022-01-02",
+        "start_time": "2022-01-02T16:41:54.579216934-05:00",
+        "end_time": "2022-01-02T16:41:54.617330171-05:00",
+        "error": None,
+        "anomaly": False,
+        "success": True,
+        "source": "CP_Satellite-2022-01-02-12-00-01",
+        "controls_failed": False,
+        "rcode": [
+          "0"
+        ],
+        "average_confidence": 100,
+        "matches_confidence": [
+          100,
+          100
+        ],
+        "untagged_controls": False,
+        "untagged_response": False,
+        "excluded": False,
+        "exclude_reason": "",
+        "has_type_a": True,
+        "received": [
+          {
+            "ip": "104.31.16.11",
+            "http": "ecd1a8f3bd8db93d2d69e957cd3a114b43e8ba452d5cb2239f8eb6f6b92574ab",
+            "cert": "",
+            "asname": "CLOUDFLARENET",
+            "asnum": 13335,
+            "matches_control": "ip http asnum asname"
+          },
+          {
+            "ip": "104.31.16.118",
+            "http": "7255d6747fcfdc1c16a30c0da7f039571d8a1bdefe2f56fa0ca243fc684fbbb8",
+            "cert": "",
+            "asname": "CLOUDFLARENET",
+            "asnum": 13335,
+            "matches_control": "ip http asnum asname"
+          }
+        ],
+      },
+      {
+        "domain": "1337x.to",
+        "is_control": False,
+        "category": "Media sharing",
+        "ip": "64.6.65.6",
+        "is_control_ip": True,
+        "country": "US",
+        "date": "2022-01-02",
+        "start_time": "2022-01-02T15:08:04.399147076-05:00",
+        "end_time": "2022-01-02T15:08:04.437950734-05:00",
+        "error": None,
+        "anomaly": False,
+        "success": True,
+        "source": "CP_Satellite-2022-01-02-12-00-01",
+        "controls_failed": False,
+        "rcode": [
+          "0"
+        ],
+        "average_confidence": 100,
+        "matches_confidence": [
+          100,
+          100
+        ],
+        "untagged_controls": False,
+        "untagged_response": False,
+        "excluded": False,
+        "exclude_reason": "",
+        "has_type_a": True,
+        "received": [
+          {
+            "ip": "104.31.16.11",
+            "http": "ecd1a8f3bd8db93d2d69e957cd3a114b43e8ba452d5cb2239f8eb6f6b92574ab",
+            "cert": "",
+            "asname": "CLOUDFLARENET",
+            "asnum": 13335,
+            "matches_control": "ip http asnum asname"
+          },
+          {
+            "ip": "104.31.16.118",
+            "http": "7255d6747fcfdc1c16a30c0da7f039571d8a1bdefe2f56fa0ca243fc684fbbb8",
+            "cert": "",
+            "asname": "CLOUDFLARENET",
+            "asnum": 13335,
+            "matches_control": "ip http asnum asname"
+          }
+        ],
+      },
+      {
+        "domain": "1337x.to",
+        "is_control": False,
+        "category": "Media sharing",
+        "ip": "77.247.174.150",
+        "is_control_ip": False,
+        "country": "RU",
+        "date": "2022-01-02",
+        "start_time": "2022-01-02T14:47:22.708705995-05:00",
+        "end_time": "2022-01-02T14:47:22.983863812-05:00",
+        "error": None,
+        "anomaly": True,
+        "success": True,
+        "source": "CP_Satellite-2022-01-02-12-00-01",
+        "controls_failed": False,
+        "rcode": [
+          "0"
+        ],
+        "average_confidence": 0,
+        "matches_confidence": [
+          0
+        ],
+        "untagged_controls": False,
+        "untagged_response": False,
+        "excluded": False,
+        "exclude_reason": "",
+        "has_type_a": True,
+        "received": [
+          {
+            "ip": "188.186.157.49",
+            "http": "177a8341782a57778766a7334d3e99ecb61ce54bbcc48838ddda846ea076726d",
+            "cert": "",
+            "asname": "ERTELECOM-DC-AS",
+            "asnum": 31483,
+            "matches_control": ""
+          }
+        ],
+      }
+    ]
+    # yapf: enable
+
+    # Data holds 4 measurements to control resolvers (is_control_ip=True) and
+    # 1 measurement to a test resolver, all for the same domain and date.
+    # v2.2 raw data already carries the confidence and verification fields,
+    # and every received answer here is tagged (http, asname, asnum),
+    # so postprocessing is expected to return the rows unchanged
+    # (confidence, verification, and the anomaly field).
+
+    with TestPipeline() as p:
+      rows = p | 'create data' >> beam.Create(data)
+
+      final = satellite.post_processing_satellite(rows)
+      beam_test_util.assert_that(final, beam_test_util.equal_to(data))
+
   # pylint: enable=protected-access
 
 