| #!/usr/bin/python |
| |
| import logging, socket, threading, time |
| |
class DNSQuery:
    """Parse one DNS query datagram and build a fixed 'A' record reply.

    Only the first question of a standard query (opcode 0) is parsed. The
    queried name is exposed as ``dominio`` (labels joined and terminated by
    dots, e.g. ``'example.com.'``). ``dominio`` stays empty when the
    datagram is not a standard query, is truncated, or uses a label length
    above 63 (reserved values / compression pointers); in that case
    ``respuesta`` returns an empty packet.

    The wire data is handled as bytes throughout, so the class behaves the
    same on Python 2 (where ``bytes`` is ``str``) and on Python 3.
    """
    TYPE = b'\x00\x01'                  # An 'A' record
    CLASS = b'\x00\x01'                 # of the Internet class
    TTL = b'\x00\x00\x00\x3c'           # 60 seconds
    RESOURCE_DATA_LENGTH = b'\x00\x04'  # 4 bytes

    _HEADER_LENGTH = 12      # Fixed size of the DNS message header
    _QUESTION_TRAILER = 4    # QTYPE (2 bytes) + QCLASS (2 bytes)
    _MAX_LABEL_LENGTH = 63   # Larger length bytes are not plain labels

    def __init__(self, data):
        """Store the raw datagram |data| and extract the queried name.

        Never raises on short or malformed input; ``dominio`` is simply
        left empty.
        """
        self.data = data
        self.dominio = ''
        # Offset just past the first question (name + QTYPE + QCLASS);
        # only meaningful when dominio is non-empty.
        self._question_end = 0
        self._parse_question(bytearray(data))

    def _parse_question(self, raw):
        """Walk the length-prefixed labels of the first question in |raw|."""
        size = len(raw)
        if size < self._HEADER_LENGTH:
            return

        opcode = (raw[2] >> 3) & 15  # Opcode bits
        if opcode != 0:              # Not a standard query
            return

        labels = []
        pos = self._HEADER_LENGTH
        while True:
            if pos >= size:
                return  # Ran off the end before the terminating zero byte
            length = raw[pos]
            if length == 0:
                break
            end = pos + 1 + length
            if length > self._MAX_LABEL_LENGTH or end > size:
                return
            label = bytes(raw[pos + 1:end])
            if not isinstance(label, str):
                # Python 3: expose a text name. latin-1 maps every byte
                # value, so decoding cannot fail.
                label = label.decode('latin-1')
            labels.append(label)
            pos = end

        question_end = pos + 1 + self._QUESTION_TRAILER
        if question_end > size:
            return  # QTYPE/QCLASS missing

        self.dominio = ''.join(label + '.' for label in labels)
        self._question_end = question_end

    def respuesta(self, ip):
        """Return the reply packet answering the query with address |ip|.

        |ip| is a dotted-quad string such as '192.168.1.1'. The reply
        echoes the query id and the first question, then appends a single
        'A' record whose name is a compression pointer to the question.
        Returns an empty byte string when no name was parsed.
        """
        if not self.dominio:
            return b''

        address = bytes(bytearray(int(octet) for octet in ip.split('.')))
        return b''.join([
            # Query id + flags: response, recursion desired and available
            self.data[:2], b'\x81\x80',
            # Exactly one question and one answer follow; no authority or
            # additional records.
            b'\x00\x01', b'\x00\x01', b'\x00\x00\x00\x00',
            # Original question only. Anything the client appended after
            # it (e.g. an EDNS OPT record) is dropped so the answer
            # directly follows the question.
            self.data[self._HEADER_LENGTH:self._question_end],
            # Pointer to the domain name at offset 12
            b'\xc0\x0c',
            # Response type, ttl and resource data length -> 4 bytes
            self.TYPE, self.CLASS, self.TTL, self.RESOURCE_DATA_LENGTH,
            # 4 bytes of IP
            address,
        ])
| |
class DNSServer:
    """A very, very simple DNS responder.

    DNSServer will bind to port |port| on |server_address| and respond
    to any standard query with an A record pointing to |fake_ip|.

    Must be run with superuser privilege to bind to port 53
    """
    def __init__(self, server_address='', fake_ip='192.168.1.1', port=53):
        """Create the UDP socket; binding is deferred until run()."""
        self._udps = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._address = server_address
        self._port = port
        self._fake_ip = fake_ip

    def run(self, stop):
        """Runs DNSServer until the threading.Event object |stop| gets set.

        The socket is closed when the loop ends, whether it ends because
        |stop| was set or because an unexpected exception escaped.
        Datagrams that cannot be parsed are logged and ignored so a single
        bad packet cannot take the server down.

        Example use:
            server = DNSServer(fake_ip='127.0.0.1')
            stopper = threading.Event()
            thread = threading.Thread(target=server.run, args=(stopper,))
            .
            .
            .
            stopper.set()
        """
        self._udps.bind((self._address, self._port))
        try:
            logging.info('miniFakeDns listening on port %d', self._port)
            # Wake up once a second so |stop| is noticed promptly.
            self._udps.settimeout(1)
            while not stop.is_set():
                try:
                    data, addr = self._udps.recvfrom(1500)
                except socket.timeout:
                    continue

                try:
                    query = DNSQuery(data)
                except IndexError:
                    # Datagram too short for the parser to index into.
                    logging.debug('Ignoring malformed packet from %s', addr)
                    continue

                response = query.respuesta(self._fake_ip)
                if not response:
                    # Nothing to answer (no name parsed); an empty
                    # datagram would not be a valid DNS message.
                    continue

                self._udps.sendto(response, addr)
                logging.debug('Response to %s: %s -> %s',
                              addr, query.dominio, self._fake_ip)
        finally:
            self._udps.close()
| |
| |
if __name__ == '__main__':
    # Run the fake DNS server in a background thread until Ctrl-C is
    # pressed or the server thread exits on its own.
    ip = '192.168.1.1'
    print('pyminifakeDNS:: dom.query. 60 IN A %s' % ip)
    server = DNSServer(fake_ip=ip)
    stopper = threading.Event()
    thread = threading.Thread(target=server.run, args=(stopper,))
    thread.start()
    try:
        # Poll instead of sleeping forever: if the server thread dies
        # (for example bind() is refused without superuser privilege)
        # the script ends rather than idling with nothing running.
        while thread.is_alive():
            time.sleep(1)
    except KeyboardInterrupt:
        print('stopping thread')
    finally:
        # Always ask the server to stop and wait for it to finish.
        stopper.set()
        thread.join()