diff --git a/tools/sort_lists.py b/tools/sort_lists.py
index b7bfc42f6bd..f007541da70 100644
--- a/tools/sort_lists.py
+++ b/tools/sort_lists.py
@@ -25,10 +25,10 @@ def find_files_by_name(directory, filenames):
 
 def get_modified_files_in_last_commit():
     try:
-        output = check_output(["git", "diff", "--name-only", "HEAD~2", "HEAD"]).decode().splitlines()
+        output = check_output(["git", "diff", "--name-only", "HEAD~1", "HEAD"]).decode().splitlines()
     except CalledProcessError:
-        # Fallback to HEAD if there are not enough commits
-        output = check_output(["git", "diff", "--name-only", "HEAD"]).decode().splitlines()
+        # If there's only one commit, use `git ls-files` instead
+        output = check_output(["git", "ls-files"]).decode().splitlines()
     return output
 
 def fetch_valid_tlds(proxy):
@@ -103,231 +103,4 @@ def remove_duplicates(lines):
 
 def validate_idna_domain(domain):
     try:
-        # Attempt to encode to IDNA
-        domain_idna = idna.encode(domain).decode('utf-8')
-        return domain_idna
-    except Exception as e:
-        print(f"IDNA encoding error for domain {domain}: {e}")
-        return None
-
-def test_domain_connectivity(domain, proxy):
-    proxies = {"http": proxy, "https": proxy} if proxy else None
-    try:
-        response = requests.get(f"http://{domain}", timeout=5, proxies=proxies)
-        if response.status_code == 200:
-            return True
-    except requests.RequestException as e:
-        print(f"Connectivity test error for domain {domain}: {e}")
-    return False
-
-def dns_lookup(domain):
-    resolver = dns.resolver.Resolver()
-    resolver.nameservers = ['9.9.9.10'] # Quad9 DNS
-    try:
-        resolver.resolve(domain)
-        return True
-    except (dns.resolver.NXDOMAIN, dns.resolver.Timeout, dns.exception.DNSException) as e:
-        print(f"DNS lookup error for domain {domain}: {e}")
-    return False
-
-def sort_file_alphanum(file_path, valid_tlds, proxy):
-    with open(file_path, 'r') as file:
-        lines = file.readlines()
-
-    lines = remove_duplicates(lines) # Remove duplicate lines
-
-    header = lines[0] if lines else ""
-    lines = [line.rstrip('\n') for line in lines[1:] if line.strip()] # Remove empty lines and skip header
-
-    lines = sorted(lines, key=lambda x: x.strip().split(',')[0] if ',' in x else '') # Sort FQDNs
-
-    invalid_entries = []
-    for line in lines:
-        domain_part = line.strip().split(',')[0]
-        if domain_part != "domain" and not (is_valid_domain(domain_part, valid_tlds) or domain_part in valid_tlds):
-            domain_idna = validate_idna_domain(domain_part)
-            if domain_idna is None or not test_domain_connectivity(domain_idna, proxy) or not dns_lookup(domain_idna):
-                invalid_entries.append(line)
-
-    if invalid_entries:
-        print(f"Invalid DNS entries in {file_path}:")
-        for entry in invalid_entries:
-            print(entry.strip())
-
-    with open(file_path, 'w') as file:
-        if header:
-            file.write(header)
-        file.write('\n'.join(lines))
-        file.write('\n') # Ensure a newline at the end of the file
-
-
-def sort_file_tld(file_path, valid_tlds, proxy):
-    with open(file_path, 'r') as file:
-        lines = file.readlines()
-
-    lines = remove_duplicates(lines) # Remove duplicate lines
-
-    header = lines[0] if lines else ""
-    lines = [line for line in lines[1:] if line.strip()] # Remove empty lines and skip header if present
-    lines = sorted(lines, key=lambda x: x.strip()) # Sort TLDs
-
-    invalid_entries = []
-    for line in lines:
-        domain_part = line.strip().split(',')[0]
-        if domain_part != "domain" and not (is_valid_domain(domain_part, valid_tlds) or domain_part in valid_tlds):
-            domain_idna = validate_idna_domain(domain_part)
-            if domain_idna is None or not test_domain_connectivity(domain_idna, proxy) or not dns_lookup(domain_idna):
-                invalid_entries.append(line)
-
-    if invalid_entries:
-        print(f"Invalid TLD entries in {file_path}:")
-        for entry in invalid_entries:
-            print(entry.strip())
-
-def sort_file_rpz_nsdname(file_path, valid_tlds, proxy):
-    with open(file_path, 'r') as file:
-        lines = file.readlines()
-
-    lines = remove_duplicates(lines) # Remove duplicate lines
-
-    header = lines[0] if lines else ""
-    lines = [line for line in lines[1:] if line.strip()] # Remove empty lines and skip header if present
-    lines = sorted(lines, key=lambda x: x.strip().split(',')[0] if ',' in x else '') # Sort FQDNs
-
-    invalid_entries = []
-    for line in lines:
-        domain_part = line.strip().split(',')[0]
-        if domain_part != "domain" and not (is_valid_domain(domain_part, valid_tlds) or domain_part in valid_tlds):
-            domain_idna = validate_idna_domain(domain_part)
-            if domain_idna is None or not test_domain_connectivity(domain_idna, proxy) or not dns_lookup(domain_idna):
-                invalid_entries.append(line)
-
-    if invalid_entries:
-        print(f"Invalid entries in {file_path}:")
-        for entry in invalid_entries:
-            print(entry.strip())
-
-def sort_file_hierarchical(file_path, valid_tlds, proxy):
-    with open(file_path, 'r') as file:
-        lines = file.readlines()
-
-    lines = remove_duplicates(lines) # Remove duplicate lines
-
-    header = lines[0] if lines else ""
-    lines = [line for line in lines[1:] if line.strip()] # Remove empty lines and skip header if present
-    lines = sorted(lines, key=lambda x: (x.strip().split(',')[0], x.strip().split(',')[1] if ',' in x and len(x.strip().split(',')) > 1 else '')) # Sort FQDNs and CIDR
-
-    invalid_entries = []
-    for line in lines:
-        parts = line.strip().split(',')
-        if len(parts) > 1:
-            domain, ip_arpa = parts[0], parts[1]
-            if domain != "domain" and (not is_valid_domain(domain, valid_tlds) and not is_valid_ip_arpa(ip_arpa)):
-                domain_idna = validate_idna_domain(domain)
-                if domain_idna is None or not test_domain_connectivity(domain_idna, proxy) or not dns_lookup(domain_idna):
-                    invalid_entries.append(line)
-        else:
-            domain = parts[0]
-            if domain != "domain" and not is_valid_domain(domain, valid_tlds):
-                domain_idna = validate_idna_domain(domain)
-                if domain_idna is None or not test_domain_connectivity(domain_idna, proxy) or not dns_lookup(domain_idna):
-                    invalid_entries.append(line)
-
-    if invalid_entries:
-        print(f"Invalid DNS or IP entries in {file_path}:")
-        for entry in invalid_entries:
-            print(entry.strip())
-
-def sort_file_onion(file_path, valid_tlds):
-    with open(file_path, 'r') as file:
-        lines = file.readlines()
-
-    lines = remove_duplicates(lines) # Remove duplicate lines
-
-    header = lines[0] if lines else ""
-    lines = [line for line in lines[1:] if line.strip()] # Remove empty lines and skip header if present
-    lines = sorted(lines, key=lambda x: x.strip().split(',')[0] if ',' in x else '') # Sort FQDNs
-
-    invalid_entries = [line for line in lines if line.strip().split(',')[0] != "domain" and not line.strip().endswith('.onion')]
-    if invalid_entries:
-        print(f"Invalid .onion entries in {file_path}:")
-        for entry in invalid_entries:
-            print(entry.strip())
-
-def sort_file_ip(file_path):
-    with open(file_path, 'r') as file:
-        lines = file.readlines()
-
-    lines = remove_duplicates(lines) # Remove duplicate lines
-
-    header = lines[0] if lines else ""
-    lines = [line for line in lines[1:] if line.strip()] # Remove empty lines and skip header if present
-    lines = sorted(lines) # Sort IPs directly
-
-    with open(file_path, 'w') as file:
-        if header:
-            file.write(header)
-        file.writelines(lines)
-        file.write("") # Ensure no additional newline
-
-def main():
-    parser = argparse.ArgumentParser(description="Sort and clean CSV files.")
-    parser.add_argument('-v', '--version', action='version', version=f"%(prog)s {VERSION}")
-    parser.add_argument('-f', '--force', action='store_true', help="Force run on all files, altered or not")
-    parser.add_argument('-x', '--proxy', type=str, default=None, help="Specify a proxy to use for downloading external files")
-    parser.add_argument('--no-proxy', action='store_true', help="Disable the default proxy setting")
-    parser.add_argument('-d', '-s', '--donate', '--sponsor', action='store_true', help="Open the donate link in default browser")
-    args = parser.parse_args()
-
-    proxy = args.proxy
-    if not args.no_proxy and not proxy and "GITHUB_ACTIONS" not in os.environ:
-        proxy = "socks5h://localhost:9050"
-
-    if args.donate:
-        webbrowser.open('https://www.mypdns.org/donate')
-        sys.exit(0)
-
-    valid_tlds = fetch_valid_tlds(proxy)
-
-    alphanum_filenames = ["wildcard.csv", "mobile.csv", "snuff.csv"]
-    tld_filenames = ["tld.csv"]
-    rpz_nsdname_filenames = ["wildcard.rpz-nsdname.csv", "domains.rpz-nsdname.csv"]
-    hierarchical_filenames = ["domains.csv", "onions.csv"]
-    ip_filenames = ["rpz-ip.csv", "ip4.csv", "rpz-client-ip.csv", "rpz-drop.csv", "ip6.csv"]
-
-    modified_files = get_modified_files_in_last_commit()
-    target_files_alphanum = find_files_by_name("source", alphanum_filenames)
-    target_files_tld = find_files_by_name("source", tld_filenames)
-    target_files_rpz_nsdname = find_files_by_name("source", rpz_nsdname_filenames)
-    target_files_hierarchical = find_files_by_name("source", hierarchical_filenames)
-    target_files_onion = find_files_by_name("source", ["onions.csv"])
-    target_files_ip = find_files_by_name("source", ip_filenames)
-
-    for file in target_files_alphanum:
-        if args.force or any(file.endswith(modified) for modified in modified_files):
-            sort_file_alphanum(file, valid_tlds, proxy)
-
-    for file in target_files_tld:
-        if args.force or any(file.endswith(modified) for modified in modified_files):
-            sort_file_tld(file, valid_tlds, proxy)
-
-    for file in target_files_rpz_nsdname:
-        if args.force or any(file.endswith(modified) for modified in modified_files):
-            sort_file_rpz_nsdname(file, valid_tlds, proxy)
-
-    for file in target_files_hierarchical:
-        if args.force or any(file.endswith(modified) for modified in modified_files):
-            sort_file_hierarchical(file, valid_tlds, proxy)
-
-    for file in target_files_onion:
-        if args.force or any(file.endswith(modified) for modified in modified_files):
-            sort_file_onion(file, valid_tlds)
-
-    for file in target_files_ip:
-        if args.force or any(file.endswith(modified) for modified in modified_files):
-            sort_file_ip(file)
-
-    print("Please consider sponsoring My Privacy DNS at https://www.mypdns.org/donate")
-
-if __name__ == "__main__":
-    main()
+        #
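
Note on the first hunk: the patch now diffs `HEAD~1` against `HEAD` and falls back to `git ls-files` when `HEAD~1` does not exist (i.e. the repository has only one commit), so every tracked file is treated as modified. A minimal standalone sketch of that fallback path, assuming `check_output` and `CalledProcessError` are imported from `subprocess` as elsewhere in the script; the `__main__` demo is illustration only, not part of the patch:

```python
# Sketch of the new fallback behaviour in get_modified_files_in_last_commit().
from subprocess import CalledProcessError, check_output

def get_modified_files_in_last_commit():
    try:
        # Compare the last commit against its parent.
        output = check_output(
            ["git", "diff", "--name-only", "HEAD~1", "HEAD"]
        ).decode().splitlines()
    except CalledProcessError:
        # HEAD~1 is missing on a single-commit repository, so list
        # every tracked file instead of a diff.
        output = check_output(["git", "ls-files"]).decode().splitlines()
    return output

if __name__ == "__main__":
    # On a fresh repo with one commit this prints all tracked files;
    # otherwise only the files touched by the last commit.
    print(get_modified_files_in_last_commit())
```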