blob: fa2c663f3b06bff1a2567e240e0eb5a110f2416b [file] [log] [blame]
#! python
import sys
import os
import os.path
import socket
import thread
import time
import httplib
import BaseHTTPServer
import SimpleHTTPServer
try:
from cryptoIDlib.api import *
cryptoIDlibLoaded = True
except:
cryptoIDlibLoaded = False
if __name__ != "__main__":
raise "This must be run as a command, not used as a module!"
#import tlslite
#from tlslite.constants import AlertDescription, Fault
#from tlslite.utils.jython_compat import formatExceptionTrace
#from tlslite.X509 import X509, X509CertChain
from tlslite.api import *
def parsePrivateKey(s):
try:
return parsePEMKey(s, private=True)
except Exception, e:
print e
return parseXMLKey(s, private=True)
def clientTest(address, dir):
#Split address into hostname/port tuple
address = address.split(":")
if len(address)==1:
address.append("4443")
address = ( address[0], int(address[1]) )
def connect():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if hasattr(sock, 'settimeout'): #It's a python 2.3 feature
sock.settimeout(5)
sock.connect(address)
c = TLSConnection(sock)
return c
test = 0
badFault = False
print "Test 1 - good shared key"
connection = connect()
connection.handshakeClientSharedKey("shared", "key")
connection.close()
connection.sock.close()
print "Test 2 - shared key faults"
for fault in Fault.clientSharedKeyFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeClientSharedKey("shared", "key")
print " Good Fault %s" % (Fault.faultNames[fault])
except TLSFaultError, e:
print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
badFault = True
connection.sock.close()
print "Test 3 - good SRP"
connection = connect()
connection.handshakeClientSRP("test", "password")
connection.close()
print "Test 4 - SRP faults"
for fault in Fault.clientSrpFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeClientSRP("test", "password")
print " Good Fault %s" % (Fault.faultNames[fault])
except TLSFaultError, e:
print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
badFault = True
connection.sock.close()
print "Test 5 - good SRP: unknown_srp_username idiom"
def srpCallback():
return ("test", "password")
connection = connect()
connection.handshakeClientUnknown(srpCallback=srpCallback)
connection.close()
connection.sock.close()
print "Test 6 - good SRP: with X.509 certificate"
connection = connect()
connection.handshakeClientSRP("test", "password")
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
connection.sock.close()
print "Test 7 - X.509 with SRP faults"
for fault in Fault.clientSrpFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeClientSRP("test", "password")
print " Good Fault %s" % (Fault.faultNames[fault])
except TLSFaultError, e:
print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
badFault = True
connection.sock.close()
if cryptoIDlibLoaded:
print "Test 8 - good SRP: with cryptoID certificate chain"
connection = connect()
connection.handshakeClientSRP("test", "password")
assert(isinstance(connection.session.serverCertChain, CertChain))
if not (connection.session.serverCertChain.validate()):
print connection.session.serverCertChain.validate(listProblems=True)
connection.close()
connection.sock.close()
print "Test 9 - CryptoID with SRP faults"
for fault in Fault.clientSrpFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeClientSRP("test", "password")
print " Good Fault %s" % (Fault.faultNames[fault])
except TLSFaultError, e:
print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
badFault = True
connection.sock.close()
print "Test 10 - good X509"
connection = connect()
connection.handshakeClientCert()
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
connection.sock.close()
print "Test 10.a - good X509, SSLv3"
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,0)
settings.maxVersion = (3,0)
connection.handshakeClientCert(settings=settings)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
connection.sock.close()
print "Test 11 - X.509 faults"
for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeClientCert()
print " Good Fault %s" % (Fault.faultNames[fault])
except TLSFaultError, e:
print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
badFault = True
connection.sock.close()
if cryptoIDlibLoaded:
print "Test 12 - good cryptoID"
connection = connect()
connection.handshakeClientCert()
assert(isinstance(connection.session.serverCertChain, CertChain))
assert(connection.session.serverCertChain.validate())
connection.close()
connection.sock.close()
print "Test 13 - cryptoID faults"
for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeClientCert()
print " Good Fault %s" % (Fault.faultNames[fault])
except TLSFaultError, e:
print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
badFault = True
connection.sock.close()
print "Test 14 - good mutual X509"
x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read())
x509Chain = X509CertChain([x509Cert])
s = open(os.path.join(dir, "clientX509Key.pem")).read()
x509Key = parsePEMKey(s, private=True)
connection = connect()
connection.handshakeClientCert(x509Chain, x509Key)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
connection.sock.close()
print "Test 14.a - good mutual X509, SSLv3"
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,0)
settings.maxVersion = (3,0)
connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
connection.sock.close()
print "Test 15 - mutual X.509 faults"
for fault in Fault.clientCertFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeClientCert(x509Chain, x509Key)
print " Good Fault %s" % (Fault.faultNames[fault])
except TLSFaultError, e:
print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
badFault = True
connection.sock.close()
if cryptoIDlibLoaded:
print "Test 16 - good mutual cryptoID"
cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read())
cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True)
connection = connect()
connection.handshakeClientCert(cryptoIDChain, cryptoIDKey)
assert(isinstance(connection.session.serverCertChain, CertChain))
assert(connection.session.serverCertChain.validate())
connection.close()
connection.sock.close()
print "Test 17 - mutual cryptoID faults"
for fault in Fault.clientCertFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeClientCert(cryptoIDChain, cryptoIDKey)
print " Good Fault %s" % (Fault.faultNames[fault])
except TLSFaultError, e:
print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
badFault = True
connection.sock.close()
print "Test 18 - good SRP, prepare to resume..."
connection = connect()
connection.handshakeClientSRP("test", "password")
connection.close()
connection.sock.close()
session = connection.session
print "Test 19 - resumption"
connection = connect()
connection.handshakeClientSRP("test", "garbage", session=session)
#Don't close! -- see below
print "Test 20 - invalidated resumption"
connection.sock.close() #Close the socket without a close_notify!
connection = connect()
try:
connection.handshakeClientSRP("test", "garbage", session=session)
assert()
except TLSRemoteAlert, alert:
if alert.description != AlertDescription.bad_record_mac:
raise
connection.sock.close()
print "Test 21 - HTTPS test X.509"
address = address[0], address[1]+1
if hasattr(socket, "timeout"):
timeoutEx = socket.timeout
else:
timeoutEx = socket.error
while 1:
try:
time.sleep(2)
htmlBody = open(os.path.join(dir, "index.html")).read()
fingerprint = None
for y in range(2):
h = HTTPTLSConnection(\
address[0], address[1], x509Fingerprint=fingerprint)
for x in range(3):
h.request("GET", "/index.html")
r = h.getresponse()
assert(r.status == 200)
s = r.read()
assert(s == htmlBody)
fingerprint = h.tlsSession.serverCertChain.getFingerprint()
assert(fingerprint)
time.sleep(2)
break
except timeoutEx:
print "timeout, retrying..."
pass
if cryptoIDlibLoaded:
print "Test 21a - HTTPS test SRP+cryptoID"
address = address[0], address[1]+1
if hasattr(socket, "timeout"):
timeoutEx = socket.timeout
else:
timeoutEx = socket.error
while 1:
try:
time.sleep(2) #Time to generate key and cryptoID
htmlBody = open(os.path.join(dir, "index.html")).read()
fingerprint = None
protocol = None
for y in range(2):
h = HTTPTLSConnection(\
address[0], address[1],
username="test", password="password",
cryptoID=fingerprint, protocol=protocol)
for x in range(3):
h.request("GET", "/index.html")
r = h.getresponse()
assert(r.status == 200)
s = r.read()
assert(s == htmlBody)
fingerprint = h.tlsSession.serverCertChain.cryptoID
assert(fingerprint)
protocol = "urn:whatever"
time.sleep(2)
break
except timeoutEx:
print "timeout, retrying..."
pass
address = address[0], address[1]+1
implementations = []
if cryptlibpyLoaded:
implementations.append("cryptlib")
if m2cryptoLoaded:
implementations.append("openssl")
if pycryptoLoaded:
implementations.append("pycrypto")
implementations.append("python")
print "Test 22 - different ciphers"
for implementation in implementations:
for cipher in ["aes128", "aes256", "rc4"]:
print "Test 22:",
connection = connect()
settings = HandshakeSettings()
settings.cipherNames = [cipher]
settings.cipherImplementations = [implementation, "python"]
connection.handshakeClientSharedKey("shared", "key", settings=settings)
print ("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
connection.write("hello")
h = connection.read(min=5, max=5)
assert(h == "hello")
connection.close()
connection.sock.close()
print "Test 23 - throughput test"
for implementation in implementations:
for cipher in ["aes128", "aes256", "3des", "rc4"]:
if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"):
continue
print "Test 23:",
connection = connect()
settings = HandshakeSettings()
settings.cipherNames = [cipher]
settings.cipherImplementations = [implementation, "python"]
connection.handshakeClientSharedKey("shared", "key", settings=settings)
print ("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation())),
startTime = time.clock()
connection.write("hello"*10000)
h = connection.read(min=50000, max=50000)
stopTime = time.clock()
print "100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime))
assert(h == "hello"*10000)
connection.close()
connection.sock.close()
print "Test 24 - Internet servers test"
try:
i = IMAP4_TLS("cyrus.andrew.cmu.edu")
i.login("anonymous", "anonymous@anonymous.net")
i.logout()
print "Test 24: IMAP4 good"
p = POP3_TLS("pop.gmail.com")
p.quit()
print "Test 24: POP3 good"
except socket.error, e:
print "Non-critical error: socket error trying to reach internet server: ", e
if not badFault:
print "Test succeeded"
else:
print "Test failed"
def serverTest(address, dir):
#Split address into hostname/port tuple
address = address.split(":")
if len(address)==1:
address.append("4443")
address = ( address[0], int(address[1]) )
#Connect to server
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.bind(address)
lsock.listen(5)
def connect():
return TLSConnection(lsock.accept()[0])
print "Test 1 - good shared key"
sharedKeyDB = SharedKeyDB()
sharedKeyDB["shared"] = "key"
sharedKeyDB["shared2"] = "key2"
connection = connect()
connection.handshakeServer(sharedKeyDB=sharedKeyDB)
connection.close()
connection.sock.close()
print "Test 2 - shared key faults"
for fault in Fault.clientSharedKeyFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeServer(sharedKeyDB=sharedKeyDB)
assert()
except:
pass
connection.sock.close()
print "Test 3 - good SRP"
#verifierDB = tlslite.VerifierDB(os.path.join(dir, "verifierDB"))
#verifierDB.open()
verifierDB = VerifierDB()
verifierDB.create()
entry = VerifierDB.makeVerifier("test", "password", 1536)
verifierDB["test"] = entry
connection = connect()
connection.handshakeServer(verifierDB=verifierDB)
connection.close()
connection.sock.close()
print "Test 4 - SRP faults"
for fault in Fault.clientSrpFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeServer(verifierDB=verifierDB)
assert()
except:
pass
connection.sock.close()
print "Test 5 - good SRP: unknown_srp_username idiom"
connection = connect()
connection.handshakeServer(verifierDB=verifierDB)
connection.close()
connection.sock.close()
print "Test 6 - good SRP: with X.509 cert"
x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read())
x509Chain = X509CertChain([x509Cert])
s = open(os.path.join(dir, "serverX509Key.pem")).read()
x509Key = parsePEMKey(s, private=True)
connection = connect()
connection.handshakeServer(verifierDB=verifierDB, \
certChain=x509Chain, privateKey=x509Key)
connection.close()
connection.sock.close()
print "Test 7 - X.509 with SRP faults"
for fault in Fault.clientSrpFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeServer(verifierDB=verifierDB, \
certChain=x509Chain, privateKey=x509Key)
assert()
except:
pass
connection.sock.close()
if cryptoIDlibLoaded:
print "Test 8 - good SRP: with cryptoID certs"
cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read())
cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True)
connection = connect()
connection.handshakeServer(verifierDB=verifierDB, \
certChain=cryptoIDChain, privateKey=cryptoIDKey)
connection.close()
connection.sock.close()
print "Test 9 - cryptoID with SRP faults"
for fault in Fault.clientSrpFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeServer(verifierDB=verifierDB, \
certChain=cryptoIDChain, privateKey=cryptoIDKey)
assert()
except:
pass
connection.sock.close()
print "Test 10 - good X.509"
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
connection.close()
connection.sock.close()
print "Test 10.a - good X.509, SSL v3"
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,0)
settings.maxVersion = (3,0)
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
connection.close()
connection.sock.close()
print "Test 11 - X.509 faults"
for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
assert()
except:
pass
connection.sock.close()
if cryptoIDlibLoaded:
print "Test 12 - good cryptoID"
connection = connect()
connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey)
connection.close()
connection.sock.close()
print "Test 13 - cryptoID faults"
for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey)
assert()
except:
pass
connection.sock.close()
print "Test 14 - good mutual X.509"
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
connection.sock.close()
print "Test 14a - good mutual X.509, SSLv3"
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,0)
settings.maxVersion = (3,0)
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
connection.sock.close()
print "Test 15 - mutual X.509 faults"
for fault in Fault.clientCertFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
assert()
except:
pass
connection.sock.close()
if cryptoIDlibLoaded:
print "Test 16 - good mutual cryptoID"
connection = connect()
connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True)
assert(isinstance(connection.session.serverCertChain, CertChain))
assert(connection.session.serverCertChain.validate())
connection.close()
connection.sock.close()
print "Test 17 - mutual cryptoID faults"
for fault in Fault.clientCertFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True)
assert()
except:
pass
connection.sock.close()
print "Test 18 - good SRP, prepare to resume"
sessionCache = SessionCache()
connection = connect()
connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
connection.close()
connection.sock.close()
print "Test 19 - resumption"
connection = connect()
connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
#Don't close! -- see next test
print "Test 20 - invalidated resumption"
try:
connection.read(min=1, max=1)
assert() #Client is going to close the socket without a close_notify
except TLSAbruptCloseError, e:
pass
connection = connect()
try:
connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
except TLSLocalAlert, alert:
if alert.description != AlertDescription.bad_record_mac:
raise
connection.sock.close()
print "Test 21 - HTTPS test X.509"
#Close the current listening socket
lsock.close()
#Create and run an HTTP Server using TLSSocketServerMixIn
class MyHTTPServer(TLSSocketServerMixIn,
BaseHTTPServer.HTTPServer):
def handshake(self, tlsConnection):
tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
return True
cd = os.getcwd()
os.chdir(dir)
address = address[0], address[1]+1
httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler)
for x in range(6):
httpd.handle_request()
httpd.server_close()
cd = os.chdir(cd)
if cryptoIDlibLoaded:
print "Test 21a - HTTPS test SRP+cryptoID"
#Create and run an HTTP Server using TLSSocketServerMixIn
class MyHTTPServer(TLSSocketServerMixIn,
BaseHTTPServer.HTTPServer):
def handshake(self, tlsConnection):
tlsConnection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey,
verifierDB=verifierDB)
return True
cd = os.getcwd()
os.chdir(dir)
address = address[0], address[1]+1
httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler)
for x in range(6):
httpd.handle_request()
httpd.server_close()
cd = os.chdir(cd)
#Re-connect the listening socket
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
address = address[0], address[1]+1
lsock.bind(address)
lsock.listen(5)
def connect():
return TLSConnection(lsock.accept()[0])
implementations = []
if cryptlibpyLoaded:
implementations.append("cryptlib")
if m2cryptoLoaded:
implementations.append("openssl")
if pycryptoLoaded:
implementations.append("pycrypto")
implementations.append("python")
print "Test 22 - different ciphers"
for implementation in ["python"] * len(implementations):
for cipher in ["aes128", "aes256", "rc4"]:
print "Test 22:",
connection = connect()
settings = HandshakeSettings()
settings.cipherNames = [cipher]
settings.cipherImplementations = [implementation, "python"]
connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings)
print connection.getCipherName(), connection.getCipherImplementation()
h = connection.read(min=5, max=5)
assert(h == "hello")
connection.write(h)
connection.close()
connection.sock.close()
print "Test 23 - throughput test"
for implementation in implementations:
for cipher in ["aes128", "aes256", "3des", "rc4"]:
if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"):
continue
print "Test 23:",
connection = connect()
settings = HandshakeSettings()
settings.cipherNames = [cipher]
settings.cipherImplementations = [implementation, "python"]
connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings)
print connection.getCipherName(), connection.getCipherImplementation()
h = connection.read(min=50000, max=50000)
assert(h == "hello"*10000)
connection.write(h)
connection.close()
connection.sock.close()
print "Test succeeded"
if len(sys.argv) == 1 or (len(sys.argv)==2 and sys.argv[1].lower().endswith("help")):
print ""
print "Version: 0.3.8"
print ""
print "RNG: %s" % prngName
print ""
print "Modules:"
if cryptlibpyLoaded:
print " cryptlib_py : Loaded"
else:
print " cryptlib_py : Not Loaded"
if m2cryptoLoaded:
print " M2Crypto : Loaded"
else:
print " M2Crypto : Not Loaded"
if pycryptoLoaded:
print " pycrypto : Loaded"
else:
print " pycrypto : Not Loaded"
if gmpyLoaded:
print " GMPY : Loaded"
else:
print " GMPY : Not Loaded"
if cryptoIDlibLoaded:
print " cryptoIDlib : Loaded"
else:
print " cryptoIDlib : Not Loaded"
print ""
print "Commands:"
print ""
print " clientcert <server> [<chain> <key>]"
print " clientsharedkey <server> <user> <pass>"
print " clientsrp <server> <user> <pass>"
print " clienttest <server> <dir>"
print ""
print " serversrp <server> <verifierDB>"
print " servercert <server> <chain> <key> [req]"
print " serversrpcert <server> <verifierDB> <chain> <key>"
print " serversharedkey <server> <sharedkeyDB>"
print " servertest <server> <dir>"
sys.exit()
cmd = sys.argv[1].lower()
class Args:
def __init__(self, argv):
self.argv = argv
def get(self, index):
if len(self.argv)<=index:
raise SyntaxError("Not enough arguments")
return self.argv[index]
def getLast(self, index):
if len(self.argv)>index+1:
raise SyntaxError("Too many arguments")
return self.get(index)
args = Args(sys.argv)
def reformatDocString(s):
lines = s.splitlines()
newLines = []
for line in lines:
newLines.append(" " + line.strip())
return "\n".join(newLines)
try:
if cmd == "clienttest":
address = args.get(2)
dir = args.getLast(3)
clientTest(address, dir)
sys.exit()
elif cmd.startswith("client"):
address = args.get(2)
#Split address into hostname/port tuple
address = address.split(":")
if len(address)==1:
address.append("4443")
address = ( address[0], int(address[1]) )
def connect():
#Connect to server
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if hasattr(sock, "settimeout"):
sock.settimeout(5)
sock.connect(address)
#Instantiate TLSConnections
return TLSConnection(sock)
try:
if cmd == "clientsrp":
username = args.get(3)
password = args.getLast(4)
connection = connect()
start = time.clock()
connection.handshakeClientSRP(username, password)
elif cmd == "clientsharedkey":
username = args.get(3)
password = args.getLast(4)
connection = connect()
start = time.clock()
connection.handshakeClientSharedKey(username, password)
elif cmd == "clientcert":
certChain = None
privateKey = None
if len(sys.argv) > 3:
certFilename = args.get(3)
keyFilename = args.getLast(4)
s1 = open(certFilename, "rb").read()
s2 = open(keyFilename, "rb").read()
#Try to create cryptoID cert chain
if cryptoIDlibLoaded:
try:
certChain = CertChain().parse(s1)
privateKey = parsePrivateKey(s2)
except:
certChain = None
privateKey = None
#Try to create X.509 cert chain
if not certChain:
x509 = X509()
x509.parse(s1)
certChain = X509CertChain([x509])
privateKey = parsePrivateKey(s2)
connection = connect()
start = time.clock()
connection.handshakeClientCert(certChain, privateKey)
else:
raise SyntaxError("Unknown command")
except TLSLocalAlert, a:
if a.description == AlertDescription.bad_record_mac:
if cmd == "clientsharedkey":
print "Bad sharedkey password"
else:
raise
elif a.description == AlertDescription.user_canceled:
print str(a)
else:
raise
sys.exit()
except TLSRemoteAlert, a:
if a.description == AlertDescription.unknown_srp_username:
if cmd == "clientsrp":
print "Unknown username"
else:
raise
elif a.description == AlertDescription.bad_record_mac:
if cmd == "clientsrp":
print "Bad username or password"
else:
raise
elif a.description == AlertDescription.handshake_failure:
print "Unable to negotiate mutually acceptable parameters"
else:
raise
sys.exit()
stop = time.clock()
print "Handshake success"
print " Handshake time: %.4f seconds" % (stop - start)
print " Version: %s.%s" % connection.version
print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation())
if connection.session.srpUsername:
print " Client SRP username: %s" % connection.session.srpUsername
if connection.session.sharedKeyUsername:
print " Client shared key username: %s" % connection.session.sharedKeyUsername
if connection.session.clientCertChain:
print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint()
if connection.session.serverCertChain:
print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint()
connection.close()
connection.sock.close()
elif cmd.startswith("server"):
address = args.get(2)
#Split address into hostname/port tuple
address = address.split(":")
if len(address)==1:
address.append("4443")
address = ( address[0], int(address[1]) )
verifierDBFilename = None
sharedKeyDBFilename = None
certFilename = None
keyFilename = None
sharedKeyDB = None
reqCert = False
if cmd == "serversrp":
verifierDBFilename = args.getLast(3)
elif cmd == "servercert":
certFilename = args.get(3)
keyFilename = args.get(4)
if len(sys.argv)>=6:
req = args.getLast(5)
if req.lower() != "req":
raise SyntaxError()
reqCert = True
elif cmd == "serversrpcert":
verifierDBFilename = args.get(3)
certFilename = args.get(4)
keyFilename = args.getLast(5)
elif cmd == "serversharedkey":
sharedKeyDBFilename = args.getLast(3)
elif cmd == "servertest":
address = args.get(2)
dir = args.getLast(3)
serverTest(address, dir)
sys.exit()
verifierDB = None
if verifierDBFilename:
verifierDB = VerifierDB(verifierDBFilename)
verifierDB.open()
sharedKeyDB = None
if sharedKeyDBFilename:
sharedKeyDB = SharedKeyDB(sharedKeyDBFilename)
sharedKeyDB.open()
certChain = None
privateKey = None
if certFilename:
s1 = open(certFilename, "rb").read()
s2 = open(keyFilename, "rb").read()
#Try to create cryptoID cert chain
if cryptoIDlibLoaded:
try:
certChain = CertChain().parse(s1)
privateKey = parsePrivateKey(s2)
except:
certChain = None
privateKey = None
#Try to create X.509 cert chain
if not certChain:
x509 = X509()
x509.parse(s1)
certChain = X509CertChain([x509])
privateKey = parsePrivateKey(s2)
#Create handler function - performs handshake, then echos all bytes received
def handler(sock):
try:
connection = TLSConnection(sock)
settings = HandshakeSettings()
connection.handshakeServer(sharedKeyDB=sharedKeyDB, verifierDB=verifierDB, \
certChain=certChain, privateKey=privateKey, \
reqCert=reqCert, settings=settings)
print "Handshake success"
print " Version: %s.%s" % connection.version
print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation())
if connection.session.srpUsername:
print " Client SRP username: %s" % connection.session.srpUsername
if connection.session.sharedKeyUsername:
print " Client shared key username: %s" % connection.session.sharedKeyUsername
if connection.session.clientCertChain:
print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint()
if connection.session.serverCertChain:
print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint()
s = ""
while 1:
newS = connection.read()
if not newS:
break
s += newS
if s[-1]=='\n':
connection.write(s)
s = ""
except TLSLocalAlert, a:
if a.description == AlertDescription.unknown_srp_username:
print "Unknown SRP username"
elif a.description == AlertDescription.bad_record_mac:
if cmd == "serversrp" or cmd == "serversrpcert":
print "Bad SRP password for:", connection.allegedSrpUsername
else:
raise
elif a.description == AlertDescription.handshake_failure:
print "Unable to negotiate mutually acceptable parameters"
else:
raise
except TLSRemoteAlert, a:
if a.description == AlertDescription.bad_record_mac:
if cmd == "serversharedkey":
print "Bad sharedkey password for:", connection.allegedSharedKeyUsername
else:
raise
elif a.description == AlertDescription.user_canceled:
print "Handshake cancelled"
elif a.description == AlertDescription.handshake_failure:
print "Unable to negotiate mutually acceptable parameters"
elif a.description == AlertDescription.close_notify:
pass
else:
raise
#Run multi-threaded server
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(address)
sock.listen(5)
while 1:
(newsock, cliAddress) = sock.accept()
thread.start_new_thread(handler, (newsock,))
else:
print "Bad command: '%s'" % cmd
except TLSRemoteAlert, a:
print str(a)
raise