blob: 38538039f3420cb1e690dcd7c97b01d9a3d7e989 [file] [log] [blame]
#!/usr/bin/python
import logging, socket, threading, time
class DNSQuery:
TYPE = '\x00\x01' # An 'A' record
CLASS = '\x00\x01' # of the Internet class
TTL = '\x00\x00\x00\x3c' # 60 hops
RESOURCE_DATA_LENGTH = '\x00\x04' # 4 bytes
def __init__(self, data):
self.data=data
self.dominio=''
tipo = (ord(data[2]) >> 3) & 15 # Opcode bits
if tipo == 0: # Standard query
ini=12
lon=ord(data[ini])
while lon != 0:
self.dominio+=data[ini+1:ini+lon+1]+'.'
ini+=lon+1
lon=ord(data[ini])
def respuesta(self, ip):
packet=''
if self.dominio:
packet+=self.data[:2] + "\x81\x80"
# Query and Answers Counts
packet+=self.data[4:6] + self.data[4:6] + '\x00\x00\x00\x00'
# Original Domain Name Query
packet+=self.data[12:]
# Pointer to domain name
packet+='\xc0\x0c'
# Response type, ttl and resource data length -> 4 bytes
packet+=('%s%s%s%s' % (self.TYPE,
self.CLASS,
self.TTL,
self.RESOURCE_DATA_LENGTH))
# 4bytes of IP
packet+=str.join('',map(lambda x: chr(int(x)), ip.split('.')))
return packet
class DNSServer:
"""A very, very simple DNS responder.
DNSServer will bind to port |port| on |server_address| and respond
to any query with an A record pointing to |fake_ip|.
Must be run with superuser privilege to bind to port 53
"""
def __init__(self, server_address='', fake_ip='192.168.1.1', port=53):
self._udps = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._address = server_address
self._port = port
self._fake_ip = fake_ip
def run(self, stop):
"""Runs DNSServer until the threading.Event object |stop| gets set.
Example use:
server = DNSServer(fake_ip='127.0.0.1')
stopper = threading.Event()
thread = threading.Thread(target=server.run, args=(stopper,))
.
.
.
stopper.set()
"""
self._udps.bind((self._address, self._port))
logging.info('miniFakeDns listening on port %d' % self._port)
self._udps.settimeout(1)
while not stop.is_set():
try:
data, addr = self._udps.recvfrom(1500)
p=DNSQuery(data)
self._udps.sendto(p.respuesta(self._fake_ip), addr)
logging.debug('Response to %s: %s -> %s' % (addr,
p.dominio,
self._fake_ip))
except socket.timeout:
pass
self._udps.close()
if __name__ == '__main__':
ip='192.168.1.1'
print 'pyminifakeDNS:: dom.query. 60 IN A %s' % ip
server = DNSServer(fake_ip=ip)
stopper = threading.Event()
thread = threading.Thread(target=server.run, args=(stopper,))
try:
thread.start()
while 1:
time.sleep(1)
except KeyboardInterrupt:
print 'stopping thread'
stopper.set()
thread.join()