| #! python |
| |
| import sys |
| import os |
| import os.path |
| import socket |
| import thread |
| import time |
| import httplib |
| import BaseHTTPServer |
| import SimpleHTTPServer |
| |
| |
| try: |
| from cryptoIDlib.api import * |
| cryptoIDlibLoaded = True |
| except: |
| cryptoIDlibLoaded = False |
| |
| if __name__ != "__main__": |
| raise "This must be run as a command, not used as a module!" |
| |
| #import tlslite |
| #from tlslite.constants import AlertDescription, Fault |
| |
| #from tlslite.utils.jython_compat import formatExceptionTrace |
| #from tlslite.X509 import X509, X509CertChain |
| |
| from tlslite.api import * |
| |
| def parsePrivateKey(s): |
| try: |
| return parsePEMKey(s, private=True) |
| except Exception, e: |
| print e |
| return parseXMLKey(s, private=True) |
| |
| |
| def clientTest(address, dir): |
| |
| #Split address into hostname/port tuple |
| address = address.split(":") |
| if len(address)==1: |
| address.append("4443") |
| address = ( address[0], int(address[1]) ) |
| |
| def connect(): |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| if hasattr(sock, 'settimeout'): #It's a python 2.3 feature |
| sock.settimeout(5) |
| sock.connect(address) |
| c = TLSConnection(sock) |
| return c |
| |
| test = 0 |
| |
| badFault = False |
| |
| print "Test 1 - good shared key" |
| connection = connect() |
| connection.handshakeClientSharedKey("shared", "key") |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 2 - shared key faults" |
| for fault in Fault.clientSharedKeyFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeClientSharedKey("shared", "key") |
| print " Good Fault %s" % (Fault.faultNames[fault]) |
| except TLSFaultError, e: |
| print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
| badFault = True |
| connection.sock.close() |
| |
| print "Test 3 - good SRP" |
| connection = connect() |
| connection.handshakeClientSRP("test", "password") |
| connection.close() |
| |
| print "Test 4 - SRP faults" |
| for fault in Fault.clientSrpFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeClientSRP("test", "password") |
| print " Good Fault %s" % (Fault.faultNames[fault]) |
| except TLSFaultError, e: |
| print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
| badFault = True |
| connection.sock.close() |
| |
| print "Test 5 - good SRP: unknown_srp_username idiom" |
| def srpCallback(): |
| return ("test", "password") |
| connection = connect() |
| connection.handshakeClientUnknown(srpCallback=srpCallback) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 6 - good SRP: with X.509 certificate" |
| connection = connect() |
| connection.handshakeClientSRP("test", "password") |
| assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 7 - X.509 with SRP faults" |
| for fault in Fault.clientSrpFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeClientSRP("test", "password") |
| print " Good Fault %s" % (Fault.faultNames[fault]) |
| except TLSFaultError, e: |
| print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
| badFault = True |
| connection.sock.close() |
| |
| if cryptoIDlibLoaded: |
| print "Test 8 - good SRP: with cryptoID certificate chain" |
| connection = connect() |
| connection.handshakeClientSRP("test", "password") |
| assert(isinstance(connection.session.serverCertChain, CertChain)) |
| if not (connection.session.serverCertChain.validate()): |
| print connection.session.serverCertChain.validate(listProblems=True) |
| |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 9 - CryptoID with SRP faults" |
| for fault in Fault.clientSrpFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeClientSRP("test", "password") |
| print " Good Fault %s" % (Fault.faultNames[fault]) |
| except TLSFaultError, e: |
| print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
| badFault = True |
| connection.sock.close() |
| |
| print "Test 10 - good X509" |
| connection = connect() |
| connection.handshakeClientCert() |
| assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 10.a - good X509, SSLv3" |
| connection = connect() |
| settings = HandshakeSettings() |
| settings.minVersion = (3,0) |
| settings.maxVersion = (3,0) |
| connection.handshakeClientCert(settings=settings) |
| assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 11 - X.509 faults" |
| for fault in Fault.clientNoAuthFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeClientCert() |
| print " Good Fault %s" % (Fault.faultNames[fault]) |
| except TLSFaultError, e: |
| print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
| badFault = True |
| connection.sock.close() |
| |
| if cryptoIDlibLoaded: |
| print "Test 12 - good cryptoID" |
| connection = connect() |
| connection.handshakeClientCert() |
| assert(isinstance(connection.session.serverCertChain, CertChain)) |
| assert(connection.session.serverCertChain.validate()) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 13 - cryptoID faults" |
| for fault in Fault.clientNoAuthFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeClientCert() |
| print " Good Fault %s" % (Fault.faultNames[fault]) |
| except TLSFaultError, e: |
| print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
| badFault = True |
| connection.sock.close() |
| |
| print "Test 14 - good mutual X509" |
| x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read()) |
| x509Chain = X509CertChain([x509Cert]) |
| s = open(os.path.join(dir, "clientX509Key.pem")).read() |
| x509Key = parsePEMKey(s, private=True) |
| |
| connection = connect() |
| connection.handshakeClientCert(x509Chain, x509Key) |
| assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 14.a - good mutual X509, SSLv3" |
| connection = connect() |
| settings = HandshakeSettings() |
| settings.minVersion = (3,0) |
| settings.maxVersion = (3,0) |
| connection.handshakeClientCert(x509Chain, x509Key, settings=settings) |
| assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 15 - mutual X.509 faults" |
| for fault in Fault.clientCertFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeClientCert(x509Chain, x509Key) |
| print " Good Fault %s" % (Fault.faultNames[fault]) |
| except TLSFaultError, e: |
| print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
| badFault = True |
| connection.sock.close() |
| |
| if cryptoIDlibLoaded: |
| print "Test 16 - good mutual cryptoID" |
| cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read()) |
| cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True) |
| |
| connection = connect() |
| connection.handshakeClientCert(cryptoIDChain, cryptoIDKey) |
| assert(isinstance(connection.session.serverCertChain, CertChain)) |
| assert(connection.session.serverCertChain.validate()) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 17 - mutual cryptoID faults" |
| for fault in Fault.clientCertFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeClientCert(cryptoIDChain, cryptoIDKey) |
| print " Good Fault %s" % (Fault.faultNames[fault]) |
| except TLSFaultError, e: |
| print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
| badFault = True |
| connection.sock.close() |
| |
| print "Test 18 - good SRP, prepare to resume..." |
| connection = connect() |
| connection.handshakeClientSRP("test", "password") |
| connection.close() |
| connection.sock.close() |
| session = connection.session |
| |
| print "Test 19 - resumption" |
| connection = connect() |
| connection.handshakeClientSRP("test", "garbage", session=session) |
| #Don't close! -- see below |
| |
| print "Test 20 - invalidated resumption" |
| connection.sock.close() #Close the socket without a close_notify! |
| connection = connect() |
| try: |
| connection.handshakeClientSRP("test", "garbage", session=session) |
| assert() |
| except TLSRemoteAlert, alert: |
| if alert.description != AlertDescription.bad_record_mac: |
| raise |
| connection.sock.close() |
| |
| print "Test 21 - HTTPS test X.509" |
| address = address[0], address[1]+1 |
| if hasattr(socket, "timeout"): |
| timeoutEx = socket.timeout |
| else: |
| timeoutEx = socket.error |
| while 1: |
| try: |
| time.sleep(2) |
| htmlBody = open(os.path.join(dir, "index.html")).read() |
| fingerprint = None |
| for y in range(2): |
| h = HTTPTLSConnection(\ |
| address[0], address[1], x509Fingerprint=fingerprint) |
| for x in range(3): |
| h.request("GET", "/index.html") |
| r = h.getresponse() |
| assert(r.status == 200) |
| s = r.read() |
| assert(s == htmlBody) |
| fingerprint = h.tlsSession.serverCertChain.getFingerprint() |
| assert(fingerprint) |
| time.sleep(2) |
| break |
| except timeoutEx: |
| print "timeout, retrying..." |
| pass |
| |
| if cryptoIDlibLoaded: |
| print "Test 21a - HTTPS test SRP+cryptoID" |
| address = address[0], address[1]+1 |
| if hasattr(socket, "timeout"): |
| timeoutEx = socket.timeout |
| else: |
| timeoutEx = socket.error |
| while 1: |
| try: |
| time.sleep(2) #Time to generate key and cryptoID |
| htmlBody = open(os.path.join(dir, "index.html")).read() |
| fingerprint = None |
| protocol = None |
| for y in range(2): |
| h = HTTPTLSConnection(\ |
| address[0], address[1], |
| username="test", password="password", |
| cryptoID=fingerprint, protocol=protocol) |
| for x in range(3): |
| h.request("GET", "/index.html") |
| r = h.getresponse() |
| assert(r.status == 200) |
| s = r.read() |
| assert(s == htmlBody) |
| fingerprint = h.tlsSession.serverCertChain.cryptoID |
| assert(fingerprint) |
| protocol = "urn:whatever" |
| time.sleep(2) |
| break |
| except timeoutEx: |
| print "timeout, retrying..." |
| pass |
| |
| address = address[0], address[1]+1 |
| |
| implementations = [] |
| if cryptlibpyLoaded: |
| implementations.append("cryptlib") |
| if m2cryptoLoaded: |
| implementations.append("openssl") |
| if pycryptoLoaded: |
| implementations.append("pycrypto") |
| implementations.append("python") |
| |
| print "Test 22 - different ciphers" |
| for implementation in implementations: |
| for cipher in ["aes128", "aes256", "rc4"]: |
| |
| print "Test 22:", |
| connection = connect() |
| |
| settings = HandshakeSettings() |
| settings.cipherNames = [cipher] |
| settings.cipherImplementations = [implementation, "python"] |
| connection.handshakeClientSharedKey("shared", "key", settings=settings) |
| print ("%s %s" % (connection.getCipherName(), connection.getCipherImplementation())) |
| |
| connection.write("hello") |
| h = connection.read(min=5, max=5) |
| assert(h == "hello") |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 23 - throughput test" |
| for implementation in implementations: |
| for cipher in ["aes128", "aes256", "3des", "rc4"]: |
| if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"): |
| continue |
| |
| print "Test 23:", |
| connection = connect() |
| |
| settings = HandshakeSettings() |
| settings.cipherNames = [cipher] |
| settings.cipherImplementations = [implementation, "python"] |
| connection.handshakeClientSharedKey("shared", "key", settings=settings) |
| print ("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation())), |
| |
| startTime = time.clock() |
| connection.write("hello"*10000) |
| h = connection.read(min=50000, max=50000) |
| stopTime = time.clock() |
| print "100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime)) |
| |
| assert(h == "hello"*10000) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 24 - Internet servers test" |
| try: |
| i = IMAP4_TLS("cyrus.andrew.cmu.edu") |
| i.login("anonymous", "anonymous@anonymous.net") |
| i.logout() |
| print "Test 24: IMAP4 good" |
| p = POP3_TLS("pop.gmail.com") |
| p.quit() |
| print "Test 24: POP3 good" |
| except socket.error, e: |
| print "Non-critical error: socket error trying to reach internet server: ", e |
| |
| if not badFault: |
| print "Test succeeded" |
| else: |
| print "Test failed" |
| |
| |
| def serverTest(address, dir): |
| #Split address into hostname/port tuple |
| address = address.split(":") |
| if len(address)==1: |
| address.append("4443") |
| address = ( address[0], int(address[1]) ) |
| |
| #Connect to server |
| lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| lsock.bind(address) |
| lsock.listen(5) |
| |
| def connect(): |
| return TLSConnection(lsock.accept()[0]) |
| |
| print "Test 1 - good shared key" |
| sharedKeyDB = SharedKeyDB() |
| sharedKeyDB["shared"] = "key" |
| sharedKeyDB["shared2"] = "key2" |
| connection = connect() |
| connection.handshakeServer(sharedKeyDB=sharedKeyDB) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 2 - shared key faults" |
| for fault in Fault.clientSharedKeyFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeServer(sharedKeyDB=sharedKeyDB) |
| assert() |
| except: |
| pass |
| connection.sock.close() |
| |
| print "Test 3 - good SRP" |
| #verifierDB = tlslite.VerifierDB(os.path.join(dir, "verifierDB")) |
| #verifierDB.open() |
| verifierDB = VerifierDB() |
| verifierDB.create() |
| entry = VerifierDB.makeVerifier("test", "password", 1536) |
| verifierDB["test"] = entry |
| |
| connection = connect() |
| connection.handshakeServer(verifierDB=verifierDB) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 4 - SRP faults" |
| for fault in Fault.clientSrpFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeServer(verifierDB=verifierDB) |
| assert() |
| except: |
| pass |
| connection.sock.close() |
| |
| print "Test 5 - good SRP: unknown_srp_username idiom" |
| connection = connect() |
| connection.handshakeServer(verifierDB=verifierDB) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 6 - good SRP: with X.509 cert" |
| x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read()) |
| x509Chain = X509CertChain([x509Cert]) |
| s = open(os.path.join(dir, "serverX509Key.pem")).read() |
| x509Key = parsePEMKey(s, private=True) |
| |
| connection = connect() |
| connection.handshakeServer(verifierDB=verifierDB, \ |
| certChain=x509Chain, privateKey=x509Key) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 7 - X.509 with SRP faults" |
| for fault in Fault.clientSrpFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeServer(verifierDB=verifierDB, \ |
| certChain=x509Chain, privateKey=x509Key) |
| assert() |
| except: |
| pass |
| connection.sock.close() |
| |
| if cryptoIDlibLoaded: |
| print "Test 8 - good SRP: with cryptoID certs" |
| cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read()) |
| cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True) |
| connection = connect() |
| connection.handshakeServer(verifierDB=verifierDB, \ |
| certChain=cryptoIDChain, privateKey=cryptoIDKey) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 9 - cryptoID with SRP faults" |
| for fault in Fault.clientSrpFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeServer(verifierDB=verifierDB, \ |
| certChain=cryptoIDChain, privateKey=cryptoIDKey) |
| assert() |
| except: |
| pass |
| connection.sock.close() |
| |
| print "Test 10 - good X.509" |
| connection = connect() |
| connection.handshakeServer(certChain=x509Chain, privateKey=x509Key) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 10.a - good X.509, SSL v3" |
| connection = connect() |
| settings = HandshakeSettings() |
| settings.minVersion = (3,0) |
| settings.maxVersion = (3,0) |
| connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 11 - X.509 faults" |
| for fault in Fault.clientNoAuthFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeServer(certChain=x509Chain, privateKey=x509Key) |
| assert() |
| except: |
| pass |
| connection.sock.close() |
| |
| if cryptoIDlibLoaded: |
| print "Test 12 - good cryptoID" |
| connection = connect() |
| connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 13 - cryptoID faults" |
| for fault in Fault.clientNoAuthFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey) |
| assert() |
| except: |
| pass |
| connection.sock.close() |
| |
| print "Test 14 - good mutual X.509" |
| connection = connect() |
| connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True) |
| assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 14a - good mutual X.509, SSLv3" |
| connection = connect() |
| settings = HandshakeSettings() |
| settings.minVersion = (3,0) |
| settings.maxVersion = (3,0) |
| connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings) |
| assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 15 - mutual X.509 faults" |
| for fault in Fault.clientCertFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True) |
| assert() |
| except: |
| pass |
| connection.sock.close() |
| |
| if cryptoIDlibLoaded: |
| print "Test 16 - good mutual cryptoID" |
| connection = connect() |
| connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True) |
| assert(isinstance(connection.session.serverCertChain, CertChain)) |
| assert(connection.session.serverCertChain.validate()) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 17 - mutual cryptoID faults" |
| for fault in Fault.clientCertFaults + Fault.genericFaults: |
| connection = connect() |
| connection.fault = fault |
| try: |
| connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True) |
| assert() |
| except: |
| pass |
| connection.sock.close() |
| |
| print "Test 18 - good SRP, prepare to resume" |
| sessionCache = SessionCache() |
| connection = connect() |
| connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 19 - resumption" |
| connection = connect() |
| connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) |
| #Don't close! -- see next test |
| |
| print "Test 20 - invalidated resumption" |
| try: |
| connection.read(min=1, max=1) |
| assert() #Client is going to close the socket without a close_notify |
| except TLSAbruptCloseError, e: |
| pass |
| connection = connect() |
| try: |
| connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) |
| except TLSLocalAlert, alert: |
| if alert.description != AlertDescription.bad_record_mac: |
| raise |
| connection.sock.close() |
| |
| print "Test 21 - HTTPS test X.509" |
| |
| #Close the current listening socket |
| lsock.close() |
| |
| #Create and run an HTTP Server using TLSSocketServerMixIn |
| class MyHTTPServer(TLSSocketServerMixIn, |
| BaseHTTPServer.HTTPServer): |
| def handshake(self, tlsConnection): |
| tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key) |
| return True |
| cd = os.getcwd() |
| os.chdir(dir) |
| address = address[0], address[1]+1 |
| httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler) |
| for x in range(6): |
| httpd.handle_request() |
| httpd.server_close() |
| cd = os.chdir(cd) |
| |
| if cryptoIDlibLoaded: |
| print "Test 21a - HTTPS test SRP+cryptoID" |
| |
| #Create and run an HTTP Server using TLSSocketServerMixIn |
| class MyHTTPServer(TLSSocketServerMixIn, |
| BaseHTTPServer.HTTPServer): |
| def handshake(self, tlsConnection): |
| tlsConnection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, |
| verifierDB=verifierDB) |
| return True |
| cd = os.getcwd() |
| os.chdir(dir) |
| address = address[0], address[1]+1 |
| httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler) |
| for x in range(6): |
| httpd.handle_request() |
| httpd.server_close() |
| cd = os.chdir(cd) |
| |
| #Re-connect the listening socket |
| lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| address = address[0], address[1]+1 |
| lsock.bind(address) |
| lsock.listen(5) |
| |
| def connect(): |
| return TLSConnection(lsock.accept()[0]) |
| |
| implementations = [] |
| if cryptlibpyLoaded: |
| implementations.append("cryptlib") |
| if m2cryptoLoaded: |
| implementations.append("openssl") |
| if pycryptoLoaded: |
| implementations.append("pycrypto") |
| implementations.append("python") |
| |
| print "Test 22 - different ciphers" |
| for implementation in ["python"] * len(implementations): |
| for cipher in ["aes128", "aes256", "rc4"]: |
| |
| print "Test 22:", |
| connection = connect() |
| |
| settings = HandshakeSettings() |
| settings.cipherNames = [cipher] |
| settings.cipherImplementations = [implementation, "python"] |
| |
| connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings) |
| print connection.getCipherName(), connection.getCipherImplementation() |
| h = connection.read(min=5, max=5) |
| assert(h == "hello") |
| connection.write(h) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test 23 - throughput test" |
| for implementation in implementations: |
| for cipher in ["aes128", "aes256", "3des", "rc4"]: |
| if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"): |
| continue |
| |
| print "Test 23:", |
| connection = connect() |
| |
| settings = HandshakeSettings() |
| settings.cipherNames = [cipher] |
| settings.cipherImplementations = [implementation, "python"] |
| |
| connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings) |
| print connection.getCipherName(), connection.getCipherImplementation() |
| h = connection.read(min=50000, max=50000) |
| assert(h == "hello"*10000) |
| connection.write(h) |
| connection.close() |
| connection.sock.close() |
| |
| print "Test succeeded" |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| if len(sys.argv) == 1 or (len(sys.argv)==2 and sys.argv[1].lower().endswith("help")): |
| print "" |
| print "Version: 0.3.8" |
| print "" |
| print "RNG: %s" % prngName |
| print "" |
| print "Modules:" |
| if cryptlibpyLoaded: |
| print " cryptlib_py : Loaded" |
| else: |
| print " cryptlib_py : Not Loaded" |
| if m2cryptoLoaded: |
| print " M2Crypto : Loaded" |
| else: |
| print " M2Crypto : Not Loaded" |
| if pycryptoLoaded: |
| print " pycrypto : Loaded" |
| else: |
| print " pycrypto : Not Loaded" |
| if gmpyLoaded: |
| print " GMPY : Loaded" |
| else: |
| print " GMPY : Not Loaded" |
| if cryptoIDlibLoaded: |
| print " cryptoIDlib : Loaded" |
| else: |
| print " cryptoIDlib : Not Loaded" |
| print "" |
| print "Commands:" |
| print "" |
| print " clientcert <server> [<chain> <key>]" |
| print " clientsharedkey <server> <user> <pass>" |
| print " clientsrp <server> <user> <pass>" |
| print " clienttest <server> <dir>" |
| print "" |
| print " serversrp <server> <verifierDB>" |
| print " servercert <server> <chain> <key> [req]" |
| print " serversrpcert <server> <verifierDB> <chain> <key>" |
| print " serversharedkey <server> <sharedkeyDB>" |
| print " servertest <server> <dir>" |
| sys.exit() |
| |
| cmd = sys.argv[1].lower() |
| |
| class Args: |
| def __init__(self, argv): |
| self.argv = argv |
| def get(self, index): |
| if len(self.argv)<=index: |
| raise SyntaxError("Not enough arguments") |
| return self.argv[index] |
| def getLast(self, index): |
| if len(self.argv)>index+1: |
| raise SyntaxError("Too many arguments") |
| return self.get(index) |
| |
| args = Args(sys.argv) |
| |
| def reformatDocString(s): |
| lines = s.splitlines() |
| newLines = [] |
| for line in lines: |
| newLines.append(" " + line.strip()) |
| return "\n".join(newLines) |
| |
| try: |
| if cmd == "clienttest": |
| address = args.get(2) |
| dir = args.getLast(3) |
| clientTest(address, dir) |
| sys.exit() |
| |
| elif cmd.startswith("client"): |
| address = args.get(2) |
| |
| #Split address into hostname/port tuple |
| address = address.split(":") |
| if len(address)==1: |
| address.append("4443") |
| address = ( address[0], int(address[1]) ) |
| |
| def connect(): |
| #Connect to server |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| if hasattr(sock, "settimeout"): |
| sock.settimeout(5) |
| sock.connect(address) |
| |
| #Instantiate TLSConnections |
| return TLSConnection(sock) |
| |
| try: |
| if cmd == "clientsrp": |
| username = args.get(3) |
| password = args.getLast(4) |
| connection = connect() |
| start = time.clock() |
| connection.handshakeClientSRP(username, password) |
| elif cmd == "clientsharedkey": |
| username = args.get(3) |
| password = args.getLast(4) |
| connection = connect() |
| start = time.clock() |
| connection.handshakeClientSharedKey(username, password) |
| elif cmd == "clientcert": |
| certChain = None |
| privateKey = None |
| if len(sys.argv) > 3: |
| certFilename = args.get(3) |
| keyFilename = args.getLast(4) |
| |
| s1 = open(certFilename, "rb").read() |
| s2 = open(keyFilename, "rb").read() |
| |
| #Try to create cryptoID cert chain |
| if cryptoIDlibLoaded: |
| try: |
| certChain = CertChain().parse(s1) |
| privateKey = parsePrivateKey(s2) |
| except: |
| certChain = None |
| privateKey = None |
| |
| #Try to create X.509 cert chain |
| if not certChain: |
| x509 = X509() |
| x509.parse(s1) |
| certChain = X509CertChain([x509]) |
| privateKey = parsePrivateKey(s2) |
| |
| connection = connect() |
| start = time.clock() |
| connection.handshakeClientCert(certChain, privateKey) |
| else: |
| raise SyntaxError("Unknown command") |
| |
| except TLSLocalAlert, a: |
| if a.description == AlertDescription.bad_record_mac: |
| if cmd == "clientsharedkey": |
| print "Bad sharedkey password" |
| else: |
| raise |
| elif a.description == AlertDescription.user_canceled: |
| print str(a) |
| else: |
| raise |
| sys.exit() |
| except TLSRemoteAlert, a: |
| if a.description == AlertDescription.unknown_srp_username: |
| if cmd == "clientsrp": |
| print "Unknown username" |
| else: |
| raise |
| elif a.description == AlertDescription.bad_record_mac: |
| if cmd == "clientsrp": |
| print "Bad username or password" |
| else: |
| raise |
| elif a.description == AlertDescription.handshake_failure: |
| print "Unable to negotiate mutually acceptable parameters" |
| else: |
| raise |
| sys.exit() |
| |
| stop = time.clock() |
| print "Handshake success" |
| print " Handshake time: %.4f seconds" % (stop - start) |
| print " Version: %s.%s" % connection.version |
| print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation()) |
| if connection.session.srpUsername: |
| print " Client SRP username: %s" % connection.session.srpUsername |
| if connection.session.sharedKeyUsername: |
| print " Client shared key username: %s" % connection.session.sharedKeyUsername |
| if connection.session.clientCertChain: |
| print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint() |
| if connection.session.serverCertChain: |
| print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint() |
| connection.close() |
| connection.sock.close() |
| |
| elif cmd.startswith("server"): |
| address = args.get(2) |
| |
| #Split address into hostname/port tuple |
| address = address.split(":") |
| if len(address)==1: |
| address.append("4443") |
| address = ( address[0], int(address[1]) ) |
| |
| verifierDBFilename = None |
| sharedKeyDBFilename = None |
| certFilename = None |
| keyFilename = None |
| sharedKeyDB = None |
| reqCert = False |
| |
| if cmd == "serversrp": |
| verifierDBFilename = args.getLast(3) |
| elif cmd == "servercert": |
| certFilename = args.get(3) |
| keyFilename = args.get(4) |
| if len(sys.argv)>=6: |
| req = args.getLast(5) |
| if req.lower() != "req": |
| raise SyntaxError() |
| reqCert = True |
| elif cmd == "serversrpcert": |
| verifierDBFilename = args.get(3) |
| certFilename = args.get(4) |
| keyFilename = args.getLast(5) |
| elif cmd == "serversharedkey": |
| sharedKeyDBFilename = args.getLast(3) |
| elif cmd == "servertest": |
| address = args.get(2) |
| dir = args.getLast(3) |
| serverTest(address, dir) |
| sys.exit() |
| |
| verifierDB = None |
| if verifierDBFilename: |
| verifierDB = VerifierDB(verifierDBFilename) |
| verifierDB.open() |
| |
| sharedKeyDB = None |
| if sharedKeyDBFilename: |
| sharedKeyDB = SharedKeyDB(sharedKeyDBFilename) |
| sharedKeyDB.open() |
| |
| certChain = None |
| privateKey = None |
| if certFilename: |
| s1 = open(certFilename, "rb").read() |
| s2 = open(keyFilename, "rb").read() |
| |
| #Try to create cryptoID cert chain |
| if cryptoIDlibLoaded: |
| try: |
| certChain = CertChain().parse(s1) |
| privateKey = parsePrivateKey(s2) |
| except: |
| certChain = None |
| privateKey = None |
| |
| #Try to create X.509 cert chain |
| if not certChain: |
| x509 = X509() |
| x509.parse(s1) |
| certChain = X509CertChain([x509]) |
| privateKey = parsePrivateKey(s2) |
| |
| |
| |
| #Create handler function - performs handshake, then echos all bytes received |
| def handler(sock): |
| try: |
| connection = TLSConnection(sock) |
| settings = HandshakeSettings() |
| connection.handshakeServer(sharedKeyDB=sharedKeyDB, verifierDB=verifierDB, \ |
| certChain=certChain, privateKey=privateKey, \ |
| reqCert=reqCert, settings=settings) |
| print "Handshake success" |
| print " Version: %s.%s" % connection.version |
| print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation()) |
| if connection.session.srpUsername: |
| print " Client SRP username: %s" % connection.session.srpUsername |
| if connection.session.sharedKeyUsername: |
| print " Client shared key username: %s" % connection.session.sharedKeyUsername |
| if connection.session.clientCertChain: |
| print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint() |
| if connection.session.serverCertChain: |
| print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint() |
| |
| s = "" |
| while 1: |
| newS = connection.read() |
| if not newS: |
| break |
| s += newS |
| if s[-1]=='\n': |
| connection.write(s) |
| s = "" |
| except TLSLocalAlert, a: |
| if a.description == AlertDescription.unknown_srp_username: |
| print "Unknown SRP username" |
| elif a.description == AlertDescription.bad_record_mac: |
| if cmd == "serversrp" or cmd == "serversrpcert": |
| print "Bad SRP password for:", connection.allegedSrpUsername |
| else: |
| raise |
| elif a.description == AlertDescription.handshake_failure: |
| print "Unable to negotiate mutually acceptable parameters" |
| else: |
| raise |
| except TLSRemoteAlert, a: |
| if a.description == AlertDescription.bad_record_mac: |
| if cmd == "serversharedkey": |
| print "Bad sharedkey password for:", connection.allegedSharedKeyUsername |
| else: |
| raise |
| elif a.description == AlertDescription.user_canceled: |
| print "Handshake cancelled" |
| elif a.description == AlertDescription.handshake_failure: |
| print "Unable to negotiate mutually acceptable parameters" |
| elif a.description == AlertDescription.close_notify: |
| pass |
| else: |
| raise |
| |
| #Run multi-threaded server |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| sock.bind(address) |
| sock.listen(5) |
| while 1: |
| (newsock, cliAddress) = sock.accept() |
| thread.start_new_thread(handler, (newsock,)) |
| |
| |
| else: |
| print "Bad command: '%s'" % cmd |
| except TLSRemoteAlert, a: |
| print str(a) |
| raise |
| |
| |
| |
| |
| |