Incompatibile interface changes: DigestType methods size, block_size and name become properties
"""
-from ctypes import CDLL,c_char_p
+from ctypes import CDLL, c_char_p
def config(filename=None):
"""
__all__ = ['config']
libcrypto = CDLL("libcrypto.so.1.0.0")
-libcrypto.OPENSSL_config.argtypes=(c_char_p,)
+libcrypto.OPENSSL_config.argtypes = (c_char_p, )
libcrypto.OPENSSL_add_all_algorithms_conf()
Interface to OpenSSL BIO library
"""
from ctypescrypto import libcrypto
-from ctypes import c_char_p, c_void_p, c_int, string_at, c_long,POINTER,byref, create_string_buffer
+from ctypes import c_char_p, c_void_p, c_int, string_at, c_long
+from ctypes import POINTER, byref, create_string_buffer
class Membio(object):
- """
- Provides interface to OpenSSL memory bios
- use str() or unicode() to get contents of writable bio
- use bio member to pass to libcrypto function
"""
- def __init__(self,data=None):
- """ If data is specified, creates read-only BIO. If data is
- None, creates writable BIO, contents of which can be retrieved by str() or unicode()
+ Provides interface to OpenSSL memory bios
+ use str() or unicode() to get contents of writable bio
+ use bio member to pass to libcrypto function
+ """
+ def __init__(self, data=None):
+ """
+ If data is specified, creates read-only BIO. If data is
+ None, creates writable BIO, contents of which can be retrieved
+ by str() or unicode()
"""
if data is None:
- method=libcrypto.BIO_s_mem()
- self.bio=libcrypto.BIO_new(method)
+ method = libcrypto.BIO_s_mem()
+ self.bio = libcrypto.BIO_new(method)
else:
- self.bio=libcrypto.BIO_new_mem_buf(c_char_p(data),len(data))
+ self.bio = libcrypto.BIO_new_mem_buf(c_char_p(data), len(data))
+
def __del__(self):
"""
Cleans up memory used by bio
"""
libcrypto.BIO_free(self.bio)
- del(self.bio)
+ del self.bio
+
def __str__(self):
"""
Returns current contents of buffer as byte string
"""
- p=c_char_p(None)
- l=libcrypto.BIO_ctrl(self.bio,3,0,byref(p))
- return string_at(p,l)
+ string_ptr = c_char_p(None)
+ string_len = libcrypto.BIO_ctrl(self.bio, 3, 0, byref(string_ptr))
+ return string_at(string_ptr, string_len)
+
def __unicode__(self):
"""
- Attempts to interpret current contents of buffer as UTF-8 string and convert it to unicode
+ Attempts to interpret current contents of buffer as UTF-8 string
+ and convert it to unicode
"""
return str(self).decode("utf-8")
- def read(self,length=None):
+
+ def read(self, length=None):
"""
Reads data from readble BIO. For test purposes.
- @param length - if specifed, limits amount of data read. If not BIO is read until end of buffer
+ @param length - if specifed, limits amount of data read.
+ If not BIO is read until end of buffer
"""
if not length is None:
- if not isinstance(length,(int,long)):
+ if not isinstance(length, (int, long)):
raise TypeError("length to read should be number")
- buf=create_string_buffer(length)
- readbytes=libcrypto.BIO_read(self.bio,buf,length)
- if readbytes==-2:
- raise NotImplementedError("Function is not supported by this BIO")
- if readbytes==-1:
+ buf = create_string_buffer(length)
+ readbytes = libcrypto.BIO_read(self.bio, buf, length)
+ if readbytes == -2:
+ raise NotImplementedError("Function is not supported by" +
+ "this BIO")
+ if readbytes == -1:
raise IOError
- if readbytes==0:
+ if readbytes == 0:
return ""
return buf.raw[:readbytes]
else:
- buf=create_string_buffer(1024)
- out=""
- r=1
- while r>0:
- r=libcrypto.BIO_read(self.bio,buf,1024)
- if r==-2:
- raise NotImplementedError("Function is not supported by this BIO")
- if r==-1:
+ buf = create_string_buffer(1024)
+ out = ""
+ readbytes = 1
+ while readbytes > 0:
+ readbytes = libcrypto.BIO_read(self.bio, buf, 1024)
+ if readbytes == -2:
+ raise NotImplementedError("Function is not supported by " +
+ "this BIO")
+ if readbytes == -1:
raise IOError
- if (r>0):
- out+=buf.raw[:r]
- return out
+ if readbytes > 0:
+ out += buf.raw[:readbytes]
+ return out
- def write(self,data):
+ def write(self, data):
"""
Writes data to writable bio. For test purposes
"""
- if isinstance(data,unicode):
- data=data.encode("utf-8")
- r=libcrypto.BIO_write(self.bio,data,len(data))
- if r==-2:
+ if isinstance(data, unicode):
+ data = data.encode("utf-8")
+ else:
+ data = str(data)
+
+ written = libcrypto.BIO_write(self.bio, data, len(data))
+ if written == -2:
raise NotImplementedError("Function not supported by this BIO")
- if r<len(data):
+ if written < len(data):
raise IOError("Not all data were successfully written")
+
def reset(self):
"""
- Resets the read-only bio to start and discards all data from writable bio
+ Resets the read-only bio to start and discards all data from
+ writable bio
"""
- libcrypto.BIO_ctrl(self.bio,1,0,None)
+ libcrypto.BIO_ctrl(self.bio, 1, 0, None)
__all__ = ['Membio']
-libcrypto.BIO_s_mem.restype=c_void_p
-libcrypto.BIO_new.restype=c_void_p
-libcrypto.BIO_new.argtypes=(c_void_p,)
-libcrypto.BIO_ctrl.restype=c_long
-libcrypto.BIO_ctrl.argtypes=(c_void_p,c_int,c_long,POINTER(c_char_p))
-libcrypto.BIO_read.argtypes=(c_void_p,c_char_p,c_int)
-libcrypto.BIO_write.argtypes=(c_void_p,c_char_p,c_int)
-libcrypto.BIO_free.argtypes=(c_void_p,)
-libcrypto.BIO_new_mem_buf.restype=c_void_p
-libcrypto.BIO_new_mem_buf.argtypes=(c_char_p,c_int)
-libcrypto.BIO_ctrl.argtypes=(c_void_p,c_int,c_int,c_void_p)
+libcrypto.BIO_s_mem.restype = c_void_p
+libcrypto.BIO_new.restype = c_void_p
+libcrypto.BIO_new.argtypes = (c_void_p, )
+libcrypto.BIO_ctrl.restype = c_long
+libcrypto.BIO_ctrl.argtypes = (c_void_p, c_int, c_long, POINTER(c_char_p))
+libcrypto.BIO_read.argtypes = (c_void_p, c_char_p, c_int)
+libcrypto.BIO_write.argtypes = (c_void_p, c_char_p, c_int)
+libcrypto.BIO_free.argtypes = (c_void_p, )
+libcrypto.BIO_new_mem_buf.restype = c_void_p
+libcrypto.BIO_new_mem_buf.argtypes = (c_char_p, c_int)
access to symmetric ciphers from libcrypto
"""
-from ctypes import create_string_buffer,c_char_p,c_void_p,c_int,c_long,byref,POINTER
+from ctypes import create_string_buffer, c_char_p, c_void_p, c_int
+from ctypes import byref, POINTER
from ctypescrypto import libcrypto
from ctypescrypto.exception import LibCryptoError
from ctypescrypto.oid import Oid
CIPHER_ALGORITHMS = ("DES", "DES-EDE3", "BF", "AES-128", "AES-192", "AES-256")
-CIPHER_MODES = ("STREAM","ECB","CBC", "CFB", "OFB", "CTR","GCM")
+CIPHER_MODES = ("STREAM", "ECB", "CBC", "CFB", "OFB", "CTR", "GCM")
#
-__all__ = ['CipherError','new','Cipher','CipherType']
+__all__ = ['CipherError', 'new', 'Cipher', 'CipherType']
class CipherError(LibCryptoError):
+ """
+ Exception raise when OpenSSL function returns error
+ """
pass
-def new(algname,key,encrypt=True,iv=None):
+def new(algname, key, encrypt=True, iv=None):
"""
- Returns new cipher object ready to encrypt-decrypt data
+ Returns new cipher object ready to encrypt-decrypt data
- @param algname - string algorithm name like in opemssl command
- line
- @param key - binary string representing ciher key
- @param encrypt - if True (default) cipher would be initialized
+ @param algname - string algorithm name like in opemssl command
+ line
+ @param key - binary string representing ciher key
+ @param encrypt - if True (default) cipher would be initialized
for encryption, otherwise - for decrypton
- @param iv - initialization vector
+ @param iv - initialization vector
"""
- ct=CipherType(algname)
- return Cipher(ct,key,iv,encrypt)
+ ciph_type = CipherType(algname)
+ return Cipher(ciph_type, key, iv, encrypt)
-class CipherType:
+class CipherType(object):
"""
- Describes cihper algorihm. Can be used to produce cipher
- instance and to get various information about cihper
+ Describes cihper algorihm. Can be used to produce cipher
+ instance and to get various information about cihper
"""
def __init__(self, cipher_name):
"""
- Constructs cipher algortihm using textual name as in openssl
- command line
+ Constructs cipher algortihm using textual name as in openssl
+ command line
"""
self.cipher = libcrypto.EVP_get_cipherbyname(cipher_name)
if self.cipher is None:
raise CipherError("Unknown cipher: %s" % cipher_name)
def __del__(self):
+ """
+ It is constant object with do-nothing del
+ """
pass
+
def block_size(self):
"""
- Returns block size of the cipher
+ Returns block size of the cipher
"""
return libcrypto.EVP_CIPHER_block_size(self.cipher)
+
def key_length(self):
"""
- Returns key length of the cipher
+ Returns key length of the cipher
"""
return libcrypto.EVP_CIPHER_key_length(self.cipher)
+
def iv_length(self):
"""
- Returns initialization vector length of the cipher
+ Returns initialization vector length of the cipher
"""
return libcrypto.EVP_CIPHER_iv_length(self.cipher)
+
def flags(self):
"""
- Return cipher flags. Low three bits of the flags encode
- cipher mode (see mode). Higher bits is combinatuon of
- EVP_CIPH* constants defined in the <openssl/evp.h>
+ Return cipher flags. Low three bits of the flags encode
+ cipher mode (see mode). Higher bits is combinatuon of
+ EVP_CIPH* constants defined in the <openssl/evp.h>
"""
return libcrypto.EVP_CIPHER_flags(self.cipher)
+
def mode(self):
"""
- Returns cipher mode as string constant like CBC, OFB etc.
+ Returns cipher mode as string constant like CBC, OFB etc.
"""
return CIPHER_MODES[self.flags() & 0x7]
+
def algo(self):
"""
- Return cipher's algorithm name, derived from OID
+ Return cipher's algorithm name, derived from OID
"""
- return self.oid().shortname()
+ return self.oid().shortname()
+
def oid(self):
"""
- Returns ASN.1 object identifier of the cipher as
- ctypescrypto.oid.Oid object
+ Returns ASN.1 object identifier of the cipher as
+ ctypescrypto.oid.Oid object
"""
return Oid(libcrypto.EVP_CIPHER_nid(self.cipher))
-class Cipher:
+class Cipher(object):
"""
- Performs actual encrypton decryption
- Note that object keeps some internal state.
- To obtain full ciphertext (or plaintext during decihpering)
- user should concatenate results of all calls of update with
- result of finish
+ Performs actual encrypton decryption
+ Note that object keeps some internal state.
+ To obtain full ciphertext (or plaintext during decihpering)
+ user should concatenate results of all calls of update with
+ result of finish
"""
- def __init__(self, cipher_type, key, iv, encrypt=True):
+ def __init__(self, cipher_type, key, iv, encrypt=True):
"""
- Initializing cipher instance.
+ Initializing cipher instance.
- @param cipher_type - CipherType object
- @param key = binary string representing the key
- @param iv - binary string representing initializtion vector
- @param encrypt - if True(default) we ere encrypting.
- Otherwise decrypting
+ @param cipher_type - CipherType object
+ @param key = binary string representing the key
+ @param iv - binary string representing initializtion vector
+ @param encrypt - if True(default) we ere encrypting.
+ Otherwise decrypting
"""
self._clean_ctx()
if self.ctx == 0:
raise CipherError("Unable to create cipher context")
self.encrypt = encrypt
- enc=1 if encrypt else 0
+ enc = 1 if encrypt else 0
if not iv is None and len(iv) != cipher_type.iv_length():
raise ValueError("Invalid IV length for this algorithm")
-
+
if len(key) != cipher_type.key_length():
if (cipher_type.flags() & 8) != 0:
# Variable key length cipher.
- result = libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, None, None, c_int(enc))
- result = libcrypto.EVP_CIPHER_CTX_set_key_length(self.ctx,len(key))
+ result = libcrypto.EVP_CipherInit_ex(self.ctx,
+ cipher_type.cipher,
+ None, None, None,
+ c_int(enc))
+ result = libcrypto.EVP_CIPHER_CTX_set_key_length(self.ctx,
+ len(key))
if result == 0:
self._clean_ctx()
raise CipherError("Unable to set key length")
- result = libcrypto.EVP_CipherInit_ex(self.ctx, None, None, key_ptr, iv_ptr, c_int(enc))
+ result = libcrypto.EVP_CipherInit_ex(self.ctx, None, None,
+ key_ptr, iv_ptr,
+ c_int(enc))
else:
raise ValueError("Invalid key length for this algorithm")
else:
- result = libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, key_ptr, iv_ptr, c_int(enc))
+ result = libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher,
+ None, key_ptr, iv_ptr,
+ c_int(enc))
if result == 0:
self._clean_ctx()
raise CipherError("Unable to initialize cipher")
self.cipher_finalized = False
def __del__(self):
+ """
+ We define _clean_ctx() to do all the cleanup
+ """
self._clean_ctx()
def padding(self, padding=True):
"""
- Sets padding mode of the cipher
+ Sets padding mode of the cipher
"""
- padding_flag=1 if padding else 0
+ padding_flag = 1 if padding else 0
libcrypto.EVP_CIPHER_CTX_set_padding(self.ctx, padding_flag)
def update(self, data):
"""
- Performs actual encrypton/decrypion
+ Performs actual encrypton/decrypion
- @param data - part of the plain text/ciphertext to process
- @returns - part of ciphercext/plain text
+ @param data - part of the plain text/ciphertext to process
+ @returns - part of ciphercext/plain text
- Passd chunk of text doeesn't need to contain full ciher
- blocks. If neccessery, part of passed data would be kept
- internally until next data would be received or finish
- called
+ Passed chunk of text doesn't need to contain full ciher
+ blocks. If neccessery, part of passed data would be kept
+ internally until next data would be received or finish
+ called
"""
- if self.cipher_finalized :
+ if self.cipher_finalized:
raise CipherError("No updates allowed")
- if not isinstance(data,str):
+ if not isinstance(data, str):
raise TypeError("A string is expected")
if len(data) == 0:
return ""
- outbuf=create_string_buffer(self.block_size+len(data))
- outlen=c_int(0)
- ret=libcrypto.EVP_CipherUpdate(self.ctx,outbuf,byref(outlen),
- data,len(data))
- if ret <=0:
+ outbuf = create_string_buffer(self.block_size+len(data))
+ outlen = c_int(0)
+ ret = libcrypto.EVP_CipherUpdate(self.ctx, outbuf, byref(outlen),
+ data, len(data))
+ if ret <= 0:
self._clean_ctx()
- self.cipher_finalized=True
- del self.ctx
+ self.cipher_finalized = True
raise CipherError("problem processing data")
- return outbuf.raw[:outlen.value]
-
+ return outbuf.raw[:int(outlen.value)]
+
def finish(self):
"""
- Finalizes processing. If some data are kept in the internal
- state, they would be processed and returned.
+ Finalizes processing. If some data are kept in the internal
+ state, they would be processed and returned.
"""
- if self.cipher_finalized :
+ if self.cipher_finalized:
raise CipherError("Cipher operation is already completed")
- outbuf=create_string_buffer(self.block_size)
+ outbuf = create_string_buffer(self.block_size)
self.cipher_finalized = True
- outlen=c_int(0)
- result = libcrypto.EVP_CipherFinal_ex(self.ctx,outbuf , byref(outlen))
+ outlen = c_int(0)
+ result = libcrypto.EVP_CipherFinal_ex(self.ctx, outbuf, byref(outlen))
if result == 0:
self._clean_ctx()
raise CipherError("Unable to finalize cipher")
- if outlen.value>0:
- return outbuf.raw[:outlen.value]
+ if outlen.value > 0:
+ return outbuf.raw[:int(outlen.value)]
else:
return ""
-
+
def _clean_ctx(self):
+ """
+ Cleans up cipher ctx and deallocates it
+ """
try:
if self.ctx is not None:
libcrypto.EVP_CIPHER_CTX_cleanup(self.ctx)
libcrypto.EVP_CIPHER_CTX_free(self.ctx)
- del(self.ctx)
+ del self.ctx
except AttributeError:
pass
self.cipher_finalized = True
#
# Used C function block_size
#
-libcrypto.EVP_CIPHER_block_size.argtypes=(c_void_p,)
-libcrypto.EVP_CIPHER_CTX_cleanup.argtypes=(c_void_p,)
-libcrypto.EVP_CIPHER_CTX_free.argtypes=(c_void_p,)
-libcrypto.EVP_CIPHER_CTX_new.restype=c_void_p
-libcrypto.EVP_CIPHER_CTX_set_padding.argtypes=(c_void_p,c_int)
-libcrypto.EVP_CipherFinal_ex.argtypes=(c_void_p,c_char_p,POINTER(c_int))
-libcrypto.EVP_CIPHER_flags.argtypes=(c_void_p,)
-libcrypto.EVP_CipherInit_ex.argtypes=(c_void_p,c_void_p,c_void_p,c_char_p,c_char_p,c_int)
-libcrypto.EVP_CIPHER_iv_length.argtypes=(c_void_p,)
-libcrypto.EVP_CIPHER_key_length.argtypes=(c_void_p,)
-libcrypto.EVP_CIPHER_nid.argtypes=(c_void_p,)
-libcrypto.EVP_CipherUpdate.argtypes=(c_void_p,c_char_p,POINTER(c_int),c_char_p,c_int)
-libcrypto.EVP_get_cipherbyname.restype=c_void_p
-libcrypto.EVP_get_cipherbyname.argtypes=(c_char_p,)
-libcrypto.EVP_CIPHER_CTX_set_key_length.argtypes=(c_void_p,c_int)
+libcrypto.EVP_CIPHER_block_size.argtypes = (c_void_p, )
+libcrypto.EVP_CIPHER_CTX_cleanup.argtypes = (c_void_p, )
+libcrypto.EVP_CIPHER_CTX_free.argtypes = (c_void_p, )
+libcrypto.EVP_CIPHER_CTX_new.restype = c_void_p
+libcrypto.EVP_CIPHER_CTX_set_padding.argtypes = (c_void_p, c_int)
+libcrypto.EVP_CipherFinal_ex.argtypes = (c_void_p, c_char_p, POINTER(c_int))
+libcrypto.EVP_CIPHER_flags.argtypes = (c_void_p, )
+libcrypto.EVP_CipherInit_ex.argtypes = (c_void_p, c_void_p, c_void_p, c_char_p,
+ c_char_p, c_int)
+libcrypto.EVP_CIPHER_iv_length.argtypes = (c_void_p, )
+libcrypto.EVP_CIPHER_key_length.argtypes = (c_void_p, )
+libcrypto.EVP_CIPHER_nid.argtypes = (c_void_p, )
+libcrypto.EVP_CipherUpdate.argtypes = (c_void_p, c_char_p, POINTER(c_int),
+ c_char_p, c_int)
+libcrypto.EVP_get_cipherbyname.restype = c_void_p
+libcrypto.EVP_get_cipherbyname.argtypes = (c_char_p, )
+libcrypto.EVP_CIPHER_CTX_set_key_length.argtypes = (c_void_p, c_int)
"""
-from ctypes import c_int, c_void_p, c_char_p, c_int
+from ctypes import c_int, c_void_p, c_char_p, c_int, c_uint, c_size_t, POINTER
from ctypescrypto.exception import LibCryptoError
from ctypescrypto import libcrypto
from ctypescrypto.bio import Membio
from ctypescrypto.oid import Oid
+from ctypescrypto.x509 import StackOfX509
class CMSError(LibCryptoError):
"""
class Flags:
"""
- Constants for flags passed to the CMS methods.
+ Constants for flags passed to the CMS methods.
Can be OR-ed together
"""
- TEXT=1
- NOCERTS=2
- NO_CONTENT_VERIFY=4
- NO_ATTR_VERIFY=8
- NO_SIGS=NO_CONTENT_VERIFY|NO_ATTR_VERIFY
- NOINTERN=0x10
- NO_SIGNER_CERT_VERIFY=0x20
- NO_VERIFY=0x20
- DETACHED=0x40
- BINARY=0x80
- NOATTR=0x100
- NOSMIMECAP =0x200
- NOOLDMIMETYPE=0x400
- CRLFEOL=0x800
- STREAM=0x1000
- NOCRL=0x2000
- PARTIAL=0x4000
- REUSE_DIGEST=0x8000
- USE_KEYID=0x10000
- DEBUG_DECRYPT=0x20000
+ TEXT = 1
+ NOCERTS = 2
+ NO_CONTENT_VERIFY = 4
+ NO_ATTR_VERIFY = 8
+ NO_SIGS = NO_CONTENT_VERIFY | NO_ATTR_VERIFY
+ NOINTERN = 0x10
+ NO_SIGNER_CERT_VERIFY = 0x20
+ NO_VERIFY = 0x20
+ DETACHED = 0x40
+ BINARY = 0x80
+ NOATTR = 0x100
+ NOSMIMECAP = 0x200
+ NOOLDMIMETYPE = 0x400
+ CRLFEOL = 0x800
+ STREAM = 0x1000
+ NOCRL = 0x2000
+ PARTIAL = 0x4000
+ REUSE_DIGEST = 0x8000
+ USE_KEYID = 0x10000
+ DEBUG_DECRYPT = 0x20000
-def CMS(data,format="PEM"):
+def CMS(data, format="PEM"):
"""
Parses CMS data and returns either SignedData or EnvelopedData
object
"""
- b=Membio(data)
+ bio = Membio(data)
if format == "PEM":
- ptr=libcrypto.PEM_read_bio_CMS(b.bio,None,None,None)
+ ptr = libcrypto.PEM_read_bio_CMS(bio.bio, None, None, None)
else:
- ptr=libcrypto.d2i_CMS_bio(b.bio,None)
+ ptr = libcrypto.d2i_CMS_bio(bio.bio, None)
typeoid = Oid(libcrypto.OBJ_obj2nid(libcrypto.CMS_get0_type(ptr)))
- if typeoid.shortname()=="pkcs7-signedData":
+ if typeoid.shortname() == "pkcs7-signedData":
return SignedData(ptr)
- elif typeoid.shortname()=="pkcs7-envelopedData":
+ elif typeoid.shortname() == "pkcs7-envelopedData":
return EnvelopedData(ptr)
- elif typeoid.shortname()=="pkcs7-encryptedData":
+ elif typeoid.shortname() == "pkcs7-encryptedData":
return EncryptedData(ptr)
else:
raise NotImplementedError("cannot handle "+typeoid.shortname())
-class CMSBase(object):
+class CMSBase(object):
"""
Common ancessor for all CMS types.
Implements serializatio/deserialization
"""
- def __init__(self,ptr=None):
- self.ptr=ptr
+ def __init__(self, ptr=None):
+ self.ptr = ptr
def __str__(self):
"""
Serialize in DER format
"""
- b=Membio()
- if not libcrypto.i2d_CMS_bio(b.bio,self.ptr):
+ bio = Membio()
+ if not libcrypto.i2d_CMS_bio(bio.bio, self.ptr):
raise CMSError("writing CMS to PEM")
- return str(b)
+ return str(bio)
def pem(self):
"""
Serialize in PEM format
"""
- b=Membio()
- if not libcrypto.PEM_write_bio_CMS(b.bio,self.ptr):
+ bio = Membio()
+ if not libcrypto.PEM_write_bio_CMS(bio.bio, self.ptr):
raise CMSError("writing CMS to PEM")
- return str(b)
-
-
-
+ return str(bio)
+
+
+#pylint: disable=R0921
class SignedData(CMSBase):
+ """
+ Represents signed message (signeddata CMS type)
+ """
@staticmethod
- def create(data,cert,pkey,flags=Flags.BINARY,certs=[]):
+ def create(data, cert, pkey, flags=Flags.BINARY, certs=None):
"""
Creates SignedData message by signing data with pkey and
certificate.
@param data - data to sign
+ @param cert - signer's certificate
@param pkey - pkey object with private key to sign
@param flags - OReed combination of Flags constants
@param certs - list of X509 objects to include into CMS
"""
if not pkey.cansign:
raise ValueError("Specified keypair has no private part")
- if cert.pubkey!=pkey:
+ if cert.pubkey != pkey:
raise ValueError("Certificate doesn't match public key")
- b=Membio(data)
- if certs is not None and len(certs)>0:
- certstack=StackOfX509(certs)
+ bio = Membio(data)
+ if certs is not None and len(certs) > 0:
+ certstack = StackOfX509(certs)
else:
- certstack=None
- ptr=libcrypto.CMS_sign(cert.cert,pkey.ptr,certstack,b.bio,flags)
+ certstack = None
+ ptr = libcrypto.CMS_sign(cert.cert, pkey.ptr, certstack, bio.bio, flags)
if ptr is None:
raise CMSError("signing message")
return SignedData(ptr)
- def sign(self,cert,pkey,md=None,data=None,flags=Flags.BINARY):
+ def sign(self, cert, pkey, digest_type=None, data=None, flags=Flags.BINARY):
"""
Adds another signer to already signed message
@param cert - signer's certificate
@param pkey - signer's private key
- @param md - message digest to use as DigestType object
+ @param digest_type - message digest to use as DigestType object
(if None - default for key would be used)
@param data - data to sign (if detached and
Flags.REUSE_DIGEST is not specified)
"""
if not pkey.cansign:
raise ValueError("Specified keypair has no private part")
- if cert.pubkey!=pkey:
+ if cert.pubkey != pkey:
raise ValueError("Certificate doesn't match public key")
- p1=libcrypto.CMS_sign_add1_Signer(self.ptr,cert.cert,pkey.ptr,
- md.digest,flags)
- if p1 is None:
+ if libcrypto.CMS_sign_add1_Signer(self.ptr, cert.cert, pkey.ptr,
+ digest_type.digest, flags) is None:
raise CMSError("adding signer")
- if flags & Flags.REUSE_DIGEST==0:
+ if flags & Flags.REUSE_DIGEST == 0:
if data is not None:
- b=Membio(data)
- biodata=b.bio
+ bio = Membio(data)
+ biodata = bio.bio
else:
- biodata=None
- res= libcrypto.CMS_final(self.ptr,biodata,None,flags)
- if res<=0:
- raise CMSError
- def verify(self,store,flags,data=None,certs=[]):
+ biodata = None
+ res = libcrypto.CMS_final(self.ptr, biodata, None, flags)
+ if res <= 0:
+ raise CMSError("Cannot finalize CMS")
+ def verify(self, store, flags, data=None, certs=None):
"""
Verifies signature under CMS message using trusted cert store
sertificates to search for signing certificates
@returns True if signature valid, False otherwise
"""
- bio=None
- if data!=None:
- b=Membio(data)
- bio=b.bio
- if certs is not None and len(certs)>0:
- certstack=StackOfX509(certs)
+ bio = None
+ if data != None:
+ bio_obj = Membio(data)
+ bio = bio_obj.bio
+ if certs is not None and len(certs) > 0:
+ certstack = StackOfX509(certs)
else:
- certstack=None
- res=libcrypto.CMS_verify(self.ptr,certstack,store.store,bio,None,flags)
- return res>0
- @property
+ certstack = None
+ res = libcrypto.CMS_verify(self.ptr, certstack, store.store, bio,
+ None, flags)
+ return res > 0
+
+ @property
def signers(self):
"""
Return list of signer's certificates
"""
- p=libcrypto.CMS_get0_signers(self.ptr)
- if p is None:
- raise CMSError
- return StackOfX509(ptr=p,disposable=False)
+ signerlist = libcrypto.CMS_get0_signers(self.ptr)
+ if signerlist is None:
+ raise CMSError("Cannot get signers")
+ return StackOfX509(ptr=signerlist, disposable=False)
+
@property
def data(self):
"""
Returns signed data if present in the message
"""
- b=Membio()
- if not libcrypto.CMS_verify(self.ptr,None,None,None,b.bio,Flags.NO_VERIFY):
+ bio = Membio()
+ if not libcrypto.CMS_verify(self.ptr, None, None, None, bio.bio,
+ Flags.NO_VERIFY):
raise CMSError("extract data")
- return str(b)
- def addcert(self,cert):
+ return str(bio)
+
+ def addcert(self, cert):
"""
Adds a certificate (probably intermediate CA) to the SignedData
structure
"""
- if libcrypto.CMS_add1_cert(self.ptr,cert.cert)<=0:
- raise CMSError("adding cert")
- def addcrl(self,crl):
+ if libcrypto.CMS_add1_cert(self.ptr, cert.cert) <= 0:
+ raise CMSError("Cannot add cert")
+ def addcrl(self, crl):
"""
Adds a CRL to the signed data structure
"""
"""
List of the certificates contained in the structure
"""
- p=CMS_get1_certs(self.ptr)
- if p is None:
+ certstack = libcrypto.CMS_get1_certs(self.ptr)
+ if certstack is None:
raise CMSError("getting certs")
- return StackOfX509(ptr=p,disposable=True)
+ return StackOfX509(ptr=certstack, disposable=True)
@property
def crls(self):
"""
raise NotImplementedError
class EnvelopedData(CMSBase):
+ """
+ Represents EnvelopedData CMS, i.e. message encrypted with session
+ keys, encrypted with recipient's public keys
+ """
@staticmethod
- def create(recipients,data,cipher,flags=0):
+ def create(recipients, data, cipher, flags=0):
"""
Creates and encrypts message
@param recipients - list of X509 objects
@param cipher - CipherType object
@param flags - flag
"""
- recp=StackOfX509(recipients)
- b=Membio(data)
- p=libcrypto.CMS_encrypt(recp.ptr,b.bio,cipher.cipher_type,flags)
- if p is None:
+ recp = StackOfX509(recipients)
+ bio = Membio(data)
+ cms_ptr = libcrypto.CMS_encrypt(recp.ptr, bio.bio, cipher.cipher_type,
+ flags)
+ if cms_ptr is None:
raise CMSError("encrypt EnvelopedData")
- return EnvelopedData(p)
- def decrypt(self,pkey,cert,flags=0):
+ return EnvelopedData(cms_ptr)
+
+ def decrypt(self, pkey, cert, flags=0):
"""
Decrypts message
@param pkey - private key to decrypt
raise ValueError("Specified keypair has no private part")
if pkey != cert.pubkey:
raise ValueError("Certificate doesn't match private key")
- b=Membio()
- res=libcrypto.CMS_decrypt(self.ptr,pkey.ptr,cert.ccert,None,b.bio,flags)
- if res<=0:
+ bio = Membio()
+ res = libcrypto.CMS_decrypt(self.ptr, pkey.ptr, cert.ccert, None,
+ bio.bio, flags)
+ if res <= 0:
raise CMSError("decrypting CMS")
- return str(b)
+ return str(bio)
class EncryptedData(CMSBase):
+ """
+ Represents encrypted data CMS structure, i.e. encrypted
+ with symmetric key, shared by sender and recepient.
+ """
@staticmethod
- def create(data,cipher,key,flags=0):
+ def create(data, cipher, key, flags=0):
"""
Creates an EncryptedData message.
@param data data to encrypt
@param key - byte array used as simmetic key
@param flags - OR-ed combination of Flags constant
"""
- b=Membio(data)
- ptr=libcrypto.CMS_EncryptedData_encrypt(b.bio,cipher.cipher_type,key,len(key),flags)
+ bio = Membio(data)
+ ptr = libcrypto.CMS_EncryptedData_encrypt(bio.bio, cipher.cipher_type,
+ key, len(key), flags)
if ptr is None:
raise CMSError("encrypt data")
return EncryptedData(ptr)
- def decrypt(self,key,flags=0):
+
+ def decrypt(self, key, flags=0):
"""
Decrypts encrypted data message
@param key - symmetic key to decrypt
@param flags - OR-ed combination of Flags constant
"""
- b=Membio()
- if libcrypto.CMS_EncryptedData_decrypt(self.ptr,key,len(key),None,
- b.bio,flags)<=0:
- raise CMSError("decrypt data")
- return str(b)
+ bio = Membio()
+ if libcrypto.CMS_EncryptedData_decrypt(self.ptr, key, len(key), None,
+ bio.bio, flags) <= 0:
+ raise CMSError("decrypt data")
+ return str(bio)
-__all__=['CMS','CMSError','Flags','SignedData','EnvelopedData','EncryptedData']
+__all__ = ['CMS', 'CMSError', 'Flags', 'SignedData', 'EnvelopedData',
+ 'EncryptedData']
-libcrypto.CMS_verify.restype=c_int
-libcrypto.CMS_verify.argtypes=(c_void_p,c_void_p,c_void_p,c_void_p,c_void_p,c_int)
+libcrypto.CMS_add1_cert.restype = c_int
+libcrypto.CMS_add1_cert.argtypes = (c_void_p, c_void_p)
+libcrypto.CMS_decrypt.restype = c_int
+libcrypto.CMS_decrypt.argtypes = (c_void_p, c_void_p, c_void_p,
+ c_void_p, c_void_p, c_uint)
+libcrypto.CMS_encrypt.restype = c_void_p
+libcrypto.CMS_encrypt.argtypes = (c_void_p, c_void_p, c_void_p, c_uint)
+libcrypto.CMS_EncryptedData_decrypt.restype = c_int
+libcrypto.CMS_EncryptedData_decrypt.argtypes = (c_void_p, c_char_p, c_size_t,
+ c_void_p, c_void_p, c_uint)
+libcrypto.CMS_EncryptedData_encrypt.restype = c_void_p
+libcrypto.CMS_EncryptedData_encrypt.argtypes = (c_void_p, c_void_p, c_char_p,
+ c_size_t, c_uint)
+libcrypto.CMS_final.restype = c_int
+libcrypto.CMS_final.argtypes = (c_void_p, c_void_p, c_void_p, c_uint)
+libcrypto.CMS_get0_signers.restype = c_void_p
+libcrypto.CMS_get0_signers.argtypes = (c_void_p, )
+libcrypto.CMS_get1_certs.restype = c_void_p
+libcrypto.CMS_get1_certs.argtypes = (c_void_p, )
+libcrypto.CMS_sign.restype = c_void_p
+libcrypto.CMS_sign.argtypes = (c_void_p, c_void_p, c_void_p, c_void_p, c_uint)
+libcrypto.CMS_sign_add1_Signer.restype = c_void_p
+libcrypto.CMS_sign_add1_Signer.argtypes = (c_void_p, c_void_p, c_void_p,
+ c_void_p, c_uint)
+libcrypto.CMS_verify.restype = c_int
+libcrypto.CMS_verify.argtypes = (c_void_p, c_void_p, c_void_p, c_void_p,
+ c_void_p, c_int)
+libcrypto.d2i_CMS_bio.restype = c_void_p
+libcrypto.d2i_CMS_bio.argtypes = (c_void_p, POINTER(c_void_p))
+libcrypto.i2d_CMS_bio.restype = c_int
+libcrypto.i2d_CMS_bio.argtypes = (c_void_p, c_void_p)
+libcrypto.PEM_read_bio_CMS.restype = c_void_p
+libcrypto.PEM_read_bio_CMS.argtypes = (c_void_p, POINTER(c_void_p),
+ c_void_p, c_void_p)
+libcrypto.PEM_write_bio_CMS.restype = c_int
+libcrypto.PEM_write_bio_CMS.argtypes = (c_void_p, c_void_p)
"""
- Implements interface to OpenSSL EVP_Digest* functions.
+Implements interface to OpenSSL EVP_Digest* functions.
- Interface made as close to hashlib as possible.
+Interface made as close to hashlib as possible.
- This module is really an excess effort. Hashlib allows access to
- mostly same functionality except oids and nids of hashing
- algortithms (which might be needed for private key operations).
+This module is really an excess effort. Hashlib allows access to
+mostly same functionality except oids and nids of hashing
+algortithms (which might be needed for private key operations).
- hashlib even allows to use engine-provided digests if it is build
- with dinamically linked libcrypto - so use
- ctypescrypto.engine.set_default("gost",xFFFF) and md_gost94
- algorithm would be available both to this module and hashlib.
+hashlib even allows to use engine-provided digests if it is build
+with dinamically linked libcrypto - so use
+ctypescrypto.engine.set_default("gost",xFFFF) and md_gost94
+algorithm would be available both to this module and hashlib.
"""
-from ctypes import c_int, c_char_p, c_void_p, POINTER, c_long,c_longlong, create_string_buffer,byref
+from ctypes import c_int, c_char_p, c_void_p, POINTER, c_long, c_longlong
+from ctypes import create_string_buffer, byref
from ctypescrypto import libcrypto
from ctypescrypto.exception import LibCryptoError
from ctypescrypto.oid import Oid
DIGEST_ALGORITHMS = ("MD5", "SHA1", "SHA224", "SHA256", "SHA384", "SHA512")
-__all__ = ['DigestError','Digest','DigestType','new']
+__all__ = ['DigestError', 'Digest', 'DigestType', 'new']
class DigestError(LibCryptoError):
+ """ Exception raised if some OpenSSL function returns error """
pass
def new(algname):
"""
- Behaves just like hashlib.new. Creates digest object by
- algorithm name
+ Behaves just like hashlib.new. Creates digest object by
+ algorithm name
"""
- md=DigestType(algname)
- return Digest(md)
+
+ digest_type = DigestType(algname)
+ return Digest(digest_type)
class DigestType(object):
"""
-
- Represents EVP_MD object - constant structure which describes
- digest algorithm
-
+ Represents EVP_MD object - constant structure which describes
+ digest algorithm
"""
- def __init__(self, digest_name):
+ def __init__(self, digest_name):
"""
- Finds digest by its name. You can pass Oid object instead of
- name.
+ Finds digest by its name. You can pass Oid object instead of
+ name.
- Special case is when None is passed as name. In this case
- unitialized digest is created, and can be initalized later
- by setting its digest attribute to pointer to EVP_MD
+ Special case is when None is passed as name. In this case
+ unitialized digest is created, and can be initalized later
+ by setting its digest attribute to pointer to EVP_MD
"""
if digest_name is None:
- return
- if isinstance(digest_name,Oid):
- self.digest_name=digest_name.longname()
- self.digest=libcrypto.EVP_get_digestbyname(self.digest_name)
+ return
+
+ if isinstance(digest_name, Oid):
+ self.digest_name = digest_name.longname()
else:
self.digest_name = str(digest_name)
- self.digest = libcrypto.EVP_get_digestbyname(self.digest_name)
+ self.digest = libcrypto.EVP_get_digestbyname(self.digest_name)
if self.digest is None:
raise DigestError("Unknown digest: %s" % self.digest_name)
@property
def name(self):
- if not hasattr(self,'digest_name'):
- self.digest_name=Oid(libcrypto.EVP_MD_type(self.digest)).longname()
+ """ Returns name of the digest """
+ if not hasattr(self, 'digest_name'):
+ self.digest_name = Oid(libcrypto.EVP_MD_type(self.digest)
+ ).longname()
return self.digest_name
+
def __del__(self):
+ """ Empty destructor for constant object """
pass
+
+ @property
def digest_size(self):
+ """ Returns size of digest """
return libcrypto.EVP_MD_size(self.digest)
+
+ @property
def block_size(self):
+ """ Returns block size of the digest """
return libcrypto.EVP_MD_block_size(self.digest)
+
+ @property
def oid(self):
+ """ Returns Oid object of digest type """
return Oid(libcrypto.EVP_MD_type(self.digest))
class Digest(object):
"""
- Represents EVP_MD_CTX object which actually used to calculate
- digests.
-
+ Represents EVP_MD_CTX object which actually used to calculate
+ digests.
"""
- def __init__(self,digest_type):
+
+ def __init__(self, digest_type):
"""
- Initializes digest using given type.
+ Initializes digest using given type.
"""
- self._clean_ctx()
self.ctx = libcrypto.EVP_MD_CTX_create()
if self.ctx is None:
raise DigestError("Unable to create digest context")
+ self.digest_out = None
+ self.digest_finalized = False
result = libcrypto.EVP_DigestInit_ex(self.ctx, digest_type.digest, None)
if result == 0:
self._clean_ctx()
raise DigestError("Unable to initialize digest")
self.digest_type = digest_type
- self.digest_size = self.digest_type.digest_size()
- self.block_size = self.digest_type.block_size()
+ self.digest_size = self.digest_type.digest_size
+ self.block_size = self.digest_type.block_size
def __del__(self):
+ """ Uses _clean_ctx internal method """
self._clean_ctx()
def update(self, data, length=None):
"""
- Hashes given byte string
+ Hashes given byte string
- @param data - string to hash
- @param length - if not specifed, entire string is hashed,
- otherwise only first length bytes
+ @param data - string to hash
+ @param length - if not specifed, entire string is hashed,
+ otherwise only first length bytes
"""
if self.digest_finalized:
raise DigestError("No updates allowed")
- if not isinstance(data,str):
+ if not isinstance(data, str):
raise TypeError("A string is expected")
if length is None:
length = len(data)
raise ValueError("Specified length is greater than length of data")
result = libcrypto.EVP_DigestUpdate(self.ctx, c_char_p(data), length)
if result != 1:
- raise DigestError, "Unable to update digest"
-
+ raise DigestError("Unable to update digest")
+
def digest(self, data=None):
"""
- Finalizes digest operation and return digest value
- Optionally hashes more data before finalizing
+ Finalizes digest operation and return digest value
+ Optionally hashes more data before finalizing
"""
if self.digest_finalized:
return self.digest_out.raw[:self.digest_size]
self.update(data)
self.digest_out = create_string_buffer(256)
length = c_long(0)
- result = libcrypto.EVP_DigestFinal_ex(self.ctx, self.digest_out, byref(length))
- if result != 1 :
+ result = libcrypto.EVP_DigestFinal_ex(self.ctx, self.digest_out,
+ byref(length))
+ if result != 1:
raise DigestError("Unable to finalize digest")
self.digest_finalized = True
return self.digest_out.raw[:self.digest_size]
def copy(self):
"""
- Creates copy of the digest CTX to allow to compute digest
- while being able to hash more data
+ Creates copy of the digest CTX to allow to compute digest
+ while being able to hash more data
"""
- new_digest=Digest(self.digest_type)
- libcrypto.EVP_MD_CTX_copy(new_digest.ctx,self.ctx)
+
+ new_digest = Digest(self.digest_type)
+ libcrypto.EVP_MD_CTX_copy(new_digest.ctx, self.ctx)
return new_digest
def _clean_ctx(self):
+ """
+ Clears and deallocates context
+ """
try:
if self.ctx is not None:
libcrypto.EVP_MD_CTX_destroy(self.ctx)
- del(self.ctx)
+ del self.ctx
except AttributeError:
pass
self.digest_out = None
self.digest_finalized = False
- def hexdigest(self,data=None):
+ def hexdigest(self, data=None):
"""
Returns digest in the hexadecimal form. For compatibility
with hashlib
# Declare function result and argument types
libcrypto.EVP_get_digestbyname.restype = c_void_p
-libcrypto.EVP_get_digestbyname.argtypes = (c_char_p,)
+libcrypto.EVP_get_digestbyname.argtypes = (c_char_p, )
libcrypto.EVP_MD_CTX_create.restype = c_void_p
-libcrypto.EVP_DigestInit_ex.argtypes = (c_void_p,c_void_p,c_void_p)
-libcrypto.EVP_DigestUpdate.argtypes = (c_void_p,c_char_p,c_longlong)
-libcrypto.EVP_DigestFinal_ex.argtypes = (c_void_p,c_char_p,POINTER(c_long))
-libcrypto.EVP_MD_CTX_destroy.argtypes = (c_void_p,)
-libcrypto.EVP_MD_CTX_copy.argtypes=(c_void_p, c_void_p)
-libcrypto.EVP_MD_type.argtypes=(c_void_p,)
-libcrypto.EVP_MD_size.argtypes=(c_void_p,)
-libcrypto.EVP_MD_block_size.argtypes=(c_void_p,)
+# libcrypto.EVP_MD_CTX_create has no arguments
+libcrypto.EVP_DigestInit_ex.restupe = c_int
+libcrypto.EVP_DigestInit_ex.argtypes = (c_void_p, c_void_p, c_void_p)
+libcrypto.EVP_DigestUpdate.restype = c_int
+libcrypto.EVP_DigestUpdate.argtypes = (c_void_p, c_char_p, c_longlong)
+libcrypto.EVP_DigestFinal_ex.restype = c_int
+libcrypto.EVP_DigestFinal_ex.argtypes = (c_void_p, c_char_p, POINTER(c_long))
+libcrypto.EVP_MD_CTX_destroy.argtypes = (c_void_p, )
+libcrypto.EVP_MD_CTX_copy.restype = c_int
+libcrypto.EVP_MD_CTX_copy.argtypes = (c_void_p, c_void_p)
+libcrypto.EVP_MD_type.argtypes = (c_void_p, )
+libcrypto.EVP_MD_size.argtypes = (c_void_p, )
+libcrypto.EVP_MD_block_size.restype = c_int
+libcrypto.EVP_MD_block_size.argtypes = (c_void_p, )
+libcrypto.EVP_MD_size.restype = c_int
+libcrypto.EVP_MD_size.argtypes = (c_void_p, )
+libcrypto.EVP_MD_type.restype = c_int
+libcrypto.EVP_MD_type.argtypes = (c_void_p, )
Support for EC keypair operation missing form public libcrypto API
"""
from ctypescrypto.pkey import PKey, PKeyError
-from ctypes import c_void_p,c_char_p,c_int,byref
+from ctypes import c_void_p, c_char_p, c_int, byref
from ctypescrypto import libcrypto
-__all__ = [ 'create']
+__all__ = ['create']
-def create(curve,data):
+def create(curve, data):
"""
- Creates EC keypair from the just secret key and curve name
-
- @param curve - name of elliptic curve
- @param num - byte array or long number representing key
+ Creates EC keypair from the just secret key and curve name
+
+ @param curve - name of elliptic curve
+ @param num - byte array or long number representing key
"""
- ec=libcrypto.EC_KEY_new_by_curve_name(curve.nid)
- if ec is None:
+ ec_key = libcrypto.EC_KEY_new_by_curve_name(curve.nid)
+ if ec_key is None:
raise PKeyError("EC_KEY_new_by_curvename")
- group=libcrypto.EC_KEY_get0_group(ec)
+ group = libcrypto.EC_KEY_get0_group(ec_key)
if group is None:
raise PKeyError("EC_KEY_get0_group")
- libcrypto.EC_GROUP_set_asn1_flag(group,1)
- raw_key=libcrypto.BN_new()
- if isinstance(data,int):
- BN_hex2bn(byref(raw_key),hex(data))
+ libcrypto.EC_GROUP_set_asn1_flag(group, 1)
+ raw_key = libcrypto.BN_new()
+ if isinstance(data, int):
+ libcrypto.BN_hex2bn(byref(raw_key), hex(data))
else:
if raw_key is None:
raise PKeyError("BN_new")
- if libcrypto.BN_bin2bn(data,len(data),raw_key) is None:
+ if libcrypto.BN_bin2bn(data, len(data), raw_key) is None:
raise PKeyError("BN_bin2bn")
- ctx=libcrypto.BN_CTX_new()
+ ctx = libcrypto.BN_CTX_new()
if ctx is None:
raise PKeyError("BN_CTX_new")
- order=libcrypto.BN_new()
+ order = libcrypto.BN_new()
if order is None:
raise PKeyError("BN_new")
priv_key = libcrypto.BN_new()
if priv_key is None:
raise PKeyError("BN_new")
- if libcrypto.EC_GROUP_get_order(group,order,ctx) <=0:
+ if libcrypto.EC_GROUP_get_order(group, order, ctx) <= 0:
raise PKeyError("EC_GROUP_get_order")
- if libcrypto.BN_nnmod(priv_key,raw_key,order,ctx) <=0:
+ if libcrypto.BN_nnmod(priv_key, raw_key, order, ctx) <= 0:
raise PKeyError("BN_nnmod")
- if libcrypto.EC_KEY_set_private_key(ec,priv_key)<=0:
+ if libcrypto.EC_KEY_set_private_key(ec_key, priv_key) <= 0:
raise PKeyError("EC_KEY_set_private_key")
- pub_key=libcrypto.EC_POINT_new(group)
+ pub_key = libcrypto.EC_POINT_new(group)
if pub_key is None:
raise PKeyError("EC_POINT_new")
- if libcrypto.EC_POINT_mul(group,pub_key,priv_key,None,None,ctx)<=0:
+ if libcrypto.EC_POINT_mul(group, pub_key, priv_key, None, None, ctx) <= 0:
raise PKeyError("EC_POINT_mul")
- if libcrypto.EC_KEY_set_public_key(ec,pub_key)<=0:
+ if libcrypto.EC_KEY_set_public_key(ec_key, pub_key) <= 0:
raise PKeyError("EC_KEY_set_public_key")
libcrypto.BN_free(raw_key)
libcrypto.BN_free(order)
libcrypto.BN_free(priv_key)
libcrypto.BN_CTX_free(ctx)
- p=libcrypto.EVP_PKEY_new()
- if p is None:
- raise PKeyError("EVP_PKEY_new")
- if libcrypto.EVP_PKEY_set1_EC_KEY(p,ec)<=0:
+ pkey = libcrypto.EVP_PKEY_new()
+ if pkey is None:
+ raise PKeyError("EVP_PKEY_new")
+ if libcrypto.EVP_PKEY_set1_EC_KEY(pkey, ec_key) <= 0:
raise PKeyError("EVP_PKEY_set1_EC_KEY")
- libcrypto.EC_KEY_free(ec)
- return PKey(ptr=p,cansign=True)
+ libcrypto.EC_KEY_free(ec_key)
+ return PKey(ptr=pkey, cansign=True)
-libcrypto.EVP_PKEY_new.restype=c_void_p
-libcrypto.BN_new.restype=c_void_p
-libcrypto.BN_free.argtypes=(c_void_p,)
-libcrypto.BN_CTX_new.restype=c_void_p
-libcrypto.BN_CTX_free.argtypes=(c_void_p,)
-libcrypto.BN_bin2bn.argtypes=(c_char_p,c_int,c_void_p)
-libcrypto.EC_KEY_set_private_key.argtypes=(c_void_p,c_void_p)
-libcrypto.EC_POINT_new.argtypes=(c_void_p,)
-libcrypto.EC_POINT_new.restype=c_void_p
-libcrypto.EC_POINT_mul.argtypes=(c_void_p,c_void_p,c_void_p,c_void_p,c_void_p,c_void_p)
-libcrypto.EC_KEY_set_public_key.argtypes=(c_void_p,c_void_p)
+libcrypto.EVP_PKEY_new.restype = c_void_p
+libcrypto.BN_new.restype = c_void_p
+libcrypto.BN_free.argtypes = (c_void_p, )
+libcrypto.BN_CTX_new.restype = c_void_p
+libcrypto.BN_CTX_free.argtypes = (c_void_p, )
+libcrypto.BN_bin2bn.argtypes = (c_char_p, c_int, c_void_p)
+libcrypto.EC_KEY_set_private_key.argtypes = (c_void_p, c_void_p)
+libcrypto.EC_POINT_new.argtypes = (c_void_p, )
+libcrypto.EC_POINT_new.restype = c_void_p
+libcrypto.EC_POINT_mul.argtypes = (c_void_p, c_void_p, c_void_p, c_void_p,
+ c_void_p, c_void_p)
+libcrypto.EC_KEY_set_public_key.argtypes = (c_void_p, c_void_p)
"""
engine loading and configuration
"""
-from ctypes import *
+from ctypes import c_void_p, c_char_p, c_int
from ctypescrypto import libcrypto
from ctypescrypto.exception import LibCryptoError
-__all__=['default','set_default']
+__all__ = ['default', 'set_default']
-default=None
+default = None
def set_default(engine):
"""
- Loads specified engine and sets it as default for all
- algorithms, supported by it
+ Loads specified engine and sets it as default for all
+ algorithms, supported by it
"""
global default
- e=libcrypto.ENGINE_by_id(engine)
- if e is None:
+ eng = libcrypto.ENGINE_by_id(engine)
+ if eng is None:
# Try load engine
- e = libcrypto.ENGINE_by_id("dynamic")
- if e is None:
+ eng = libcrypto.ENGINE_by_id("dynamic")
+ if eng is None:
raise LibCryptoError("Cannot get 'dynamic' engine")
- if not libcrypto.ENGINE_ctrl_cmd_string(e,"SO_PATH",engine,0):
+ if not libcrypto.ENGINE_ctrl_cmd_string(eng, "SO_PATH", engine, 0):
raise LibCryptoError("Cannot execute ctrl cmd SO_PATH")
- if not libcrypto.ENGINE_ctrl_cmd_string(e,"LOAD",None,0):
+ if not libcrypto.ENGINE_ctrl_cmd_string(eng, "LOAD", None, 0):
raise LibCryptoError("Cannot execute ctrl cmd LOAD")
- if e is None:
- raise ValueError("Cannot find engine "+engine)
- libcrypto.ENGINE_set_default(e,c_int(0xFFFF))
- default=e
+ if eng is None:
+ raise ValueError("Cannot find engine " + engine)
+ libcrypto.ENGINE_set_default(eng, c_int(0xFFFF))
+ default = eng
# Declare function result and arguments for used functions
-libcrypto.ENGINE_by_id.restype=c_void_p
-libcrypto.ENGINE_by_id.argtypes=(c_char_p,)
-libcrypto.ENGINE_set_default.argtypes=(c_void_p,c_int)
-libcrypto.ENGINE_ctrl_cmd_string.argtypes=(c_void_p,c_char_p,c_char_p,c_int)
-libcrypto.ENGINE_finish.argtypes=(c_char_p,)
+libcrypto.ENGINE_by_id.restype = c_void_p
+libcrypto.ENGINE_by_id.argtypes = (c_char_p, )
+libcrypto.ENGINE_set_default.argtypes = (c_void_p, c_int)
+libcrypto.ENGINE_ctrl_cmd_string.argtypes = (c_void_p, c_char_p, c_char_p,
+ c_int)
+libcrypto.ENGINE_finish.argtypes = (c_char_p, )
"""
Exception which extracts libcrypto error information
"""
-from ctypes import *
+from ctypes import c_ulong, c_char_p, create_string_buffer
from ctypescrypto import libcrypto
-strings_loaded=False
+strings_loaded = False
-__all__ = ['LibCryptoError','clear_err_stack']
-
-def _check_null(s):
- """
- Handle transparently NULL returned from error reporting functions
- instead of strings
- """
- if s is None:
- return ""
- return s
+__all__ = ['LibCryptoError', 'clear_err_stack']
class LibCryptoError(Exception):
"""
- Exception for libcrypto errors. Adds all the info, which can be
- extracted from internal (per-thread) libcrypto error stack to the message,
- passed to the constructor.
+ Exception for libcrypto errors. Adds all the info, which can be
+ extracted from internal (per-thread) libcrypto error stack to the message,
+ passed to the constructor.
"""
- def __init__(self,msg):
+ def __init__(self, msg):
global strings_loaded
if not strings_loaded:
libcrypto.ERR_load_crypto_strings()
strings_loaded = True
- e=libcrypto.ERR_get_error()
- m = msg
- while e != 0:
- m+="\n\t"+_check_null(libcrypto.ERR_lib_error_string(e))+":"+\
- _check_null(libcrypto.ERR_func_error_string(e))+":"+\
- _check_null(libcrypto.ERR_reason_error_string(e))
- e=libcrypto.ERR_get_error()
- self.args=(m,)
+ err_code = libcrypto.ERR_get_error()
+ mesg = msg
+ buf = create_string_buffer(128)
+ while err_code != 0:
+ mesg += "\n\t" + libcrypto.ERR_error_string(err_code, buf)
+ err_code = libcrypto.ERR_get_error()
+ super(LibCryptoError, self).__init__(mesg)
def clear_err_stack():
"""
- Clears internal libcrypto err stack. Call it if you've checked
- return code and processed exceptional situation, so subsequent
- raising of the LibCryptoError wouldn't list already handled errors
+ Clears internal libcrypto err stack. Call it if you've checked
+ return code and processed exceptional situation, so subsequent
+ raising of the LibCryptoError wouldn't list already handled errors
"""
libcrypto.ERR_clear_error()
-
-libcrypto.ERR_lib_error_string.restype=c_char_p
-libcrypto.ERR_func_error_string.restype=c_char_p
-libcrypto.ERR_reason_error_string.restype=c_char_p
+libcrypto.ERR_get_error.restype = c_ulong
+libcrypto.ERR_error_string.restype = c_char_p
+libcrypto.ERR_error_string.argtypes = (c_ulong, c_char_p)
for (name,val) in kwargs.items():
if libcrypto.EVP_PKEY_CTX_ctrl_str(pctx,name,val)<=0:
raise DigestError("Unable to set mac parameter")
- self.digest_size = self.digest_type.digest_size()
- self.block_size = self.digest_type.block_size()
+ self.digest_size = self.digest_type.digest_size
+ self.block_size = self.digest_type.block_size
def digest(self,data=None):
"""
Method digest is redefined to return keyed MAC value instead of
-"""
- Interface to OpenSSL object identifier database.
+"""
+Interface to OpenSSL object identifier database.
+
+It is primarily intended to deal with OIDs which are compiled into the
+database or defined in the openssl configuration files.
- It is primarily intended to deal with OIDs which are compiled into the
- database or defined in the openssl configuration files.
+But see create() function.
- But see create() function
+OpenSSL maintains database of OIDs, which contain long and short
+human-readable names, which correspond to Oid as well as canonical
+dotted-decimal representation, and links it to small integer, named
+numeric identifier or 'nid'. Most OpenSSL functions which deals with
+ASN.1 structures such as certificates or cryptographic messages,
+expect or return nids, but it is very bad idea to hardcode nids into
+your app, because it can change after mere recompilation of OpenSSL
+library.
+This module provides Oid object which represents entry to OpenSSL
+OID database.
"""
from ctypescrypto import libcrypto
from ctypes import c_char_p, c_void_p, c_int, create_string_buffer
+from ctypescrypto.exception import LibCryptoError
-__all__ = ['Oid','create','cleanup']
+__all__ = ['Oid', 'create', 'cleanup']
class Oid(object):
"""
- Represents an OID. It can be consturucted by textual
- representation like Oid("commonName") or Oid("CN"),
- dotted-decimal Oid("1.2.3.4") or using OpenSSL numeric
- identifer (NID), which is typically returned or required by
- OpenSSL API functions. If object is consturcted from textual
- representation which is not present in the database, it fails
- with ValueError
+ Represents an OID (ASN.1 Object identifier).
- attribute nid - contains object nid.
+ It can be consturucted by textual
+ representation like Oid("commonName") or Oid("CN"),
+ dotted-decimal Oid("1.2.3.4") or using OpenSSL numeric
+ identifer (NID), which is typically returned or required by
+ OpenSSL API functions. If object is consturcted from textual
+ representation which is not present in the database, it fails
+ with ValueError
+ attribute nid - contains object nid.
"""
- def __init__(self,value):
- " Object constuctor. Accepts string or integer"
- if isinstance(value,unicode):
- value=value.encode('ascii')
- if isinstance(value,str):
- self.nid=libcrypto.OBJ_txt2nid(value)
- if self.nid==0:
- raise ValueError("Cannot find object %s in the database"%(value))
- elif isinstance(value,(int,long)):
- cn=libcrypto.OBJ_nid2sn(value)
- if cn is None:
- raise ValueError("No such nid %d in the database"%(value))
- self.nid=value
+ def __init__(self, value):
+ """
+ Object constuctor. Accepts string, integer, or another Oid
+ object.
+
+ Integer should be OpenSSL numeric identifier (nid) as returned
+ by some libcrypto function or extracted from some libcrypto
+ structure
+ """
+ if isinstance(value, unicode):
+ value = value.encode('ascii')
+ if isinstance(value, str):
+ self.nid = libcrypto.OBJ_txt2nid(value)
+ if self.nid == 0:
+ raise ValueError("Cannot find object %s in the database" %
+ value)
+ elif isinstance(value, (int, long)):
+ short = libcrypto.OBJ_nid2sn(value)
+ if short is None:
+ raise ValueError("No such nid %d in the database" % value)
+ self.nid = value
+ elif isinstance(value, Oid):
+ self.nid = value.nid
else:
raise TypeError("Cannot convert this type to object identifier")
def __hash__(self):
- " Returns NID "
+ " Hash of object is equal to nid because Oids with same nid are same"
return self.nid
- def __cmp__(self,other):
+ def __cmp__(self, other):
" Compares NIDs of two objects "
- return self.nid-other.nid
+ return self.nid - other.nid
def __str__(self):
- " Default string representation of Oid is dotted-decimal"
+ " Default string representation of Oid is dotted-decimal "
return self.dotted()
def __repr__(self):
- return "Oid('%s')"%(self.dotted())
+ " Returns constructor call of Oid with dotted representation "
+ return "Oid('%s')" % (self.dotted())
def shortname(self):
" Returns short name if any "
return libcrypto.OBJ_nid2sn(self.nid)
return libcrypto.OBJ_nid2ln(self.nid)
def dotted(self):
" Returns dotted-decimal reperesentation "
- obj=libcrypto.OBJ_nid2obj(self.nid)
- buf=create_string_buffer(256)
- libcrypto.OBJ_obj2txt(buf,256,obj,1)
+ obj = libcrypto.OBJ_nid2obj(self.nid)
+ buf = create_string_buffer(256)
+ libcrypto.OBJ_obj2txt(buf, 256, obj, 1)
return buf.value
@staticmethod
def fromobj(obj):
"""
Creates an OID object from the pointer to ASN1_OBJECT c structure.
- Strictly for internal use
+ This method intended for internal use for submodules which deal
+ with libcrypto ASN1 parsing functions, such as x509 or CMS
"""
- nid=libcrypto.OBJ_obj2nid(obj)
- if nid==0:
- buf=create_string_buffer(80)
- l=libcrypto.OBJ_obj2txt(buf,80,obj,1)
- oid=create(buf[0:l],buf[0:l],buf[0:l])
+ nid = libcrypto.OBJ_obj2nid(obj)
+ if nid == 0:
+ buf = create_string_buffer(80)
+ dotted_len = libcrypto.OBJ_obj2txt(buf, 80, obj, 1)
+ dotted = buf[:dotted_len]
+ oid = create(dotted, dotted, dotted)
else:
- oid=Oid(nid)
+ oid = Oid(nid)
return oid
-def create(dotted,shortname,longname):
+def create(dotted, shortname, longname):
"""
- Creates new OID in the database
+ Creates new OID in the database
+
+ @param dotted - dotted-decimal representation of new OID
+ @param shortname - short name for new OID
+ @param longname - long name for new OID
+
+ @returns Oid object corresponding to new OID
- @param dotted - dotted-decimal representation of new OID
- @param shortname - short name for new OID
- @param longname - long name for new OID
+ This function should be used with exreme care. Whenever
+ possible, it is better to add new OIDs via OpenSSL configuration
+ file
- @returns Oid object corresponding to new OID
-
- This function should be used with exreme care. Whenever
- possible, it is better to add new OIDs via OpenSSL configuration
- file
+ Results of calling this function twice for same OIDor for
+ Oid alredy in database are undefined
- Results of calling this function twice for same OIDor for
- Oid alredy in database are undefined
"""
- nid=libcrypto.OBJ_create(dotted,shortname,longname)
+ nid = libcrypto.OBJ_create(dotted, shortname, longname)
if nid == 0:
raise LibCryptoError("Problem adding new OID to the database")
return Oid(nid)
def cleanup():
"""
- Removes all the objects, dynamically added by current
- application from database.
+ Removes all the objects, dynamically added by current
+ application from database.
"""
libcrypto.OBJ_cleanup()
-libcrypto.OBJ_nid2sn.restype=c_char_p
-libcrypto.OBJ_nid2ln.restype=c_char_p
-libcrypto.OBJ_nid2obj.restype=c_void_p
-libcrypto.OBJ_obj2txt.argtypes=(c_char_p,c_int,c_void_p,c_int)
-libcrypto.OBJ_txt2nid.argtupes=(c_char_p,)
-libcrypto.OBJ_create.argtypes=(c_char_p,c_char_p,c_char_p)
+libcrypto.OBJ_nid2sn.restype = c_char_p
+libcrypto.OBJ_nid2ln.restype = c_char_p
+libcrypto.OBJ_nid2obj.restype = c_void_p
+libcrypto.OBJ_obj2txt.argtypes = (c_char_p, c_int, c_void_p, c_int)
+libcrypto.OBJ_txt2nid.argtupes = (c_char_p, )
+libcrypto.OBJ_create.argtypes = (c_char_p, c_char_p, c_char_p)
"""
-from ctypes import c_char_p,c_int, c_void_p, create_string_buffer
+from ctypes import c_char_p, c_int, c_void_p, create_string_buffer
from ctypescrypto import libcrypto
from ctypescrypto.digest import DigestType
+from ctypescrypto.exception import LibCryptoError
__all__ = ['pbkdf2']
-def pbkdf2(password,salt,outlen,digesttype="sha1",iterations=2000):
+def pbkdf2(password, salt, outlen, digesttype="sha1", iterations=2000):
"""
- Interface to PKCS5_PBKDF2_HMAC function
- Parameters:
-
- @param password - password to derive key from
- @param salt - random salt to use for key derivation
- @param outlen - number of bytes to derive
- @param digesttype - name of digest to use to use (default sha1)
- @param iterations - number of iterations to use
+ Interface to PKCS5_PBKDF2_HMAC function
+ Parameters:
- @returns outlen bytes of key material derived from password and salt
+ @param password - password to derive key from
+ @param salt - random salt to use for key derivation
+ @param outlen - number of bytes to derive
+ @param digesttype - name of digest to use to use (default sha1)
+ @param iterations - number of iterations to use
+
+ @returns outlen bytes of key material derived from password and salt
"""
- dt=DigestType(digesttype)
- out=create_string_buffer(outlen)
- res=libcrypto.PKCS5_PBKDF2_HMAC(password,len(password),salt,len(salt),
- iterations,dt.digest,outlen,out)
- if res<=0:
+ dgst = DigestType(digesttype)
+ out = create_string_buffer(outlen)
+ res = libcrypto.PKCS5_PBKDF2_HMAC(password, len(password), salt, len(salt),
+ iterations, dgst.digest, outlen, out)
+ if res <= 0:
raise LibCryptoError("error computing PBKDF2")
return out.raw
-libcrypto.PKCS5_PBKDF2_HMAC.argtypes=(c_char_p,c_int,c_char_p,c_int,c_int,
- c_void_p,c_int,c_char_p)
-libcrypto.PKCS5_PBKDF2_HMAC.restupe=c_int
+libcrypto.PKCS5_PBKDF2_HMAC.argtypes = (c_char_p, c_int, c_char_p, c_int, c_int,
+ c_void_p, c_int, c_char_p)
+libcrypto.PKCS5_PBKDF2_HMAC.restupe = c_int
"""
-from ctypes import c_char_p,c_void_p,byref,c_int,c_long, c_longlong, create_string_buffer,CFUNCTYPE,POINTER
+from ctypes import c_char_p, c_void_p, c_int, c_long, POINTER
+from ctypes import create_string_buffer, byref, memmove, CFUNCTYPE
from ctypescrypto import libcrypto
-from ctypescrypto.exception import LibCryptoError,clear_err_stack
+from ctypescrypto.exception import LibCryptoError, clear_err_stack
from ctypescrypto.bio import Membio
-import sys
-__all__ = ['PKeyError','password_callback','PKey']
+__all__ = ['PKeyError', 'password_callback', 'PKey', 'PW_CALLBACK_FUNC']
class PKeyError(LibCryptoError):
+ """ Exception thrown if libcrypto finctions return an error """
pass
-CALLBACK_FUNC=CFUNCTYPE(c_int,c_char_p,c_int,c_int,c_char_p)
-def password_callback(buf,length,rwflag,u):
+PW_CALLBACK_FUNC = CFUNCTYPE(c_int, c_char_p, c_int, c_int, c_char_p)
+""" Function type for pem password callback """
+
+def password_callback(buf, length, rwflag, userdata):
"""
- Example password callback for private key. Assumes that
- password is store in the userdata parameter, so allows to pass password
+ Example password callback for private key. Assumes that
+ password is stored in the userdata parameter, so allows to pass password
from constructor arguments to the libcrypto keyloading functions
"""
- cnt=len(u)
- if length<cnt:
- cnt=length
- memmove(buf,u,cnt)
+ cnt = len(userdata)
+ if length < cnt:
+ cnt = length
+ memmove(buf, userdata, cnt)
return cnt
-_cb=CALLBACK_FUNC(password_callback)
+_cb = PW_CALLBACK_FUNC(password_callback)
class PKey(object):
- def __init__(self,ptr=None,privkey=None,pubkey=None,format="PEM",cansign=False,password=None):
+ """
+ Represents public/private key pair. Wrapper around EVP_PKEY
+ libcrypto object.
+
+ May contain either both private and public key (such objects can be
+ used for signing, deriving shared key as well as verifying or public
+ key only, which can be used for verifying or as peer key when
+ deriving.
+
+ @var cansign is true key has private part.
+ @var key contain pointer to EVP_PKEY and should be passed to various
+ libcrypto routines
+ """
+ def __init__(self, ptr=None, privkey=None, pubkey=None, format="PEM",
+ cansign=False, password=None, callback=_cb):
if not ptr is None:
- self.key=ptr
- self.cansign=cansign
+ self.key = ptr
+ self.cansign = cansign
if not privkey is None or not pubkey is None:
- raise TypeError("Just one of ptr, pubkey or privkey can be specified")
+ raise TypeError("Just one of ptr, pubkey or privkey can " +
+ "be specified")
elif not privkey is None:
if not pubkey is None:
- raise TypeError("Just one of ptr, pubkey or privkey can be specified")
- b=Membio(privkey)
- self.cansign=True
+ raise TypeError("Just one of ptr, pubkey or privkey can " +
+ "be specified")
+ bio = Membio(privkey)
+ self.cansign = True
if format == "PEM":
- self.key=libcrypto.PEM_read_bio_PrivateKey(b.bio,None,_cb,c_char_p(password))
- else:
- self.key=libcrypto.d2i_PrivateKey_bio(b.bio,None)
+ self.key = libcrypto.PEM_read_bio_PrivateKey(bio.bio, None,
+ callback,
+ c_char_p(password))
+ else:
+ self.key = libcrypto.d2i_PrivateKey_bio(bio.bio, None)
if self.key is None:
raise PKeyError("error parsing private key")
elif not pubkey is None:
- b=Membio(pubkey)
- self.cansign=False
+ bio = Membio(pubkey)
+ self.cansign = False
if format == "PEM":
- self.key=libcrypto.PEM_read_bio_PUBKEY(b.bio,None,_cb,None)
+ self.key = libcrypto.PEM_read_bio_PUBKEY(bio.bio, None,
+ callback,
+ c_char_p(password))
else:
- self.key=libcrypto.d2i_PUBKEY_bio(b.bio,None)
+ self.key = libcrypto.d2i_PUBKEY_bio(bio.bio, None)
if self.key is None:
raise PKeyError("error parsing public key")
else:
raise TypeError("Neither public, nor private key is specified")
-
+
def __del__(self):
+ """ Frees EVP_PKEY object (note, it is reference counted) """
libcrypto.EVP_PKEY_free(self.key)
- def __eq__(self,other):
+
+ def __eq__(self, other):
""" Compares two public keys. If one has private key and other
doesn't it doesn't affect result of comparation
"""
- return libcrypto.EVP_PKEY_cmp(self.key,other.key)==1
- def __ne__(self,other):
+ return libcrypto.EVP_PKEY_cmp(self.key, other.key) == 1
+
+ def __ne__(self, other):
+ """ Compares two public key for not-equality """
return not self.__eq__(other)
+
def __str__(self):
- """ printable representation of public key """
- b=Membio()
- libcrypto.EVP_PKEY_print_public(b.bio,self.key,0,None)
- return str(b)
+ """ printable representation of public key """
+ bio = Membio()
+ libcrypto.EVP_PKEY_print_public(bio.bio, self.key, 0, None)
+ return str(bio)
- def sign(self,digest,**kwargs):
+ def sign(self, digest, **kwargs):
"""
- Signs given digest and retirns signature
- Keyword arguments allows to set various algorithm-specific
- parameters. See pkeyutl(1) manual.
+ Signs given digest and retirns signature
+ Keyword arguments allows to set various algorithm-specific
+ parameters. See pkeyutl(1) manual.
"""
- ctx=libcrypto.EVP_PKEY_CTX_new(self.key,None)
+ ctx = libcrypto.EVP_PKEY_CTX_new(self.key, None)
if ctx is None:
raise PKeyError("Initailizing sign context")
- if libcrypto.EVP_PKEY_sign_init(ctx)<1:
+ if libcrypto.EVP_PKEY_sign_init(ctx) < 1:
raise PKeyError("sign_init")
- self._configure_context(ctx,kwargs)
+ self._configure_context(ctx, kwargs)
# Find out signature size
- siglen=c_long(0)
- if libcrypto.EVP_PKEY_sign(ctx,None,byref(siglen),digest,len(digest))<1:
- raise PKeyError("signing")
- sig=create_string_buffer(siglen.value)
- libcrypto.EVP_PKEY_sign(ctx,sig,byref(siglen),digest,len(digest))
+ siglen = c_long(0)
+ if libcrypto.EVP_PKEY_sign(ctx, None, byref(siglen), digest,
+ len(digest)) < 1:
+ raise PKeyError("computing signature length")
+ sig = create_string_buffer(siglen.value)
+ if libcrypto.EVP_PKEY_sign(ctx, sig, byref(siglen), digest,
+ len(digest)) < 1:
+ raise PKeyError("signing")
libcrypto.EVP_PKEY_CTX_free(ctx)
- return sig.raw[:siglen.value]
+ return sig.raw[:int(siglen.value)]
- def verify(self,digest,signature,**kwargs):
+ def verify(self, digest, signature, **kwargs):
"""
- Verifies given signature on given digest
- Returns True if Ok, False if don't match
- Keyword arguments allows to set algorithm-specific
- parameters
+ Verifies given signature on given digest
+ Returns True if Ok, False if don't match
+ Keyword arguments allows to set algorithm-specific
+ parameters
"""
- ctx=libcrypto.EVP_PKEY_CTX_new(self.key,None)
+ ctx = libcrypto.EVP_PKEY_CTX_new(self.key, None)
if ctx is None:
raise PKeyError("Initailizing verify context")
- if libcrypto.EVP_PKEY_verify_init(ctx)<1:
+ if libcrypto.EVP_PKEY_verify_init(ctx) < 1:
raise PKeyError("verify_init")
- self._configure_context(ctx,kwargs)
- rv=libcrypto.EVP_PKEY_verify(ctx,signature,len(signature),digest,len(digest))
- if rv<0:
+ self._configure_context(ctx, kwargs)
+ ret = libcrypto.EVP_PKEY_verify(ctx, signature, len(signature), digest,
+ len(digest))
+ if ret < 0:
raise PKeyError("Signature verification")
libcrypto.EVP_PKEY_CTX_free(ctx)
- return rv>0
- def derive(self,peerkey,**kwargs):
+ return ret > 0
+
+ def derive(self, peerkey, **kwargs):
"""
- Derives shared key (DH,ECDH,VKO 34.10). Requires
- private key available
+ Derives shared key (DH,ECDH,VKO 34.10). Requires
+ private key available
- @param peerkey - other key (may be public only)
+ @param peerkey - other key (may be public only)
- Keyword parameters are algorithm-specific
+ Keyword parameters are algorithm-specific
"""
- ctx=libcrypto.EVP_PKEY_CTX_new(self.key,None)
+ if not self.cansign:
+ raise ValueError("No private key available")
+ ctx = libcrypto.EVP_PKEY_CTX_new(self.key, None)
if ctx is None:
raise PKeyError("Initailizing derive context")
- if libcrypto.EVP_PKEY_derive_init(ctx)<1:
+ if libcrypto.EVP_PKEY_derive_init(ctx) < 1:
raise PKeyError("derive_init")
-
- self._configure_context(ctx,kwargs,["ukm"])
- if libcrypto.EVP_PKEY_derive_set_peer(ctx,peerkey.key)<=0:
+ # This is workaround around missing functionality in GOST engine
+ # it provides only numeric control command to set UKM, not
+ # string one.
+ self._configure_context(ctx, kwargs, ["ukm"])
+ if libcrypto.EVP_PKEY_derive_set_peer(ctx, peerkey.key) <= 0:
raise PKeyError("Cannot set peer key")
if "ukm" in kwargs:
- if libcrypto.EVP_PKEY_CTX_ctrl(ctx,-1,1<<10,8,8,kwargs["ukm"])<=0:
+ # We just hardcode numeric command to set UKM here
+ if libcrypto.EVP_PKEY_CTX_ctrl(ctx, -1, 1 << 10, 8, 8,
+ kwargs["ukm"]) <= 0:
raise PKeyError("Cannot set UKM")
- keylen=c_long(0)
- if libcrypto.EVP_PKEY_derive(ctx,None,byref(keylen))<=0:
+ keylen = c_long(0)
+ if libcrypto.EVP_PKEY_derive(ctx, None, byref(keylen)) <= 0:
raise PKeyError("computing shared key length")
- buf=create_string_buffer(keylen.value)
- if libcrypto.EVP_PKEY_derive(ctx,buf,byref(keylen))<=0:
+ buf = create_string_buffer(keylen.value)
+ if libcrypto.EVP_PKEY_derive(ctx, buf, byref(keylen)) <= 0:
raise PKeyError("computing actual shared key")
libcrypto.EVP_PKEY_CTX_free(ctx)
- return buf.raw[:keylen.value]
+ return buf.raw[:int(keylen.value)]
+
@staticmethod
- def generate(algorithm,**kwargs):
+ def generate(algorithm, **kwargs):
"""
- Generates new private-public key pair for given algorithm
- (string like 'rsa','ec','gost2001') and algorithm-specific
- parameters.
+ Generates new private-public key pair for given algorithm
+ (string like 'rsa','ec','gost2001') and algorithm-specific
+ parameters.
- Algorithm specific paramteers for RSA:
+ Algorithm specific paramteers for RSA:
- rsa_keygen_bits=number - size of key to be generated
- rsa_keygen_pubexp - RSA public expontent(default 65537)
+ rsa_keygen_bits=number - size of key to be generated
+ rsa_keygen_pubexp - RSA public expontent(default 65537)
- Algorithm specific parameters for DSA,DH and EC
+ Algorithm specific parameters for DSA,DH and EC
- paramsfrom=PKey object
+ paramsfrom=PKey object
- copy parameters of newly generated key from existing key
+ copy parameters of newly generated key from existing key
- Algorithm specific parameters for GOST2001
+ Algorithm specific parameters for GOST2001
- paramset= paramset name where name is one of
- 'A','B','C','XA','XB','test'
+ paramset= paramset name where name is one of
+ 'A','B','C','XA','XB','test'
- paramsfrom does work too
+ paramsfrom does work too
"""
- tmpeng=c_void_p(None)
- ameth=libcrypto.EVP_PKEY_asn1_find_str(byref(tmpeng),algorithm,-1)
+ tmpeng = c_void_p(None)
+ ameth = libcrypto.EVP_PKEY_asn1_find_str(byref(tmpeng), algorithm, -1)
if ameth is None:
- raise PKeyError("Algorithm %s not foind\n"%(algname))
+ raise PKeyError("Algorithm %s not foind\n"%(algorithm))
clear_err_stack()
- pkey_id=c_int(0)
- libcrypto.EVP_PKEY_asn1_get0_info(byref(pkey_id),None,None,None,None,ameth)
+ pkey_id = c_int(0)
+ libcrypto.EVP_PKEY_asn1_get0_info(byref(pkey_id), None, None, None,
+ None, ameth)
#libcrypto.ENGINE_finish(tmpeng)
if "paramsfrom" in kwargs:
- ctx=libcrypto.EVP_PKEY_CTX_new(kwargs["paramsfrom"].key,None)
+ ctx = libcrypto.EVP_PKEY_CTX_new(kwargs["paramsfrom"].key, None)
else:
- ctx=libcrypto.EVP_PKEY_CTX_new_id(pkey_id,None)
+ ctx = libcrypto.EVP_PKEY_CTX_new_id(pkey_id, None)
# FIXME support EC curve as keyword param by invoking paramgen
# operation
if ctx is None:
- raise PKeyError("Creating context for key type %d"%(pkey_id.value))
- if libcrypto.EVP_PKEY_keygen_init(ctx) <=0 :
+ raise PKeyError("Creating context for key type %d"%(pkey_id.value))
+ if libcrypto.EVP_PKEY_keygen_init(ctx) <= 0:
raise PKeyError("keygen_init")
- PKey._configure_context(ctx,kwargs,["paramsfrom"])
- key=c_void_p(None)
- if libcrypto.EVP_PKEY_keygen(ctx,byref(key))<=0:
+ PKey._configure_context(ctx, kwargs, ["paramsfrom"])
+ key = c_void_p(None)
+ if libcrypto.EVP_PKEY_keygen(ctx, byref(key)) <= 0:
raise PKeyError("Error generating key")
libcrypto.EVP_PKEY_CTX_free(ctx)
- return PKey(ptr=key,cansign=True)
- def exportpub(self,format="PEM"):
+ return PKey(ptr=key, cansign=True)
+
+ def exportpub(self, format="PEM"):
"""
- Returns public key as PEM or DER structure.
+ Returns public key as PEM or DER structure.
"""
- b=Membio()
+ bio = Membio()
if format == "PEM":
- r=libcrypto.PEM_write_bio_PUBKEY(b.bio,self.key)
+ retcode = libcrypto.PEM_write_bio_PUBKEY(bio.bio, self.key)
else:
- r=libcrypto.i2d_PUBKEY_bio(b.bio,self.key)
- if r==0:
+ retcode = libcrypto.i2d_PUBKEY_bio(bio.bio, self.key)
+ if retcode == 0:
raise PKeyError("error serializing public key")
- return str(b)
- def exportpriv(self,format="PEM",password=None,cipher=None):
+ return str(bio)
+
+ def exportpriv(self, format="PEM", password=None, cipher=None,
+ callback=_cb):
"""
- Returns private key as PEM or DER Structure.
- If password and cipher are specified, encrypts key
- on given password, using given algorithm. Cipher must be
- an ctypescrypto.cipher.CipherType object
+ Returns private key as PEM or DER Structure.
+ If password and cipher are specified, encrypts key
+ on given password, using given algorithm. Cipher must be
+ an ctypescrypto.cipher.CipherType object
"""
- b=Membio()
+ bio = Membio()
if cipher is None:
- evp_cipher=None
+ evp_cipher = None
else:
- if password is None:
- raise NotImplementedError("Interactive password entry is not supported")
- evp_cipher=cipher.cipher
+ evp_cipher = cipher.cipher
if format == "PEM":
- r=libcrypto.PEM_write_bio_PrivateKey(b.bio,self.key,evp_cipher,None,0,_cb,
- password)
+ ret = libcrypto.PEM_write_bio_PrivateKey(bio.bio, self.key,
+ evp_cipher, None, 0,
+ callback,
+ c_char_p(password))
else:
- if cipher is not None:
- raise NotImplementedError("Der-formatted encrypted keys are not supported")
- r=libcrypto.i2d_PrivateKey_bio(b.bio,self.key)
- if r==0:
+ ret = libcrypto.i2d_PKCS8PrivateKey_bio(bio.bio, self.key,
+ evp_cipher, None, 0,
+ callback,
+ c_char_p(password))
+ if ret == 0:
raise PKeyError("error serializing private key")
- return str(b)
+ return str(bio)
+
@staticmethod
- def _configure_context(ctx,opts,skip=[]):
+ def _configure_context(ctx, opts, skip=()):
"""
- Configures context of public key operations
- @param ctx - context to configure
- @param opts - dictionary of options (from kwargs of calling
- function)
- @param skip - list of options which shouldn't be passed to
- context
+ Configures context of public key operations
+ @param ctx - context to configure
+ @param opts - dictionary of options (from kwargs of calling
+ function)
+ @param skip - list of options which shouldn't be passed to
+ context
"""
for oper in opts:
if oper in skip:
continue
- rv=libcrypto.EVP_PKEY_CTX_ctrl_str(ctx,oper,str(opts[oper]))
- if rv==-2:
- raise PKeyError("Parameter %s is not supported by key"%(oper,))
- if rv<1:
- raise PKeyError("Error setting parameter %s"%(oper,))
+ ret = libcrypto.EVP_PKEY_CTX_ctrl_str(ctx, oper, str(opts[oper]))
+ if ret == -2:
+ raise PKeyError("Parameter %s is not supported by key" % oper)
+ if ret < 1:
+ raise PKeyError("Error setting parameter %s" % oper)
# Declare function prototypes
-libcrypto.EVP_PKEY_cmp.argtypes=(c_void_p,c_void_p)
-libcrypto.PEM_read_bio_PrivateKey.restype=c_void_p
-libcrypto.PEM_read_bio_PrivateKey.argtypes=(c_void_p,POINTER(c_void_p),CALLBACK_FUNC,c_char_p)
-libcrypto.PEM_read_bio_PUBKEY.restype=c_void_p
-libcrypto.PEM_read_bio_PUBKEY.argtypes=(c_void_p,POINTER(c_void_p),CALLBACK_FUNC,c_char_p)
-libcrypto.d2i_PUBKEY_bio.restype=c_void_p
-libcrypto.d2i_PUBKEY_bio.argtypes=(c_void_p,c_void_p)
-libcrypto.d2i_PrivateKey_bio.restype=c_void_p
-libcrypto.d2i_PrivateKey_bio.argtypes=(c_void_p,c_void_p)
-libcrypto.EVP_PKEY_print_public.argtypes=(c_void_p,c_void_p,c_int,c_void_p)
-libcrypto.EVP_PKEY_asn1_find_str.restype=c_void_p
-libcrypto.EVP_PKEY_asn1_find_str.argtypes=(c_void_p,c_char_p,c_int)
-libcrypto.EVP_PKEY_asn1_get0_info.restype=c_int
-libcrypto.EVP_PKEY_asn1_get0_info.argtypes=(POINTER(c_int),POINTER(c_int),POINTER(c_int),POINTER(c_char_p), POINTER(c_char_p),c_void_p)
-libcrypto.EVP_PKEY_cmp.restype=c_int
-libcrypto.EVP_PKEY_cmp.argtypes=(c_void_p,c_void_p)
-libcrypto.EVP_PKEY_CTX_ctrl_str.restype=c_int
-libcrypto.EVP_PKEY_CTX_ctrl_str.argtypes=(c_void_p,c_void_p,c_void_p)
-libcrypto.EVP_PKEY_CTX_ctrl.restype=c_int
-libcrypto.EVP_PKEY_CTX_ctrl.argtypes=(c_void_p,c_int,c_int,c_int,c_int,c_void_p)
-libcrypto.EVP_PKEY_CTX_free.argtypes=(c_void_p,)
-libcrypto.EVP_PKEY_CTX_new.restype=c_void_p
-libcrypto.EVP_PKEY_CTX_new.argtypes=(c_void_p,c_void_p)
-libcrypto.EVP_PKEY_CTX_new_id.restype=c_void_p
-libcrypto.EVP_PKEY_CTX_new_id.argtypes=(c_int,c_void_p)
-libcrypto.EVP_PKEY_derive.restype=c_int
-libcrypto.EVP_PKEY_derive.argtypes=(c_void_p,c_char_p,POINTER(c_long))
-libcrypto.EVP_PKEY_derive_init.restype=c_int
-libcrypto.EVP_PKEY_derive_init.argtypes=(c_void_p,)
-libcrypto.EVP_PKEY_derive_set_peer.restype=c_int
-libcrypto.EVP_PKEY_derive_set_peer.argtypes=(c_void_p,c_void_p)
-libcrypto.EVP_PKEY_free.argtypes=(c_void_p,)
-libcrypto.EVP_PKEY_keygen.restype=c_int
-libcrypto.EVP_PKEY_keygen.argtypes=(c_void_p,c_void_p)
-libcrypto.EVP_PKEY_keygen_init.restype=c_int
-libcrypto.EVP_PKEY_keygen_init.argtypes=(c_void_p,)
-libcrypto.EVP_PKEY_sign.restype=c_int
-libcrypto.EVP_PKEY_sign.argtypes=(c_void_p,c_char_p,POINTER(c_long),c_char_p,c_long)
-libcrypto.EVP_PKEY_sign_init.restype=c_int
-libcrypto.EVP_PKEY_sign_init.argtypes=(c_void_p,)
-libcrypto.EVP_PKEY_verify.restype=c_int
-libcrypto.EVP_PKEY_verify.argtypes=(c_void_p,c_char_p,c_long,c_char_p,c_long)
-libcrypto.EVP_PKEY_verify_init.restype=c_int
-libcrypto.EVP_PKEY_verify_init.argtypes=(c_void_p,)
-libcrypto.PEM_write_bio_PrivateKey.argtypes=(c_void_p,c_void_p,c_void_p,c_char_p,c_int,CALLBACK_FUNC,c_char_p)
-libcrypto.PEM_write_bio_PUBKEY.argtypes=(c_void_p,c_void_p)
-libcrypto.i2d_PUBKEY_bio.argtypes=(c_void_p,c_void_p)
-libcrypto.i2d_PrivateKey_bio.argtypes=(c_void_p,c_void_p)
-libcrypto.ENGINE_finish.argtypes=(c_void_p,)
+libcrypto.EVP_PKEY_cmp.argtypes = (c_void_p, c_void_p)
+libcrypto.PEM_read_bio_PrivateKey.restype = c_void_p
+libcrypto.PEM_read_bio_PrivateKey.argtypes = (c_void_p, POINTER(c_void_p),
+ PW_CALLBACK_FUNC, c_char_p)
+libcrypto.PEM_read_bio_PUBKEY.restype = c_void_p
+libcrypto.PEM_read_bio_PUBKEY.argtypes = (c_void_p, POINTER(c_void_p),
+ PW_CALLBACK_FUNC, c_char_p)
+libcrypto.d2i_PUBKEY_bio.restype = c_void_p
+libcrypto.d2i_PUBKEY_bio.argtypes = (c_void_p, c_void_p)
+libcrypto.d2i_PrivateKey_bio.restype = c_void_p
+libcrypto.d2i_PrivateKey_bio.argtypes = (c_void_p, c_void_p)
+libcrypto.EVP_PKEY_print_public.argtypes = (c_void_p, c_void_p, c_int, c_void_p)
+libcrypto.EVP_PKEY_asn1_find_str.restype = c_void_p
+libcrypto.EVP_PKEY_asn1_find_str.argtypes = (c_void_p, c_char_p, c_int)
+libcrypto.EVP_PKEY_asn1_get0_info.restype = c_int
+libcrypto.EVP_PKEY_asn1_get0_info.argtypes = (POINTER(c_int), POINTER(c_int),
+ POINTER(c_int), POINTER(c_char_p),
+ POINTER(c_char_p), c_void_p)
+libcrypto.EVP_PKEY_cmp.restype = c_int
+libcrypto.EVP_PKEY_cmp.argtypes = (c_void_p, c_void_p)
+libcrypto.EVP_PKEY_CTX_ctrl_str.restype = c_int
+libcrypto.EVP_PKEY_CTX_ctrl_str.argtypes = (c_void_p, c_void_p, c_void_p)
+libcrypto.EVP_PKEY_CTX_ctrl.restype = c_int
+libcrypto.EVP_PKEY_CTX_ctrl.argtypes = (c_void_p, c_int, c_int, c_int, c_int,
+ c_void_p)
+libcrypto.EVP_PKEY_CTX_free.argtypes = (c_void_p, )
+libcrypto.EVP_PKEY_CTX_new.restype = c_void_p
+libcrypto.EVP_PKEY_CTX_new.argtypes = (c_void_p, c_void_p)
+libcrypto.EVP_PKEY_CTX_new_id.restype = c_void_p
+libcrypto.EVP_PKEY_CTX_new_id.argtypes = (c_int, c_void_p)
+libcrypto.EVP_PKEY_derive.restype = c_int
+libcrypto.EVP_PKEY_derive.argtypes = (c_void_p, c_char_p, POINTER(c_long))
+libcrypto.EVP_PKEY_derive_init.restype = c_int
+libcrypto.EVP_PKEY_derive_init.argtypes = (c_void_p, )
+libcrypto.EVP_PKEY_derive_set_peer.restype = c_int
+libcrypto.EVP_PKEY_derive_set_peer.argtypes = (c_void_p, c_void_p)
+libcrypto.EVP_PKEY_free.argtypes = (c_void_p,)
+libcrypto.EVP_PKEY_keygen.restype = c_int
+libcrypto.EVP_PKEY_keygen.argtypes = (c_void_p, c_void_p)
+libcrypto.EVP_PKEY_keygen_init.restype = c_int
+libcrypto.EVP_PKEY_keygen_init.argtypes = (c_void_p, )
+libcrypto.EVP_PKEY_sign.restype = c_int
+libcrypto.EVP_PKEY_sign.argtypes = (c_void_p, c_char_p, POINTER(c_long),
+ c_char_p, c_long)
+libcrypto.EVP_PKEY_sign_init.restype = c_int
+libcrypto.EVP_PKEY_sign_init.argtypes = (c_void_p, )
+libcrypto.EVP_PKEY_verify.restype = c_int
+libcrypto.EVP_PKEY_verify.argtypes = (c_void_p, c_char_p, c_long, c_char_p,
+ c_long)
+libcrypto.EVP_PKEY_verify_init.restype = c_int
+libcrypto.EVP_PKEY_verify_init.argtypes = (c_void_p, )
+libcrypto.PEM_write_bio_PrivateKey.argtypes = (c_void_p, c_void_p, c_void_p,
+ c_char_p, c_int,
+ PW_CALLBACK_FUNC, c_char_p)
+libcrypto.PEM_write_bio_PUBKEY.argtypes = (c_void_p, c_void_p)
+libcrypto.i2d_PUBKEY_bio.argtypes = (c_void_p, c_void_p)
+libcrypto.i2d_PKCS8PrivateKey_bio.argtypes = (c_void_p, c_void_p, c_void_p,
+ c_char_p, c_int,
+ PW_CALLBACK_FUNC, c_char_p)
+libcrypto.ENGINE_finish.argtypes = (c_void_p, )
from ctypescrypto import libcrypto
from ctypescrypto.exception import LibCryptoError
-__all__ = ['RandError','bytes','pseudo_bytes','seed','status']
+__all__ = ['RandError', 'bytes', 'pseudo_bytes', 'seed', 'status']
class RandError(LibCryptoError):
+ """ Exception raised when openssl function return error """
pass
-def bytes( num, check_result=False):
+def bytes(num, check_result=False):
"""
- Returns num bytes of cryptographically strong pseudo-random
- bytes. If checkc_result is True, raises error if PRNG is not
- seeded enough
+ Returns num bytes of cryptographically strong pseudo-random
+ bytes. If checkc_result is True, raises error if PRNG is not
+ seeded enough
"""
- if num <= 0 :
+ if num <= 0:
raise ValueError("'num' should be > 0")
- buffer = create_string_buffer(num)
- result = libcrypto.RAND_bytes(buffer, num)
+ buf = create_string_buffer(num)
+ result = libcrypto.RAND_bytes(buf, num)
if check_result and result == 0:
raise RandError("Random Number Generator not seeded sufficiently")
- return buffer.raw[:num]
+ return buf.raw[:num]
def pseudo_bytes(num):
"""
- Returns num bytes of pseudo random data. Pseudo- random byte
- sequences generated by pseudo_bytes() will be unique if
- they are of sufficient length, but are not necessarily
- unpredictable. They can be used for non-cryptographic purposes
- and for certain purposes in cryptographic protocols, but usually
- not for key generation etc.
+ Returns num bytes of pseudo random data. Pseudo- random byte
+ sequences generated by pseudo_bytes() will be unique if
+ they are of sufficient length, but are not necessarily
+ unpredictable. They can be used for non-cryptographic purposes
+ and for certain purposes in cryptographic protocols, but usually
+ not for key generation etc.
"""
- if num <= 0 :
+ if num <= 0:
raise ValueError("'num' should be > 0")
- buffer = create_string_buffer(num)
- libcrypto.RAND_pseudo_bytes(buffer, num)
- return buffer.raw[:num]
+ buf = create_string_buffer(num)
+ libcrypto.RAND_pseudo_bytes(buf, num)
+ return buf.raw[:num]
def seed(data, entropy=None):
"""
If entropy is not None, it should be floating point(double)
value estimating amount of entropy in the data (in bytes).
"""
- if not isinstance(data,str):
+ if not isinstance(data, str):
raise TypeError("A string is expected")
ptr = c_char_p(data)
size = len(data)
if entropy is None:
libcrypto.RAND_seed(ptr, size)
- else :
+ else:
libcrypto.RAND_add(ptr, size, entropy)
def status():
"""
- Returns 1 if random generator is sufficiently seeded and 0
- otherwise
+ Returns 1 if random generator is sufficiently seeded and 0
+ otherwise
"""
return libcrypto.RAND_status()
-
-libcrypto.RAND_add.argtypes=(c_char_p,c_int,c_double)
-libcrypto.RAND_seed.argtypes=(c_char_p,c_int)
-libcrypto.RAND_pseudo_bytes.argtypes=(c_char_p,c_int)
-libcrypto.RAND_bytes.argtypes=(c_char_p,c_int)
+
+libcrypto.RAND_add.argtypes = (c_char_p, c_int, c_double)
+libcrypto.RAND_seed.argtypes = (c_char_p, c_int)
+libcrypto.RAND_pseudo_bytes.argtypes = (c_char_p, c_int)
+libcrypto.RAND_bytes.argtypes = (c_char_p, c_int)
"""
-Implements interface to openssl X509 and X509Store structures,
+Implements interface to openssl X509 and X509Store structures,
I.e allows to load, analyze and verify certificates.
X509Store objects are also used to verify other signed documets,
-from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
+from ctypes import c_void_p, c_long, c_int, POINTER, c_char_p, Structure, cast
from ctypescrypto.bio import Membio
from ctypescrypto.pkey import PKey
from ctypescrypto.oid import Oid
from ctypescrypto.exception import LibCryptoError
from ctypescrypto import libcrypto
from datetime import datetime
+
try:
from pytz import utc
except ImportError:
- from datetime import timedelta,tzinfo
- ZERO=timedelta(0)
+ from datetime import timedelta, tzinfo
+ ZERO = timedelta(0)
class UTC(tzinfo):
- """tzinfo object for UTC.
+ """tzinfo object for UTC.
If no pytz is available, we would use it.
"""
-
def utcoffset(self, dt):
return ZERO
def dst(self, dt):
return ZERO
- utc=UTC()
+ utc = UTC()
-__all__ = ['X509','X509Error','X509Name','X509Store','StackOfX509']
+__all__ = ['X509', 'X509Error', 'X509Name', 'X509Store', 'StackOfX509']
class _validity(Structure):
- """ ctypes representation of X509_VAL structure
+ """ ctypes representation of X509_VAL structure
needed to access certificate validity period, because openssl
doesn't provide fuctions for it - only macros
"""
- _fields_ = [('notBefore',c_void_p),('notAfter',c_void_p)]
+ _fields_ = [('notBefore', c_void_p), ('notAfter', c_void_p)]
class _cinf(Structure):
- """ ctypes representtion of X509_CINF structure
+ """ ctypes representtion of X509_CINF structure
neede to access certificate data, which are accessable only
via macros
"""
- _fields_ = [('version',c_void_p),
- ('serialNumber',c_void_p),
- ('sign_alg',c_void_p),
- ('issuer',c_void_p),
- ('validity',POINTER(_validity)),
- ('subject',c_void_p),
- ('pubkey',c_void_p),
- ('issuerUID',c_void_p),
- ('subjectUID',c_void_p),
- ('extensions',c_void_p),
- ]
+ _fields_ = [('version', c_void_p),
+ ('serialNumber', c_void_p),
+ ('sign_alg', c_void_p),
+ ('issuer', c_void_p),
+ ('validity', POINTER(_validity)),
+ ('subject', c_void_p),
+ ('pubkey', c_void_p),
+ ('issuerUID', c_void_p),
+ ('subjectUID', c_void_p),
+ ('extensions', c_void_p),
+ ]
class _x509(Structure):
"""
to access certificate data which are accesable only via
macros, not functions
"""
- _fields_ = [('cert_info',POINTER(_cinf)),
- ('sig_alg',c_void_p),
- ('signature',c_void_p),
+ _fields_ = [('cert_info', POINTER(_cinf)),
+ ('sig_alg', c_void_p),
+ ('signature', c_void_p),
# There are a lot of parsed extension fields there
- ]
+ ]
_px509 = POINTER(_x509)
class X509Error(LibCryptoError):
class X509Name(object):
"""
- Class which represents X.509 distinguished name - typically
+ Class which represents X.509 distinguished name - typically
a certificate subject name or an issuer name.
Now used only to represent information, extracted from the
certificate signing request
"""
# XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
- PRINT_FLAG=0x10010
- ESC_MSB=4
- def __init__(self,ptr=None,copy=False):
+ PRINT_FLAG = 0x10010
+ ESC_MSB = 4
+ def __init__(self, ptr=None, copy=False):
"""
Creates a X509Name object
- @param ptr - pointer to X509_NAME C structure (as returned by some OpenSSL functions
- @param copy - indicates that this structure have to be freed upon object destruction
+ @param ptr - pointer to X509_NAME C structure (as returned by some
+ OpenSSL functions
+ @param copy - indicates that this structure have to be freed upon
+ object destruction
"""
if ptr is not None:
- self.ptr=ptr
- self.need_free=copy
- self.writable=False
+ self.ptr = ptr
+ self.need_free = copy
+ self.writable = False
else:
- self.ptr=libcrypto.X509_NAME_new()
- self.need_free=True
- self.writable=True
+ self.ptr = libcrypto.X509_NAME_new()
+ self.need_free = True
+ self.writable = True
+
def __del__(self):
"""
Frees if neccessary
libcrypto.X509_NAME_free(self.ptr)
def __str__(self):
"""
- Produces an ascii representation of the name, escaping all symbols > 0x80
- Probably it is not what you want, unless your native language is English
+ Produces an ascii representation of the name, escaping all
+ symbols > 0x80. Probably it is not what you want, unless
+ your native language is English
"""
- b=Membio()
- libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB)
- return str(b)
+ bio = Membio()
+ libcrypto.X509_NAME_print_ex(bio.bio, self.ptr, 0,
+ self.PRINT_FLAG | self.ESC_MSB)
+ return str(bio)
+
def __unicode__(self):
"""
- Produces unicode representation of the name.
+ Produces unicode representation of the name.
"""
- b=Membio()
- libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG)
- return unicode(b)
+ bio = Membio()
+ libcrypto.X509_NAME_print_ex(bio.bio, self.ptr, 0, self.PRINT_FLAG)
+ return unicode(bio)
def __len__(self):
"""
return number of components in the name
"""
return libcrypto.X509_NAME_entry_count(self.ptr)
- def __cmp__(self,other):
+ def __cmp__(self, other):
"""
Compares X509 names
"""
- return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)
- def __eq__(self,other):
- return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)==0
+ return libcrypto.X509_NAME_cmp(self.ptr, other.ptr)
+ def __eq__(self, other):
+ return libcrypto.X509_NAME_cmp(self.ptr, other.ptr) == 0
- def __getitem__(self,key):
- if isinstance(key,Oid):
+ def __getitem__(self, key):
+ if isinstance(key, Oid):
# Return first matching field
- idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1)
- if idx<0:
- raise KeyError("Key not found "+str(Oid))
- entry=libcrypto.X509_NAME_get_entry(self.ptr,idx)
- s=libcrypto.X509_NAME_ENTRY_get_data(entry)
- b=Membio()
- libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
- return unicode(b)
- elif isinstance(key,(int,long)):
+ idx = libcrypto.X509_NAME_get_index_by_NID(self.ptr, key.nid, -1)
+ if idx < 0:
+ raise KeyError("Key not found " + str(Oid))
+ entry = libcrypto.X509_NAME_get_entry(self.ptr, idx)
+ value = libcrypto.X509_NAME_ENTRY_get_data(entry)
+ bio = Membio()
+ libcrypto.ASN1_STRING_print_ex(bio.bio, value, self.PRINT_FLAG)
+ return unicode(bio)
+ elif isinstance(key, (int, long)):
# Return OID, string tuple
- entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
+ entry = libcrypto.X509_NAME_get_entry(self.ptr, key)
if entry is None:
raise IndexError("name entry index out of range")
- oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
- s=libcrypto.X509_NAME_ENTRY_get_data(entry)
- b=Membio()
- libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
- return (oid,unicode(b))
+ oid = Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
+ value = libcrypto.X509_NAME_ENTRY_get_data(entry)
+ bio = Membio()
+ libcrypto.ASN1_STRING_print_ex(bio.bio, value, self.PRINT_FLAG)
+ return (oid, unicode(bio))
else:
raise TypeError("X509 NAME can be indexed by Oids or integers only")
- def __setitem__(self,key,val):
+ def __setitem__(self, key, val):
if not self.writable:
raise ValueError("Attempt to modify constant X509 object")
else:
raise NotImplementedError
- def __delitem__(self,key):
+ def __delitem__(self, key):
if not self.writable:
raise ValueError("Attempt to modify constant X509 object")
else:
class _x509_ext(Structure):
""" Represens C structure X509_EXTENSION """
- _fields_=[("object",c_void_p),
- ("critical",c_int),
- ("value",c_void_p)]
+ _fields_ = [("object", c_void_p),
+ ("critical", c_int),
+ ("value", c_void_p)
+ ]
class X509_EXT(object):
""" Python object which represents a certificate extension """
- def __init__(self,ptr,copy=False):
+ def __init__(self, ptr, copy=False):
""" Initializes from the pointer to X509_EXTENSION.
If copy is True, creates a copy, otherwise just
stores pointer.
"""
if copy:
- self.ptr=libcrypto.X509_EXTENSION_dup(ptr)
+ self.ptr = libcrypto.X509_EXTENSION_dup(ptr)
else:
- self.ptr=cast(ptr,POINTER(_x509_ext))
+ self.ptr = cast(ptr, POINTER(_x509_ext))
def __del__(self):
libcrypto.X509_EXTENSION_free(self.ptr)
def __str__(self):
- b=Membio()
- libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
- return str(b)
+ bio = Membio()
+ libcrypto.X509V3_EXT_print(bio.bio, self.ptr, 0x20010, 0)
+ return str(bio)
def __unicode__(self):
- b=Membio()
- libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
- return unicode(b)
+ bio = Membio()
+ libcrypto.X509V3_EXT_print(bio.bio, self.ptr, 0x20010, 0)
+ return unicode(bio)
@property
def oid(self):
+ "Returns OID of the extension"
return Oid.fromobj(self.ptr[0].object)
@property
- def critical(self):
- return self.ptr[0].critical >0
-class _X509extlist(object):
+ def critical(self):
+ "Returns True if extensin have critical flag set"
+ return self.ptr[0].critical > 0
+
+class _X509extlist(object):
"""
- Represents list of certificate extensions
+ Represents list of certificate extensions. Really it keeps
+ reference to certificate object
"""
- def __init__(self,cert):
- self.cert=cert
+ def __init__(self, cert):
+ """
+ Initialize from X509 object
+ """
+ self.cert = cert
+
def __len__(self):
+ """
+ Returns number of extensions
+ """
return libcrypto.X509_get_ext_count(self.cert.cert)
- def __getitem__(self,item):
- p=libcrypto.X509_get_ext(self.cert.cert,item)
- if p is None:
+
+ def __getitem__(self, item):
+ """
+ Returns extension by index, creating a copy
+ """
+ ext_ptr = libcrypto.X509_get_ext(self.cert.cert, item)
+ if ext_ptr is None:
raise IndexError
- return X509_EXT(p,True)
- def find(self,oid):
+ return X509_EXT(ext_ptr, True)
+ def find(self, oid):
"""
Return list of extensions with given Oid
"""
- if not isinstance(oid,Oid):
+ if not isinstance(oid, Oid):
raise TypeError("Need crytypescrypto.oid.Oid as argument")
- found=[]
- l=-1
- end=len(self)
+ found = []
+ index = -1
+ end = len(self)
while True:
- l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l)
- if l>=end or l<0:
- break
- found.append(self[l])
+ index = libcrypto.X509_get_ext_by_NID(self.cert.cert, oid.nid,
+ index)
+ if index >= end or index < 0:
+ break
+ found.append(self[index])
return found
- def find_critical(self,crit=True):
+
+ def find_critical(self, crit=True):
"""
Return list of critical extensions (or list of non-cricital, if
optional second argument is False
"""
if crit:
- flag=1
+ flag = 1
else:
- flag=0
- found=[]
- end=len(self)
- l=-1
+ flag = 0
+ found = []
+ end = len(self)
+ index = -1
while True:
- l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l)
- if l>=end or l<0:
- break
- found.append(self[l])
- return found
+ index = libcrypto.X509_get_ext_by_critical(self.cert.cert, flag,
+ index)
+ if index >= end or index < 0:
+ break
+ found.append(self[index])
+ return found
+
+def _X509__asn1date_to_datetime(asn1date):
+ """
+ Converts openssl ASN1_TIME object to python datetime.datetime
+ """
+ bio = Membio()
+ libcrypto.ASN1_TIME_print(bio.bio, asn1date)
+ pydate = datetime.strptime(str(bio), "%b %d %H:%M:%S %Y %Z")
+ return pydate.replace(tzinfo=utc)
class X509(object):
"""
- Represents X.509 certificate.
+ Represents X.509 certificate.
"""
- def __init__(self,data=None,ptr=None,format="PEM"):
+ def __init__(self, data=None, ptr=None, format="PEM"):
"""
Initializes certificate
@param data - serialized certificate in PEM or DER format.
- @param ptr - pointer to X509, returned by some openssl function.
+ @param ptr - pointer to X509, returned by some openssl function.
mutually exclusive with data
@param format - specifies data format. "PEM" or "DER", default PEM
"""
if ptr is not None:
- if data is not None:
+ if data is not None:
raise TypeError("Cannot use data and ptr simultaneously")
self.cert = ptr
elif data is None:
raise TypeError("data argument is required")
else:
- b=Membio(data)
+ bio = Membio(data)
if format == "PEM":
- self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None)
+ self.cert = libcrypto.PEM_read_bio_X509(bio.bio, None, None,
+ None)
else:
- self.cert=libcrypto.d2i_X509_bio(b.bio,None)
+ self.cert = libcrypto.d2i_X509_bio(bio.bio, None)
if self.cert is None:
raise X509Error("error reading certificate")
- self.extensions=_X509extlist(self)
+ self.extensions = _X509extlist(self)
def __del__(self):
"""
Frees certificate object
libcrypto.X509_free(self.cert)
def __str__(self):
""" Returns der string of the certificate """
- b=Membio()
- if libcrypto.i2d_X509_bio(b.bio,self.cert)==0:
+ bio = Membio()
+ if libcrypto.i2d_X509_bio(bio.bio, self.cert) == 0:
raise X509Error("error serializing certificate")
- return str(b)
+ return str(bio)
def __repr__(self):
""" Returns valid call to the constructor """
- return "X509(data="+repr(str(self))+",format='DER')"
+ return "X509(data=" + repr(str(self)) + ",format='DER')"
@property
def pubkey(self):
"""EVP PKEy object of certificate public key"""
- return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
+ return PKey(ptr=libcrypto.X509_get_pubkey(self.cert, False))
def pem(self):
""" Returns PEM represntation of the certificate """
- b=Membio()
- if libcrypto.PEM_write_bio_X509(b.bio,self.cert)==0:
+ bio = Membio()
+ if libcrypto.PEM_write_bio_X509(bio.bio, self.cert) == 0:
raise X509Error("error serializing certificate")
- return str(b)
- def verify(self,store=None,chain=[],key=None):
- """
- Verify self. Supports verification on both X509 store object
+ return str(bio)
+ def verify(self, store=None, chain=None, key=None):
+ """
+ Verify self. Supports verification on both X509 store object
or just public issuer key
@param store X509Store object.
@param chain - list of X509 objects to add into verification
context.These objects are untrusted, but can be used to
build certificate chain up to trusted object in the store
@param key - PKey object with open key to validate signature
-
- parameters store and key are mutually exclusive. If neither
+
+ parameters store and key are mutually exclusive. If neither
is specified, attempts to verify self as self-signed certificate
"""
if store is not None and key is not None:
raise X509Error("key and store cannot be specified simultaneously")
if store is not None:
- ctx=libcrypto.X509_STORE_CTX_new()
+ ctx = libcrypto.X509_STORE_CTX_new()
if ctx is None:
raise X509Error("Error allocating X509_STORE_CTX")
- if chain is not None and len(chain)>0:
- ch=StackOfX509(chain)
+ if chain is not None and len(chain) > 0:
+ chain_ptr = StackOfX509(chain).ptr
else:
- ch=None
- if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0:
+ chain_ptr = None
+ if libcrypto.X509_STORE_CTX_init(ctx, store.store, self.cert,
+ chain_ptr) < 0:
raise X509Error("Error allocating X509_STORE_CTX")
- res= libcrypto.X509_verify_cert(ctx)
+ res = libcrypto.X509_verify_cert(ctx)
libcrypto.X509_STORE_CTX_free(ctx)
- return res>0
+ return res > 0
else:
if key is None:
if self.issuer != self.subject:
# Not a self-signed certificate
return False
key = self.pubkey
- res = libcrypto.X509_verify(self.cert,key.key)
+ res = libcrypto.X509_verify(self.cert, key.key)
if res < 0:
raise X509Error("X509_verify failed")
- return res>0
-
+ return res > 0
+
@property
def subject(self):
""" X509Name for certificate subject name """
@property
def serial(self):
""" Serial number of certificate as integer """
- asnint=libcrypto.X509_get_serialNumber(self.cert)
- b=Membio()
- libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
- return int(str(b),16)
- @property
+ asnint = libcrypto.X509_get_serialNumber(self.cert)
+ bio = Membio()
+ libcrypto.i2a_ASN1_INTEGER(bio.bio, asnint)
+ return int(str(bio), 16)
+ @property
def version(self):
- """ certificate version as integer. Really certificate stores 0 for
- version 1 and 2 for version 3, but we return 1 and 3 """
- asn1int=cast(self.cert,_px509)[0].cert_info[0].version
- return libcrypto.ASN1_INTEGER_get(asn1int)+1
+ """
+ certificate version as integer. Really certificate stores 0 for
+ version 1 and 2 for version 3, but we return 1 and 3
+ """
+ asn1int = cast(self.cert, _px509)[0].cert_info[0].version
+ return libcrypto.ASN1_INTEGER_get(asn1int) + 1
@property
def startDate(self):
""" Certificate validity period start date """
- # Need deep poke into certificate structure (x)->cert_info->validity->notBefore
- global utc
- asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore
- b=Membio()
- libcrypto.ASN1_TIME_print(b.bio,asn1date)
- return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
+ # Need deep poke into certificate structure
+ # (x)->cert_info->validity->notBefore
+ asn1 = cast(self.cert, _px509)[0].cert_info[0].validity[0].notBefore
+ return __asn1date_to_datetime(asn1)
@property
def endDate(self):
""" Certificate validity period end date """
- # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
- global utc
- asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter
- b=Membio()
- libcrypto.ASN1_TIME_print(b.bio,asn1date)
- return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
+ # Need deep poke into certificate structure
+ # (x)->cert_info->validity->notAfter
+ asn1 = cast(self.cert, _px509)[0].cert_info[0].validity[0].notAfter
+ return __asn1date_to_datetime(asn1)
def check_ca(self):
""" Returns True if certificate is CA certificate """
- return libcrypto.X509_check_ca(self.cert)>0
+ return libcrypto.X509_check_ca(self.cert) > 0
+
class X509Store(object):
"""
- Represents trusted certificate store. Can be used to lookup CA
- certificates to verify
+ Represents trusted certificate store. Can be used to lookup CA
+ certificates to verify
- @param file - file with several certificates and crls
- to load into store
- @param dir - hashed directory with certificates and crls
- @param default - if true, default verify location (directory)
- is installed
+ @param file - file with several certificates and crls
+ to load into store
+ @param dir - hashed directory with certificates and crls
+ @param default - if true, default verify location (directory)
+ is installed
"""
- def __init__(self,file=None,dir=None,default=False):
+ def __init__(self, file=None, dir=None, default=False):
"""
- Creates X509 store and installs lookup method. Optionally initializes
+ Creates X509 store and installs lookup method. Optionally initializes
by certificates from given file or directory.
"""
#
# Todo - set verification flags
- #
- self.store=libcrypto.X509_STORE_new()
+ #
+ self.store = libcrypto.X509_STORE_new()
if self.store is None:
raise X509Error("allocating store")
- lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
+ lookup = libcrypto.X509_STORE_add_lookup(self.store,
+ libcrypto.X509_LOOKUP_file())
if lookup is None:
raise X509Error("error installing file lookup method")
- if (file is not None):
- if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0:
+ if file is not None:
+ if not libcrypto.X509_LOOKUP_ctrl(lookup, 1, file, 1, None) > 0:
raise X509Error("error loading trusted certs from file "+file)
- lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
+ lookup = libcrypto.X509_STORE_add_lookup(self.store,
+ libcrypto.X509_LOOKUP_hash_dir())
if lookup is None:
raise X509Error("error installing hashed lookup method")
if dir is not None:
- if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0:
+ if not libcrypto.X509_LOOKUP_ctrl(lookup, 2, dir, 1, None) > 0:
raise X509Error("error adding hashed trusted certs dir "+dir)
if default:
- if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0:
+ if not libcrypto.X509_LOOKUP_ctrl(lookup, 2, None, 3, None) > 0:
raise X509Error("error adding default trusted certs dir ")
- def add_cert(self,cert):
+ def add_cert(self, cert):
"""
Explicitely adds certificate to set of trusted in the store
@param cert - X509 object to add
"""
- if not isinstance(cert,X509):
+ if not isinstance(cert, X509):
raise TypeError("cert should be X509")
- libcrypto.X509_STORE_add_cert(self.store,cert.cert)
- def add_callback(self,callback):
+ libcrypto.X509_STORE_add_cert(self.store, cert.cert)
+ def add_callback(self, callback):
"""
Installs callback function, which would receive detailed information
about verified ceritificates
"""
raise NotImplementedError
- def setflags(self,flags):
+ def setflags(self, flags):
"""
Set certificate verification flags.
@param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
"""
- libcrypto.X509_STORE_set_flags(self.store,flags)
- def setpurpose(self,purpose):
+ libcrypto.X509_STORE_set_flags(self.store, flags)
+ def setpurpose(self, purpose):
"""
Sets certificate purpose which verified certificate should match
- @param purpose - number from 1 to 9 or standard strind defined in Openssl
- possible strings - sslcient,sslserver, nssslserver, smimesign,smimeencrypt, crlsign, any,ocsphelper
- """
- if isinstance(purpose,str):
- purp_no=X509_PURPOSE_get_by_sname(purpose)
- if purp_no <=0:
- raise X509Error("Invalid certificate purpose '"+purpose+"'")
- elif isinstance(purpose,int):
+ @param purpose - number from 1 to 9 or standard strind defined
+ in Openssl
+ possible strings - sslcient,sslserver, nssslserver, smimesign,i
+ smimeencrypt, crlsign, any, ocsphelper
+ """
+ if isinstance(purpose, str):
+ purp_no = libcrypto.X509_PURPOSE_get_by_sname(purpose)
+ if purp_no <= 0:
+ raise X509Error("Invalid certificate purpose '%s'" % purpose)
+ elif isinstance(purpose, int):
purp_no = purpose
- if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
+ if libcrypto.X509_STORE_set_purpose(self.store, purp_no) <= 0:
raise X509Error("cannot set purpose")
- def setdepth(self,depth):
+ def setdepth(self, depth):
"""
Sets the verification depth i.e. max length of certificate chain
which is acceptable
"""
- libcrypto.X509_STORE_set_depth(self.store,depth)
+ libcrypto.X509_STORE_set_depth(self.store, depth)
def settime(self, time):
"""
Set point in time used to check validity of certificates for
Time can be either python datetime object or number of seconds
sinse epoch
"""
- if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
- d=int(time.strftime("%s"))
- elif isinstance(time,int):
- pass
+ if isinstance(time, datetime) or isinstance(time,
+ datetime.date):
+ seconds = int(time.strftime("%s"))
+ elif isinstance(time, int):
+ seconds = time
else:
- raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
+ raise TypeError("datetime.date, datetime.datetime or integer " +
+ "is required as time argument")
raise NotImplementedError
class StackOfX509(object):
"""
Implements OpenSSL STACK_OF(X509) object.
It looks much like python container types
"""
- def __init__(self,certs=None,ptr=None,disposable=True):
+ def __init__(self, certs=None, ptr=None, disposable=True):
"""
Create stack
@param certs - list of X509 objects. If specified, read-write
"""
if ptr is None:
self.need_free = True
- self.ptr=libcrypto.sk_new_null()
+ self.ptr = libcrypto.sk_new_null()
if certs is not None:
for crt in certs:
self.append(crt)
elif certs is not None:
- raise ValueError("cannot handle certs an ptr simultaneously")
+ raise ValueError("cannot handle certs an ptr simultaneously")
else:
self.need_free = disposable
- self.ptr=ptr
+ self.ptr = ptr
def __len__(self):
return libcrypto.sk_num(self.ptr)
- def __getitem__(self,index):
- if index <0 or index>=len(self):
+ def __getitem__(self, index):
+ if index < 0 or index >= len(self):
raise IndexError
- p=libcrypto.sk_value(self.ptr,index)
+ p = libcrypto.sk_value(self.ptr, index)
return X509(ptr=libcrypto.X509_dup(p))
- def __setitem__(self,index,value):
+ def __setitem__(self, index, value):
if not self.need_free:
raise ValueError("Stack is read-only")
- if index <0 or index>=len(self):
+ if index < 0 or index >= len(self):
raise IndexError
- if not isinstance(value,X509):
+ if not isinstance(value, X509):
raise TypeError('StackOfX508 can contain only X509 objects')
- p=libcrypto.sk_value(self.ptr,index)
- libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
+ p = libcrypto.sk_value(self.ptr, index)
+ libcrypto.sk_set(self.ptr, index, libcrypto.X509_dup(value.cert))
libcrypto.X509_free(p)
- def __delitem__(self,index):
+ def __delitem__(self, index):
if not self.need_free:
raise ValueError("Stack is read-only")
- if index <0 or index>=len(self):
+ if index < 0 or index >= len(self):
raise IndexError
- p=libcrypto.sk_delete(self.ptr,index)
+ p = libcrypto.sk_delete(self.ptr, index)
libcrypto.X509_free(p)
def __del__(self):
if self.need_free:
- libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
- def append(self,value):
+ libcrypto.sk_pop_free(self.ptr, libcrypto.X509_free)
+ def append(self, value):
+ """ Adds certificate to stack """
if not self.need_free:
raise ValueError("Stack is read-only")
- if not isinstance(value,X509):
+ if not isinstance(value, X509):
raise TypeError('StackOfX508 can contain only X509 objects')
- libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
-libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
-libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
-libcrypto.PEM_read_bio_X509.restype=c_void_p
-libcrypto.PEM_read_bio_X509.argtypes=(c_void_p,POINTER(c_void_p),c_void_p,c_void_p)
-libcrypto.PEM_write_bio_X509.restype=c_int
-libcrypto.PEM_write_bio_X509.argtypes=(c_void_p,c_void_p)
-libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p)
-libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,)
-libcrypto.ASN1_INTEGER_get.restype=c_long
-libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
-libcrypto.X509_get_serialNumber.restype=c_void_p
-libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
-libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,)
-libcrypto.OBJ_obj2nid.argtypes=(c_void_p,)
-libcrypto.X509_NAME_get_entry.restype=c_void_p
-libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int)
-libcrypto.X509_STORE_new.restype=c_void_p
-libcrypto.X509_STORE_add_lookup.restype=c_void_p
-libcrypto.X509_STORE_add_lookup.argtypes=(c_void_p,c_void_p)
-libcrypto.X509_LOOKUP_file.restype=c_void_p
-libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
-libcrypto.X509_LOOKUP_ctrl.restype=c_int
-libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
-libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,)
-libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext)
-libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
-libcrypto.X509_get_ext.restype=c_void_p
-libcrypto.X509_get_ext.argtypes=(c_void_p,c_int)
-libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
-libcrypto.sk_set.argtypes=(c_void_p,c_int,c_void_p)
-libcrypto.sk_set.restype=c_void_p
-libcrypto.sk_value.argtypes=(c_void_p,c_int)
-libcrypto.sk_value.restype=c_void_p
-libcrypto.X509_dup.restype=c_void_p
-libcrypto.sk_new_null.restype=c_void_p
-libcrypto.X509_dup.argtypes=(c_void_p,)
-libcrypto.X509_NAME_hash.restype=c_long
-libcrypto.X509_NAME_hash.argtypes=(c_void_p,)
+ libcrypto.sk_push(self.ptr, libcrypto.X509_dup(value.cert))
+
+libcrypto.i2a_ASN1_INTEGER.argtypes = (c_void_p, c_void_p)
+libcrypto.ASN1_STRING_print_ex.argtypes = (c_void_p, c_void_p, c_long)
+libcrypto.PEM_read_bio_X509.restype = c_void_p
+libcrypto.PEM_read_bio_X509.argtypes = (c_void_p, POINTER(c_void_p),
+ c_void_p, c_void_p)
+libcrypto.PEM_write_bio_X509.restype = c_int
+libcrypto.PEM_write_bio_X509.argtypes = (c_void_p, c_void_p)
+libcrypto.ASN1_TIME_print.argtypes = (c_void_p, c_void_p)
+libcrypto.ASN1_INTEGER_get.argtypes = (c_void_p, )
+libcrypto.ASN1_INTEGER_get.restype = c_long
+libcrypto.X509_get_serialNumber.argtypes = (c_void_p, )
+libcrypto.X509_get_serialNumber.restype = c_void_p
+libcrypto.X509_NAME_ENTRY_get_object.restype = c_void_p
+libcrypto.X509_NAME_ENTRY_get_object.argtypes = (c_void_p, )
+libcrypto.OBJ_obj2nid.argtypes = (c_void_p, )
+libcrypto.X509_NAME_get_entry.restype = c_void_p
+libcrypto.X509_NAME_get_entry.argtypes = (c_void_p, c_int)
+libcrypto.X509_STORE_new.restype = c_void_p
+libcrypto.X509_STORE_add_lookup.restype = c_void_p
+libcrypto.X509_STORE_add_lookup.argtypes = (c_void_p, c_void_p)
+libcrypto.X509_LOOKUP_file.restype = c_void_p
+libcrypto.X509_LOOKUP_hash_dir.restype = c_void_p
+libcrypto.X509_LOOKUP_ctrl.restype = c_int
+libcrypto.X509_LOOKUP_ctrl.argtypes = (c_void_p, c_int, c_char_p, c_long,
+ POINTER(c_char_p))
+libcrypto.X509_EXTENSION_dup.argtypes = (c_void_p, )
+libcrypto.X509_EXTENSION_dup.restype = POINTER(_x509_ext)
+libcrypto.X509V3_EXT_print.argtypes = (c_void_p, POINTER(_x509_ext), c_long,
+ c_int)
+libcrypto.X509_get_ext.restype = c_void_p
+libcrypto.X509_get_ext.argtypes = (c_void_p, c_int)
+libcrypto.X509V3_EXT_print.argtypes = (c_void_p, POINTER(_x509_ext), c_long,
+ c_int)
+libcrypto.sk_set.argtypes = (c_void_p, c_int, c_void_p)
+libcrypto.sk_set.restype = c_void_p
+libcrypto.sk_value.argtypes = (c_void_p, c_int)
+libcrypto.sk_value.restype = c_void_p
+libcrypto.X509_dup.restype = c_void_p
+libcrypto.sk_new_null.restype = c_void_p
+libcrypto.X509_dup.argtypes = (c_void_p, )
+libcrypto.X509_NAME_hash.restype = c_long
+libcrypto.X509_NAME_hash.argtypes = (c_void_p, )
setup(
name="ctypescrypto",
- version="0.2.7",
+ version="0.3.0",
description="CTypes-based interface for some OpenSSL libcrypto features",
author="Victor Wagner",
author_email="vitus@wagner.pp.ru",
class TestDigestType(unittest.TestCase):
def test_md4(self):
d=digest.DigestType("md4")
- self.assertEqual(d.digest_size(),16)
- self.assertEqual(d.block_size(),64)
- self.assertEqual(d.oid(),Oid("md4"))
+ self.assertEqual(d.digest_size,16)
+ self.assertEqual(d.block_size,64)
+ self.assertEqual(d.oid,Oid("md4"))
self.assertEqual(d.name,'md4')
def test_md5(self):
d=digest.DigestType("md5")
- self.assertEqual(d.digest_size(),16)
- self.assertEqual(d.block_size(),64)
- self.assertEqual(d.oid(),Oid("md5"))
+ self.assertEqual(d.digest_size,16)
+ self.assertEqual(d.block_size,64)
+ self.assertEqual(d.oid,Oid("md5"))
self.assertEqual(d.name,'md5')
def test_sha1(self):
d=digest.DigestType("sha1")
- self.assertEqual(d.digest_size(),20)
- self.assertEqual(d.block_size(),64)
- self.assertEqual(d.oid(),Oid("sha1"))
+ self.assertEqual(d.digest_size,20)
+ self.assertEqual(d.block_size,64)
+ self.assertEqual(d.oid,Oid("sha1"))
self.assertEqual(d.name,'sha1')
def test_sha256(self):
d=digest.DigestType("sha256")
- self.assertEqual(d.digest_size(),32)
- self.assertEqual(d.block_size(),64)
- self.assertEqual(d.oid(),Oid("sha256"))
+ self.assertEqual(d.digest_size,32)
+ self.assertEqual(d.block_size,64)
+ self.assertEqual(d.oid,Oid("sha256"))
self.assertEqual(d.name,'sha256')
def test_sha384(self):
d=digest.DigestType("sha384")
- self.assertEqual(d.digest_size(),48)
- self.assertEqual(d.block_size(),128)
- self.assertEqual(d.oid(),Oid("sha384"))
+ self.assertEqual(d.digest_size,48)
+ self.assertEqual(d.block_size,128)
+ self.assertEqual(d.oid,Oid("sha384"))
self.assertEqual(d.name,'sha384')
def test_sha512(self):
d=digest.DigestType("sha512")
- self.assertEqual(d.digest_size(),64)
- self.assertEqual(d.block_size(),128)
- self.assertEqual(d.oid(),Oid("sha512"))
+ self.assertEqual(d.digest_size,64)
+ self.assertEqual(d.block_size,128)
+ self.assertEqual(d.oid,Oid("sha512"))
self.assertEqual(d.name,'sha512')
def test_createfromoid(self):
oid=Oid('sha256')
d=digest.DigestType(oid)
- self.assertEqual(d.digest_size(),32)
- self.assertEqual(d.block_size(),64)
- self.assertEqual(d.oid(),Oid("sha256"))
+ self.assertEqual(d.digest_size,32)
+ self.assertEqual(d.block_size,64)
+ self.assertEqual(d.oid,Oid("sha256"))
self.assertEqual(d.name,'sha256')
def test_createfromEVP_MD(self):
d1=digest.DigestType("sha256")
with self.assertRaises(AttributeError):
s=d2.name
d2.digest=d1.digest
- self.assertEqual(d2.digest_size(),32)
- self.assertEqual(d2.block_size(),64)
- self.assertEqual(d2.oid(),Oid("sha256"))
+ self.assertEqual(d2.digest_size,32)
+ self.assertEqual(d2.block_size,64)
+ self.assertEqual(d2.oid,Oid("sha256"))
self.assertEqual(d2.name,'sha256')
def test_invalidDigest(self):
with self.assertRaises(digest.DigestError):