# Record TYPE codes, already encoded as 2-byte big-endian values.
TYPE_A = b"\x00\x01"
TYPE_NS = b"\x00\x02"
TYPE_CNAME = b"\x00\x05"

# Record CLASS code, already encoded as a 2-byte big-endian value.
CLASS_IN = b"\x00\x01"

# RFC 1035 wire-format limits.
_MAX_LABEL_LENGTH = 63        # a label length octet must keep its two top bits clear
_MAX_POINTER_OFFSET = 0x3FFF  # a compression pointer carries a 14-bit offset


def encode_question_entry(encoded_name: bytes, type: bytes, clazz: bytes) -> bytes:
    """Encode one entry of the question section.

    Args:
        encoded_name: Name already in wire format (see ``encode_name`` /
            ``encode_name_pointer``).
        type: 2-byte record type, e.g. ``TYPE_A``.
        clazz: 2-byte record class, e.g. ``CLASS_IN``.

    Returns:
        The concatenation ``encoded_name + type + clazz``.
    """
    return encoded_name + type + clazz


def encode_response_entry(encoded_name: bytes, type: bytes, clazz: bytes,
                          ttl: int, rdata: bytes) -> bytes:
    """Encode one resource record of a response section.

    Args:
        encoded_name: Name already in wire format (see ``encode_name`` /
            ``encode_name_pointer``).
        type: 2-byte record type, e.g. ``TYPE_A``.
        clazz: 2-byte record class, e.g. ``CLASS_IN``.
        ttl: Time to live, written as an unsigned 4-byte big-endian integer.
        rdata: Record payload; its length is written as an unsigned 2-byte
            big-endian integer right before it.

    Returns:
        ``encoded_name + type + clazz + ttl + len(rdata) + rdata``.

    Raises:
        OverflowError: If ``ttl`` is negative or does not fit in 4 bytes, or
            if ``rdata`` is longer than 65535 bytes.
    """
    return (encoded_name
            + type
            + clazz
            + ttl.to_bytes(length=4, byteorder="big")
            + len(rdata).to_bytes(length=2, byteorder="big")
            + rdata)


def encode_name_pointer(offset_since_start: int) -> bytes:
    """Encode a compression pointer to a name located earlier in the message.

    A pointer is two octets: the two top bits are set (``0xC000``) and the
    remaining 14 bits hold the offset.
    https://datatracker.ietf.org/doc/html/rfc1035#section-4.1.4

    Args:
        offset_since_start: Offset of the target name, in bytes (0..16383).

    Returns:
        The 2-byte pointer.

    Raises:
        ValueError: If the offset does not fit in 14 bits.
    """
    if not 0 <= offset_since_start <= _MAX_POINTER_OFFSET:
        raise ValueError(
            f"name pointer offset must be in range(0, {_MAX_POINTER_OFFSET + 1}), "
            f"got {offset_since_start}"
        )
    # The high bits of the offset spill into the low 6 bits of the first octet.
    return (0xC000 | offset_since_start).to_bytes(length=2, byteorder="big")


def encode_name(name: bytes) -> bytes:
    """Encode a dotted domain name as a sequence of length-prefixed labels.

    Each label is written as one length octet followed by that many octets;
    the name is terminated by a zero-length label.
    https://datatracker.ietf.org/doc/html/rfc1035#section-4.1.2

    A single trailing dot (fully-qualified form) is accepted and ignored, and
    the root name (``b""`` or ``b"."``) encodes to the lone terminator.

    Args:
        name: Domain name with labels separated by ``b"."``.

    Returns:
        The wire-format name.

    Raises:
        ValueError: If a label is empty (e.g. ``b"a..b"``) or longer than
            63 bytes.
    """
    # "example.com." and "example.com" designate the same name.
    if name.endswith(b"."):
        name = name[:-1]
    if not name:
        return b"\x00"

    chunks = []
    for label in name.split(b"."):
        if not label:
            # A zero length octet would terminate the name prematurely.
            raise ValueError(f"empty label in domain name {name!r}")
        if len(label) > _MAX_LABEL_LENGTH:
            raise ValueError(
                f"label {label!r} exceeds {_MAX_LABEL_LENGTH} bytes"
            )
        # Each label is preceded by its length, encoded on one octet.
        chunks.append(bytes((len(label),)))
        chunks.append(label)
    # End of name.
    chunks.append(b"\x00")
    return b"".join(chunks)


def encode_header(response: bool, opcode: int, authoritative: bool,
                  truncated: bool, rec_d: bool, rec_a: bool, rcode: int,
                  questions: int, answers: int, authorities: int,
                  additional: int) -> bytes:
    """Encode the flags and the four counters of a DNS message header.

    NOTE: the 2-byte transaction ID is NOT part of the returned value; the
    result is the 10 bytes that follow it (presumably the caller prepends
    the ID -- verify against the call sites).

    Args:
        response: Sets bit 0x80 of the first flags byte when true.
        opcode: Only the low 4 bits are kept; stored in bits 3-6 of the
            first flags byte.
        authoritative: Sets bit 0x04 of the first flags byte when true.
        truncated: Sets bit 0x02 of the first flags byte when true.
        rec_d: Sets bit 0x01 of the first flags byte when true.
        rec_a: Sets bit 0x80 of the second flags byte when true.
        rcode: Only the low 4 bits are kept; stored in the low nibble of
            the second flags byte.
        questions: Written as an unsigned 2-byte big-endian counter.
        answers: Written as an unsigned 2-byte big-endian counter.
        authorities: Written as an unsigned 2-byte big-endian counter.
        additional: Written as an unsigned 2-byte big-endian counter.

    Returns:
        Two flags bytes followed by the four counters (10 bytes).

    Raises:
        OverflowError: If a counter is negative or does not fit in 2 bytes.
    """
    flags = [0, 0]
    if response:
        flags[0] |= 0x80
    flags[0] |= (opcode & 0x0F) << 3
    if authoritative:
        flags[0] |= 0x04
    if truncated:
        flags[0] |= 0x02
    if rec_d:
        flags[0] |= 0x01
    if rec_a:
        flags[1] |= 0x80
    flags[1] |= rcode & 0x0F

    header = bytes(flags)
    header += questions.to_bytes(length=2, byteorder="big")
    header += answers.to_bytes(length=2, byteorder="big")
    header += authorities.to_bytes(length=2, byteorder="big")
    header += additional.to_bytes(length=2, byteorder="big")
    return header


class DNSServer:
    """Represent a DNS server: an IP address and two port numbers.

    The three values are stored at construction time and exposed through
    read-only properties.
    """

    def __init__(self, ip: str, port: int, query_port: int):
        self._ip: str = ip
        self._port: int = port
        self._query_port: int = query_port

    @property
    def ip(self) -> str:
        """IP address given at construction."""
        return self._ip

    @property
    def port(self) -> int:
        """Value of the ``port`` constructor argument."""
        return self._port

    @property
    def query_port(self) -> int:
        """Value of the ``query_port`` constructor argument."""
        return self._query_port