431 lines
16 KiB
Python
431 lines
16 KiB
Python
import logging
|
|
import uuid
|
|
import re
|
|
from StringIO import StringIO
|
|
import types
|
|
|
|
from java.lang import RuntimeException, System
|
|
from java.io import BufferedInputStream, BufferedReader, FileReader, InputStreamReader, ByteArrayInputStream, IOException
|
|
from java.security import KeyStore, Security, InvalidAlgorithmParameterException
|
|
from java.security.cert import CertificateException, CertificateFactory
|
|
from java.security.interfaces import RSAPrivateCrtKey
|
|
from java.security.interfaces import RSAPublicKey
|
|
from javax.net.ssl import X509KeyManager, X509TrustManager, KeyManagerFactory, SSLContext
|
|
|
|
try:
|
|
# jarjar-ed version
|
|
from org.python.netty.handler.ssl.util import SimpleTrustManagerFactory
|
|
|
|
except ImportError:
|
|
# dev version from extlibs
|
|
from io.netty.handler.ssl.util import SimpleTrustManagerFactory
|
|
|
|
try:
|
|
# dev version from extlibs OR if in classpath.
|
|
#
|
|
# Assumes BC's API is sufficiently stable, but this assumption
|
|
# seems safe based on our experience using BC.
|
|
#
|
|
# This change in import ordering - compared to similar conditional
|
|
# imports - is to workaround the problem in
|
|
# http://bugs.jython.org/issue2469, due to the fact that jarjar-ed
|
|
# jars - like other shading - lose their signatures. For most jars
|
|
# this is not an issue, and we have been removing signature files
|
|
# since 2.7.0. But in this specific case, removing signatures then
|
|
# causes conflicts with Java's security provider model, because it
|
|
# requires signing.
|
|
from org.bouncycastle.asn1.pkcs import PrivateKeyInfo
|
|
from org.bouncycastle.cert import X509CertificateHolder
|
|
from org.bouncycastle.cert.jcajce import JcaX509CertificateConverter
|
|
from org.bouncycastle.jce.provider import BouncyCastleProvider
|
|
from org.bouncycastle.jce import ECNamedCurveTable
|
|
from org.bouncycastle.jce.spec import ECNamedCurveSpec
|
|
from org.bouncycastle.openssl import PEMKeyPair, PEMParser, PEMEncryptedKeyPair, \
|
|
PEMException, EncryptionException
|
|
from org.bouncycastle.openssl.jcajce import JcaPEMKeyConverter, JcePEMDecryptorProviderBuilder
|
|
from org.bouncycastle.util.encoders import DecoderException
|
|
except ImportError:
|
|
# jarjar-ed version
|
|
from org.python.bouncycastle.asn1.pkcs import PrivateKeyInfo
|
|
from org.python.bouncycastle.cert import X509CertificateHolder
|
|
from org.python.bouncycastle.cert.jcajce import JcaX509CertificateConverter
|
|
from org.python.bouncycastle.jce.provider import BouncyCastleProvider
|
|
from org.python.bouncycastle.jce import ECNamedCurveTable
|
|
from org.python.bouncycastle.jce.spec import ECNamedCurveSpec
|
|
from org.python.bouncycastle.openssl import PEMKeyPair, PEMParser, PEMEncryptedKeyPair, \
|
|
PEMException, EncryptionException
|
|
from org.python.bouncycastle.openssl.jcajce import JcaPEMKeyConverter, JcePEMDecryptorProviderBuilder
|
|
from org.python.bouncycastle.util.encoders import DecoderException
|
|
|
|
log = logging.getLogger("_socket")
|
|
Security.addProvider(BouncyCastleProvider())
|
|
|
|
RE_BEGIN_KEY_CERT = re.compile(r'^-----BEGIN.*(PRIVATE KEY|CERTIFICATE)-----$')
|
|
|
|
|
|
def _get_ca_certs_trust_manager(ca_certs=None):
|
|
trust_store = KeyStore.getInstance(KeyStore.getDefaultType())
|
|
trust_store.load(None, None)
|
|
num_certs_installed = 0
|
|
if ca_certs is not None:
|
|
with open(ca_certs) as f:
|
|
cf = CertificateFactory.getInstance("X.509")
|
|
for cert in cf.generateCertificates(BufferedInputStream(f)):
|
|
trust_store.setCertificateEntry(str(uuid.uuid4()), cert)
|
|
num_certs_installed += 1
|
|
tmf = SimpleTrustManagerFactory.getInstance(SimpleTrustManagerFactory.getDefaultAlgorithm())
|
|
tmf.init(trust_store)
|
|
log.debug("Installed %s certificates", num_certs_installed, extra={"sock": "*"})
|
|
return tmf
|
|
|
|
|
|
def _stringio_as_reader(s):
|
|
return BufferedReader(InputStreamReader(ByteArrayInputStream(bytearray(s.getvalue()))))
|
|
|
|
|
|
def _extract_readers(f):
|
|
string_ios = []
|
|
output = StringIO()
|
|
key_cert_start_line_found = False
|
|
for line in f:
|
|
if RE_BEGIN_KEY_CERT.match(line):
|
|
key_cert_start_line_found = True
|
|
if output.getvalue():
|
|
string_ios.append(output)
|
|
|
|
output = StringIO()
|
|
output.write(line)
|
|
|
|
if output.getvalue():
|
|
string_ios.append(output)
|
|
|
|
if not key_cert_start_line_found:
|
|
from _socket import SSLError, SSL_ERROR_SSL
|
|
raise SSLError(SSL_ERROR_SSL, "PEM lib (no start line or not enough data)")
|
|
|
|
return [_stringio_as_reader(sio) for sio in string_ios]
|
|
|
|
|
|
def _get_openssl_key_manager(cert_file=None, key_file=None, password=None, _key_store=None):
|
|
certs, private_key = [], None
|
|
|
|
if _key_store is None:
|
|
_key_store = KeyStore.getInstance(KeyStore.getDefaultType())
|
|
_key_store.load(None, None)
|
|
|
|
if key_file is not None:
|
|
certs, private_key = _extract_certs_for_paths([key_file], password)
|
|
if private_key is None:
|
|
from _socket import SSLError, SSL_ERROR_SSL
|
|
raise SSLError(SSL_ERROR_SSL, "PEM lib (No private key loaded)")
|
|
|
|
if cert_file is not None:
|
|
_certs, _private_key = _extract_certs_for_paths([cert_file], password)
|
|
private_key = _private_key if _private_key else private_key
|
|
certs.extend(_certs)
|
|
|
|
if not private_key:
|
|
from _socket import SSLError, SSL_ERROR_SSL
|
|
raise SSLError(SSL_ERROR_SSL, "PEM lib (No private key loaded)")
|
|
|
|
keys_match, validateable_keys_found = False, False
|
|
for cert in certs:
|
|
# TODO works for RSA only for now
|
|
if isinstance(cert.publicKey, RSAPublicKey) and isinstance(private_key, RSAPrivateCrtKey):
|
|
validateable_keys_found = True
|
|
|
|
if validateable_keys_found:
|
|
if cert.publicKey.getModulus() == private_key.getModulus() \
|
|
and cert.publicKey.getPublicExponent() == private_key.getPublicExponent():
|
|
keys_match = True
|
|
break
|
|
|
|
if key_file is not None and validateable_keys_found and not keys_match:
|
|
from _socket import SSLError, SSL_ERROR_SSL
|
|
raise SSLError(SSL_ERROR_SSL, "key values mismatch")
|
|
|
|
_key_store.setKeyEntry(_str_hash_key_entry(private_key, *certs), private_key, [], certs)
|
|
|
|
kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
|
|
kmf.init(_key_store, [])
|
|
return kmf
|
|
|
|
|
|
def _str_hash_key_entry(*args):
|
|
"""Very naiive"""
|
|
_hash = 0
|
|
for arg in args:
|
|
if arg:
|
|
_hash += hash(arg.toString().encode('utf8'))
|
|
|
|
return str(_hash)
|
|
|
|
|
|
def _parse_password(password):
|
|
is_password_func = False
|
|
if isinstance(password, (types.FunctionType, types.MethodType)) or hasattr(password, '__call__'):
|
|
password = password()
|
|
is_password_func = True
|
|
if password is None:
|
|
password = []
|
|
elif isinstance(password, (str, bytearray)):
|
|
password = str(password)
|
|
if len(password) >= 102400: # simulate openssl PEM_BUFSIZE limit error
|
|
raise ValueError("PEM lib (password cannot be longer than 102400 -1)")
|
|
else:
|
|
if is_password_func:
|
|
raise TypeError("PEM lib (must return a string")
|
|
else:
|
|
raise TypeError("PEM lib (password should be a string)")
|
|
return password
|
|
|
|
|
|
def _extract_certs_from_keystore_file(f, password):
|
|
keystore = KeyStore.getInstance(KeyStore.getDefaultType())
|
|
if password is None:
|
|
password = System.getProperty('javax.net.ssl.trustStorePassword')
|
|
if password is None: # default java keystore password is changeit
|
|
password = 'changeit'
|
|
elif not isinstance(password, str):
|
|
password = []
|
|
|
|
keystore.load(BufferedInputStream(f), password)
|
|
certs = []
|
|
|
|
alias_iter = keystore.aliases()
|
|
while alias_iter.hasMoreElements():
|
|
alias = alias_iter.nextElement()
|
|
certs.append(keystore.getCertificate(alias))
|
|
|
|
return certs
|
|
|
|
|
|
def _extract_certs_for_paths(paths, password=None):
|
|
# Go from Bouncy Castle API to Java's; a bit heavyweight for the Python dev ;)
|
|
key_converter = JcaPEMKeyConverter().setProvider("BC")
|
|
cert_converter = JcaX509CertificateConverter().setProvider("BC")
|
|
certs = []
|
|
private_key = None
|
|
for path in paths:
|
|
err = None
|
|
with open(path) as f:
|
|
# try to load the file as keystore file first
|
|
try:
|
|
_certs = _extract_certs_from_keystore_file(f, password)
|
|
certs.extend(_certs)
|
|
except IOException as err:
|
|
pass # reported as 'Invalid keystore format'
|
|
if err is not None: # try loading pem version instead
|
|
with open(path) as f:
|
|
_certs, _private_key = _extract_cert_from_data(f, password, key_converter, cert_converter)
|
|
private_key = _private_key if _private_key else private_key
|
|
certs.extend(_certs)
|
|
return certs, private_key
|
|
|
|
|
|
def _extract_cert_from_data(f, password=None, key_converter=None, cert_converter=None):
|
|
certs = []
|
|
private_key = None
|
|
|
|
if isinstance(f, unicode):
|
|
f = StringIO(str(f))
|
|
elif isinstance(f, str):
|
|
f = StringIO(f)
|
|
|
|
if not hasattr(f, 'seek'):
|
|
raise TypeError("PEM lib (data must be a file like object, string or bytes")
|
|
|
|
if _is_cert_pem(f):
|
|
certs, private_key = _read_pem_cert_from_data(f, password, key_converter, cert_converter)
|
|
else:
|
|
cf = CertificateFactory.getInstance("X.509")
|
|
certs = list(cf.generateCertificates(ByteArrayInputStream(f.read())))
|
|
|
|
return certs, private_key
|
|
|
|
|
|
def _read_pem_cert_from_data(f, password, key_converter, cert_converter):
|
|
|
|
def PEM_SSLError(err): # Shorthand
|
|
from _socket import SSLError, SSL_ERROR_SSL
|
|
return SSLError(SSL_ERROR_SSL, "PEM lib ({})".format(err))
|
|
|
|
certs = []
|
|
private_key = None
|
|
|
|
if key_converter is None:
|
|
key_converter = JcaPEMKeyConverter().setProvider("BC")
|
|
if cert_converter is None:
|
|
cert_converter = JcaX509CertificateConverter().setProvider("BC")
|
|
for br in _extract_readers(f):
|
|
while True:
|
|
try:
|
|
obj = PEMParser(br).readObject()
|
|
except PEMException as err:
|
|
raise PEM_SSLError(err)
|
|
except DecoderException as err:
|
|
raise PEM_SSLError(err)
|
|
|
|
if obj is None:
|
|
break
|
|
|
|
if isinstance(obj, PEMKeyPair):
|
|
private_key = key_converter.getKeyPair(obj).getPrivate()
|
|
elif isinstance(obj, PrivateKeyInfo):
|
|
private_key = key_converter.getPrivateKey(obj)
|
|
elif isinstance(obj, X509CertificateHolder):
|
|
certs.append(cert_converter.getCertificate(obj))
|
|
elif isinstance(obj, PEMEncryptedKeyPair):
|
|
provider = JcePEMDecryptorProviderBuilder().build(_parse_password(password))
|
|
try:
|
|
key_pair = key_converter.getKeyPair(obj.decryptKeyPair(provider))
|
|
except EncryptionException as err:
|
|
raise PEM_SSLError(err)
|
|
|
|
private_key = key_pair.getPrivate()
|
|
else:
|
|
raise NotImplementedError("Jython does not implement PEM object {!r}".format(obj))
|
|
return certs, private_key
|
|
|
|
|
|
def _is_cert_pem(f):
|
|
try:
|
|
# check if the data is string specifically to support DER certs
|
|
f.read().decode('ascii')
|
|
return True
|
|
except UnicodeDecodeError:
|
|
return False
|
|
finally:
|
|
f.seek(0)
|
|
|
|
assert "Should not reach here"
|
|
|
|
|
|
def _get_ecdh_parameter_spec(curve_name):
|
|
if not isinstance(curve_name, str):
|
|
raise TypeError("curve_name must be string/bytes")
|
|
|
|
spec_param = ECNamedCurveTable.getParameterSpec(curve_name)
|
|
if spec_param is None:
|
|
raise ValueError("unknown elliptic curve name {}".format(curve_name))
|
|
|
|
return ECNamedCurveSpec(spec_param.getName(), spec_param.getCurve(), spec_param.getG(), spec_param.getN(),
|
|
spec_param.getH(), spec_param.getSeed())
|
|
|
|
|
|
# CompositeX509KeyManager and CompositeX509TrustManager allow for mixing together Java built-in managers
|
|
# with new managers to support Python ssl.
|
|
#
|
|
# See http://tersesystems.com/2014/01/13/fixing-the-most-dangerous-code-in-the-world/
|
|
# for a good description of this composite approach.
|
|
#
|
|
# Ported to Python from http://codyaray.com/2013/04/java-ssl-with-multiple-keystores
|
|
# which was inspired by http://stackoverflow.com/questions/1793979/registering-multiple-keystores-in-jvm
|
|
|
|
class CompositeX509KeyManager(X509KeyManager):
|
|
|
|
def __init__(self, key_managers):
|
|
self.key_managers = key_managers
|
|
|
|
def chooseClientAlias(self, key_type, issuers, socket):
|
|
for key_manager in self.key_managers:
|
|
alias = key_manager.chooseClientAlias(key_type, issuers, socket)
|
|
if alias:
|
|
return alias;
|
|
return None
|
|
|
|
def chooseServerAlias(self, key_type, issuers, socket):
|
|
for key_manager in self.key_managers:
|
|
alias = key_manager.chooseServerAlias(key_type, issuers, socket)
|
|
if alias:
|
|
return alias;
|
|
return None
|
|
|
|
def getPrivateKey(self, alias):
|
|
for key_manager in self.key_managers:
|
|
private_key = key_manager.getPrivateKey(alias)
|
|
if private_key:
|
|
return private_key
|
|
return None
|
|
|
|
def getCertificateChain(self, alias):
|
|
for key_manager in self.key_managers:
|
|
chain = key_manager.getCertificateChain(alias)
|
|
if chain:
|
|
return chain
|
|
return None
|
|
|
|
def getClientAliases(self, key_type, issuers):
|
|
aliases = []
|
|
for key_manager in self.key_managers:
|
|
aliases.extend(key_manager.getClientAliases(key_type, issuers))
|
|
if not aliases:
|
|
return None
|
|
else:
|
|
return aliases
|
|
|
|
def getServerAliases(self, key_type, issuers):
|
|
aliases = []
|
|
for key_manager in self.key_managers:
|
|
aliases.extend(key_manager.getServerAliases(key_type, issuers))
|
|
if not aliases:
|
|
return None
|
|
else:
|
|
return aliases
|
|
|
|
|
|
class CompositeX509TrustManager(X509TrustManager):
|
|
|
|
def __init__(self, trust_managers):
|
|
self.trust_managers = trust_managers
|
|
|
|
def checkClientTrusted(self, chain, auth_type):
|
|
for trust_manager in self.trust_managers:
|
|
try:
|
|
trust_manager.checkClientTrusted(chain, auth_type)
|
|
return
|
|
except CertificateException:
|
|
continue
|
|
except RuntimeException as err:
|
|
# special-case to raise proper CertificateException ;)
|
|
if isinstance(err.getCause(), InvalidAlgorithmParameterException):
|
|
if err.getCause().getMessage() == u'the trustAnchors parameter must be non-empty':
|
|
continue
|
|
raise
|
|
|
|
raise CertificateException("certificate verify failed")
|
|
|
|
def checkServerTrusted(self, chain, auth_type):
|
|
for trust_manager in self.trust_managers:
|
|
try:
|
|
trust_manager.checkServerTrusted(chain, auth_type)
|
|
return
|
|
except CertificateException:
|
|
continue
|
|
except RuntimeException as err:
|
|
# special-case to raise proper CertificateException ;)
|
|
if isinstance(err.getCause(), InvalidAlgorithmParameterException):
|
|
if err.getCause().getMessage() == u'the trustAnchors parameter must be non-empty':
|
|
continue
|
|
raise
|
|
|
|
raise CertificateException("certificate verify failed")
|
|
|
|
def getAcceptedIssuers(self):
|
|
certs = []
|
|
for trust_manager in self.trust_managers:
|
|
certs.extend(trust_manager.getAcceptedIssuers())
|
|
return certs
|
|
|
|
|
|
class CompositeX509TrustManagerFactory(SimpleTrustManagerFactory):
|
|
|
|
def __init__(self, trust_managers):
|
|
self._trust_manager = CompositeX509TrustManager(trust_managers)
|
|
|
|
def engineInit(self, arg):
|
|
pass
|
|
|
|
def engineGetTrustManagers(self):
|
|
return [self._trust_manager]
|