============
Python interface to some openssl function based on ctypes module
+
+This module is based on works from
+
+http://code.google.com/p/ctypescrypto/
+
+It is aimed to provide Python interface to OpenSSL libcrypto function
+
+Now supported:
+
+Digests
+Ciphers
+Low-level private key operations (like pkey and pkeyutl command line ops)
+(all via algorithm-agnostic EVP interface).
+Engine loading
+OpenSSL error stack to python exception conversion
+X509 certificates partially
+
--- /dev/null
+"""
+ Interface to some libcrypto functions
+
+"""
+
+from ctypes import CDLL
+
+libcrypto = CDLL("libcrypto.so.1.0.0")
+libcrypto.OPENSSL_config(None)
--- /dev/null
+from ctypescrypto import libcrypto
+from ctypes import c_char_p, c_int, string_at
+class Membio:
+ """
+ Provides interface to OpenSSL memory bios
+ use str() to get contents of writable bio
+ use bio member to pass to libcrypto function
+ """
+ def __init__(self,data=None):
+ """ If data is specified, creates read-only BIO. If data is
+ None, creates writable BIO
+ """
+ if data is None:
+ method=libcrypto.BIO_s_mem()
+ self.bio=libcrypto.BIO_new(method)
+ else:
+ self.bio=libcrypto.BIO_new_mem_buf(c_char_p(data),len(data))q
+ def __del__(self):
+ libcrypto.BIO_free(self.bio)
+ del(self.bio)
+ def __str__(self):
+ p=c_char_p(None)
+ l=BIO_get_mem_data(self.bio,byref(p))
+ return string_at(p,l)
+#FIXME TODO - BIO should have stream-like interface
+libcrypto.BIO_s_mem.restype=c_void_p
+libcrypto.BIO_new.restype=c_void_p
+libcrypto.BIO_new.argtypes=(c_void_p,)
+libcrypto.BIO_get_mem_data.restype=c_long
+libcrypto.BIO_get_mem_data.argtypes=(c_void_p,POINTER(c_char_p))
--- /dev/null
+from ctypes import *\r
+\r
+CIPHER_ALGORITHMS = ("DES", "DES-EDE3", "BF", "AES-128", "AES-192", "AES-256")\r
+CIPHER_MODES = ("CBC", "CFB", "OFB", "ECB")\r
+\r
+class CipherError(Exception):\r
+ pass\r
+\r
+class CipherType:\r
+\r
+ def __init__(self, libcrypto, cipher_algo, cipher_mode):\r
+ self.libcrypto = libcrypto\r
+ self.cipher_algo = cipher_algo\r
+ self.cipher_mode = cipher_mode\r
+ cipher_name = "-".join([self.cipher_algo, self.cipher_mode])\r
+ self.cipher = self.libcrypto.EVP_get_cipherbyname(cipher_name)\r
+ if self.cipher == 0:\r
+ raise CipherError, "Unknown cipher: %s" % cipher_name\r
+\r
+ def __del__(self):\r
+ pass\r
+\r
+ def algo(self):\r
+ return self.cipher_algo\r
+\r
+ def mode(self):\r
+ return self.cipher_mode\r
+\r
+class Cipher:\r
+\r
+ def __init__(self, libcrypto, cipher_type, key, iv, encrypt=True):\r
+ self.libcrypto = libcrypto\r
+ self._clean_ctx()\r
+ key_ptr = c_char_p(key)\r
+ iv_ptr = c_char_p(iv)\r
+ self.ctx = self.libcrypto.EVP_CIPHER_CTX_new(cipher_type.cipher, None, key_ptr, iv_ptr)\r
+ if self.ctx == 0:\r
+ raise CipherError, "Unable to create cipher context"\r
+ self.encrypt = encrypt\r
+ if encrypt: \r
+ enc = 1\r
+ else: \r
+ enc = 0\r
+ result = self.libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, key_ptr, iv_ptr, c_int(enc))\r
+ self.cipher_type = cipher_type\r
+ if result == 0:\r
+ self._clean_ctx()\r
+ raise CipherError, "Unable to initialize cipher"\r
+\r
+ def __del__(self):\r
+ self._clean_ctx()\r
+\r
+ def enable_padding(self, padding=True):\r
+ if padding:\r
+ padding_flag = 1\r
+ else:\r
+ padding_flag = 0\r
+ self.libcrypto.EVP_CIPHER_CTX_set_padding(self.ctx, padding_flag)\r
+\r
+ def update(self, data):\r
+ if self.cipher_finalized :\r
+ raise CipherError, "No updates allowed"\r
+ if type(data) != type(""):\r
+ raise TypeError, "A string is expected"\r
+ if len(data) <= 0:\r
+ return ""\r
+ self.data = self.data + data\r
+ \r
+ def finish(self, data=None):\r
+ if data is not None:\r
+ self.update(data)\r
+ return self._finish()\r
+ \r
+ def _finish(self):\r
+ if self.cipher_finalized :\r
+ raise CipherError, "Cipher operation is already completed"\r
+ self.cipher_out = create_string_buffer(len(self.data) + 32)\r
+ result = self.libcrypto.EVP_CipherUpdate(self.ctx, byref(self.cipher_out), byref(self.cipher_out_len), c_char_p(self.data), len(self.data))\r
+ if result == 0:\r
+ self._clean_ctx()\r
+ raise CipherError, "Unable to update cipher"\r
+ self.cipher_finalized = True\r
+ update_data = self.cipher_out.raw[:self.cipher_out_len.value]\r
+ result = self.libcrypto.EVP_CipherFinal_ex(self.ctx, byref(self.cipher_out), byref(self.cipher_out_len))\r
+ if result == 0:\r
+ self._clean_ctx()\r
+ raise CipherError, "Unable to finalize cipher"\r
+ final_data = self.cipher_out.raw[:self.cipher_out_len.value]\r
+ return update_data + final_data\r
+ \r
+ def _clean_ctx(self):\r
+ try:\r
+ if self.ctx is not None:\r
+ self.libcrypto.EVP_CIPHER_CTX_cleanup(self.ctx)\r
+ self.libcrypto.EVP_CIPHER_CTX_free(self.ctx)\r
+ del(self.ctx)\r
+ except AttributeError:\r
+ pass\r
+ self.cipher_out = None\r
+ self.cipher_out_len = c_long(0)\r
+ self.data = ""\r
+ self.cipher_finalized = False
\ No newline at end of file
--- /dev/null
+"""\r
+ Implmenets interface to OpenSSL EVP_Digest* functions.\r
+ Interface made as close to hashlib as possible\r
+"""\r
+from ctypes import c_int, c_char_p, c_void_p, POINTER, c_long_long\r
+from ctypescrypto import libcrypto\r
+from ctypescrypto.exception import LibCryptoError\r
+DIGEST_ALGORITHMS = ("MD5", "SHA1", "SHA224", "SHA256", "SHA384", "SHA512")\r
+\r
+\r
+class DigestError(LibCryptoError):\r
+ pass\r
+\r
+def new(algname):\r
+ md=DigestType(algname)\r
+ return Digest(md)\r
+\r
+class DigestType:\r
+ """\r
+ \r
+ Represents EVP_MD object - constant structure which describes\r
+ digest algorithm\r
+\r
+ """\r
+ def __init__(self, digest_name):\r
+ """\r
+ Finds digest by its name\r
+ """\r
+ self.digest_name = digest_name\r
+ self.digest = libcrypto.EVP_get_digestbyname(self.digest_name)\r
+ if self.digest == 0:\r
+ raise DigestError, "Unknown digest: %s" % self.digest_name\r
+\r
+ def __del__(self):\r
+ pass\r
+ def digest_size(self):\r
+ return libcrypto.EVP_MD_size(self.digest)\r
+ def block_size(self):\r
+ return libcrypto.EVP_MD_block_size(self.digest)\r
+ def oid(self):\r
+ pass\r
+ #FIXME TBD\r
+ # return Oid(nid=libcrypto.EVP_MD_type(self.digest)\r
+class Digest:\r
+ """\r
+ Represents EVP_MD_CTX object which actually used to calculate\r
+ digests.\r
+\r
+ """\r
+ def __init__(self, digest_type):\r
+ """\r
+ Initializes digest using given type.\r
+ """\r
+ self._clean_ctx()\r
+ self.ctx = libcrypto.EVP_MD_CTX_create()\r
+ if self.ctx == 0:\r
+ raise DigestError, "Unable to create digest context"\r
+ result = libcrypto.EVP_DigestInit_ex(self.ctx, digest_type.digest, None)\r
+ if result == 0:\r
+ self._clean_ctx()\r
+ raise DigestError, "Unable to initialize digest"\r
+ self.digest_type = digest_type\r
+ self.digest_size = self.digest_type.digest_size()\r
+ self.block_size = self.digest_type.block_size()\r
+\r
+ def __del__(self):\r
+ self._clean_ctx()\r
+\r
+ def update(self, data):\r
+ """\r
+ Hashes given byte string as data\r
+ """\r
+ if self.digest_finalized:\r
+ raise DigestError, "No updates allowed"\r
+ if type(data) != type(""):\r
+ raise TypeError, "A string is expected"\r
+ result = libcrypto.EVP_DigestUpdate(self.ctx, c_char_p(data), len(data))\r
+ if result != 1:\r
+ raise DigestError, "Unable to update digest"\r
+ \r
+ def digest(self, data=None):\r
+ """\r
+ Finalizes digest operation and return digest value\r
+ Optionally hashes data before finalizing\r
+ """\r
+ if self.digest_finalized:\r
+ return self.digest_out.raw[:self.digest_size]\r
+ if data is not None:\r
+ self.update(data)\r
+ self.digest_out = create_string_buffer(256)\r
+ length = c_long(0)\r
+ result = libcrypto.EVP_DigestFinal_ex(self.ctx, self.digest_out, byref(length))\r
+ if result != 1 :\r
+ raise DigestError, "Unable to finalize digest"\r
+ self.digest_finalized = True\r
+ return self.digest_out.raw[:self.digest_size]\r
+ def copy(self):\r
+ """\r
+ Creates copy of the digest CTX to allow to compute digest\r
+ while being able to hash more data\r
+ """\r
+ new_digest=Digest(self.digest_type)\r
+ libcrypto.EVP_MD_CTX_copy(new_digest.ctx,self.ctx)\r
+ return new_digest\r
+\r
+ def _clean_ctx(self):\r
+ try:\r
+ if self.ctx is not None:\r
+ libcrypto.EVP_MD_CTX_destroy(self.ctx)\r
+ del(self.ctx)\r
+ except AttributeError:\r
+ pass\r
+ self.digest_out = None\r
+ self.digest_finalized = False\r
+\r
+ def hexdigest(self,data=None):\r
+ """\r
+ Returns digest in the hexadecimal form. For compatibility\r
+ with hashlib\r
+ """\r
+ from base64 import b16encode\r
+ return b16encode(self.digest(data)\r
+\r
+\r
+# Declare function result and argument types\r
+libcrypto.EVP_get_digestbyname.restype = c_void_p\r
+libcrypto.EVP_MD_CTX_create.restype = c_void_p\r
+libcrypto.EVP_DigestInit_ex.argtypes = (c_void_p,c_void_p,c_void_p)\r
+libcrypto.EVP_DigestUpdate.argtypes = (c_void_p,c_char_p,c_longlong)\r
+libcrypto.EVP_DigestFinal_ex.argtypes = (c_void_p,c_char_p,POINTER(c_long))\r
+libcrypto.EVP_MD_CTX_destroy.argtypes = (c_void_p,)\r
+libcrypto.EVP_MD_CTX_copy.argtypes=(c_void_p, c_void_p)\r
+libcrypto.EVP_MD_type.argtypes=(c_void_p,)\r
+libcrypto.EVP_MD_size.argtypes=(c_void_p,)\r
+libcrypto.EVP_MD_block_size.argtypes=(c_void_p,)\r
--- /dev/null
+from ctypes import *
+from ctypescrypto import libcrypto
+from ctypescrypto.exception import LibCryptoError
+default=None
+
+def set_default(engine):
+ global default
+ e=libcrypto.ENGINE_by_id(engine)
+ if e is None:
+ # Try load engine
+ e = libcrypto.ENGINE_by_id("dynamic")
+ if e is None:
+ raise LibCryptoError("Cannot get 'dynamic' engine")
+ if not libcrypto.ENGINE_ctrl_cmd_string(e,"SO_PATH",engine,0):
+ raise LibCryptoError("Cannot execute ctrl cmd SO_PATH")
+ if not libcrypto.ENGINE_ctrl_cmd_string(e,"LOAD",None,0):
+ raise LibCryptoError("Cannot execute ctrl cmd LOAD")
+ if e is None:
+ raise ValueError("Cannot find engine "+engine)
+ libcrypto.ENGINE_set_default(e,c_int(0xFFFF))
+ default=e
+
+# Declare function result and arguments for used functions
+libcrypto.ENGINE_by_id.restype=c_void_p
+libcrypto.ENGINE_by_id.argtypes=(c_char_p,)
+libcrypto.ENGINE_set_default.argtypes=(c_void_p,c_int)
+libcrypto.ENGINE_ctrl_cmd_string.argtypes=(c_void_p,c_char_p,c_char_p,c_int)
--- /dev/null
+from ctypes import *
+from ctypescrypto import libcrypto
+strings_loaded=False
+class LibCryptoError(Exception):
+ """
+ Exception for libcrypto errors. Adds all the info, which can be
+ extracted from internal (per-thread) libcrypto error stack to the message,
+ passed to the constructor.
+ """
+ def __init__(self,msg):
+ global strings_loaded
+ if not strings_loaded:
+ libcrypto.ERR_load_crypto_strings()
+ strings_loaded = True
+ e=libcrypto.ERR_get_error()
+ m = msg
+ while e != 0:
+ m+="\n\t"+libcrypto.ERR_lib_error_string(e)+":"+\
+ libcrypto.ERR_func_error_string(e)+":"+\
+ libcrypto.ERR_reason_error_string(e)
+ e=libcrypto.ERR_get_error()
+ self.args=(m,)
+
+def clear_err_stack():
+ """
+ Clears internal libcrypto err stack. Call it if you've checked
+ return code and processed exceptional situation, so subsequent
+ raising of the LibCryptoError wouldn't list already handled errors
+ """
+ libcrypto.ERR_clear_error()
+
+
+libcrypto.ERR_lib_error_string.restype=c_char_p
+libcrypto.ERR_func_error_string.restype=c_char_p
+libcrypto.ERR_reason_error_string.restype=c_char_p
--- /dev/null
+"""
+ Interface to OpenSSL object identifier database
+"""
+from ctypescrypto import libcrypto
+class Oid:
+ def __init__(self,value):
+ if type(value) == type(""):
+ self.nid=libcrypto.OBJ_txt2nid(value)
+ if self.nid==0:
+ raise LibCryptoError("Cannot find object %s in the
+ database"%(value))
+ elif type(value) == type(0):
+ self.nid=value
+ else:
+ raise TypeError("Cannot convert this type to object identifier")
+ def __cmp__(self,other):
+ return self.nid-other.nid
+ def __str__(self):
+ return self.dotted()
+ def shorttname(self):
+ return libcrypto.OBJ_nid2sn(self.nid)
+ def longname(self):
+ return libcrypto.OBJ_nid2ln(self.nid)
+ def dotted(self)
+ obj=libcrypto.OBJ_nid2obj(self.nid)
+ buf=create_string_buffer(256)
+ libcrypto.OBJ_obj2txt(buf,256,obj,1)
+ return buf.value
+
--- /dev/null
+from ctypes import byref,c_int,c_long, c_longlong, create_string_buffer
+from ctypescrypto import libcrypto
+from ctypescrypto.exception import LibCryptoErrors,clear_err_stack
+from ctypescrypto.bio import Membio
+
+class PKeyError(LibCryptoError):
+ pass
+
+class PKey:
+ def __init__(self,ptr,cansign)
+ self.key=ptr:
+ self.cansign=cansign
+ def __del__(self):
+ libcrypto.EVP_PKEY_free(self.key)
+ def __eq__(self,other):
+ """ Compares two public keys. If one has private key and other
+ doesn't it doesn't affect result of comparation
+ """
+ return libcrypto.EVP_PKEY_cmp(self.key,other.key)==1
+ def __ne__(self,other):
+ return not self.__eq__(other)
+ def __str__(self):
+ """ printable representation of public key """
+ b=Membio()
+ libcrypto.EVP_PKEY_print_public(b.bio,self.key,0,NULL)
+ return str(b)
+ def privpem(s,password=None):
+ """ Class method for load from the pem string of private key """
+ b=Membio(s)
+ return PKey(libcrypto.PEM_read_bio_PrivateKey(b.bio,NULL,cb,c_char_p(password))
+
+ def privder(s):
+ """ Class method for load from the binary ASN1 structure of private key """
+ b=Membio(s)
+ return PKey(libcrypto.d2i_PrivateKey_bio(b.bio,NULL),True)
+ def pubpem(s):
+ """ Class method for load from public key pem string"""
+ b=Membio(s)
+ return PKey(libcrypto.PEM_read_bio_PUBKEY(b.bio,NULL,cb,c_char_p(password)),False)
+ def pubder(s):
+ """ Class method for load from the binary ASN1 structure """
+ b=Membio(s)
+ return PKey(libcrypto.d2i_PUBKEY_bio(b.bio,NULL),False)
+ def sign(self,digest,**kwargs):
+ """
+ Signs given digest and retirns signature
+ Keyword arguments allows to set various algorithm-specific
+ parameters. See pkeyutl(1) manual.
+ """
+ ctx=libcrypto.EVP_PKEY_CTX_new(self.key,None)
+ if ctx is None:
+ raise PkeyError("Initailizing sign context")
+ if libcrypto.EVP_PKEY_sign_init(ctx)<1:
+ raise PkeyError("sign_init")
+ for oper in kwargs:
+ rv=libcrypto.EVP_PKEY_CTX_ctrl_str(ctx,oper,kwargs[oper])
+ if rw=-2:
+ raise PKeyError("Parameter %s is not supported by key"%(oper))
+ if rv<1:
+ raise PKeyError("Error setting parameter %s"(oper))
+ # Find out signature size
+ siglen=c_long(0)
+ if libcrypto.EVP_PKEY_sign(ctx,None,byref(siglen),digest,len(digest))<1:
+ raise PkeyError("signing")
+ sig=create_string_buffer(siglen.value)
+ libcrypto.EVP_PKEY_sign(ctx,sig,byref(signlen),digest,len(digest)
+ libcrypto.EVP_PKEY_CTX_free(ctx)
+ return sig.value[:siglen.value]
+
+ def verify(self,digest,signature,**kwargs):
+ """
+ Verifies given signature on given digest
+ Returns True if Ok, False if don't match
+ """
+ ctx=libcrypto.EVP_PKEY_CTX_new(self.key,None)
+ if ctx is None:
+ raise PkeyError("Initailizing verify context")
+ if libcrypto.EVP_PKEY_verify_init(ctx)<1:
+ raise PkeyError("verify_init")
+ for oper in kwargs:
+ rv=libcrypto.EVP_PKEY_CTX_ctrl_str(ctx,oper,kwargs[oper])
+ if rw=-2:
+ raise PKeyError("Parameter %s is not supported by key"%(oper))
+ if rv<1:
+ raise PKeyError("Error setting parameter %s"(oper))
+ rv=libcrypto.EVP_PKEY_verify(ctx,signature,len(signature),digest,len(digest))
+ if rv<0:
+ raise PKeyError("Signature verification")
+ libcrypto=EVP_PKEY_CTX_free(ctx)
+ return rv>0
+ def generate(algorithm,**kwargs):
+ """
+ Generates new private-public key pair for given algorithm
+ (string like 'rsa','ec','gost2001') and algorithm-specific
+ parameters
+ """
+ tmpeng=c_void_p(None)
+ ameth=libcrypto.EVP_PKEY_asn1_find_str(byref(tmpeng),algorithm,-1)
+ if ameth is None:
+ raise PKeyError("Algorithm %s not foind\n"%(algname))
+ clear_err_stack()
+ pkey_id=c_int(0)
+ libcrypto.EVP_PKEY_asn1_get0_info(byref(pkey_id),None,None,None,None,ameth)
+ libcrypto.ENGINE_finish(tmpeng)
+ ctx=libcrypto.EVP_PKEY_CTX_new_id(pkey_id)
+ if ctx is None:
+ raise PKeyError("Creating context for key type %d"%(pkey_id.value))
+ if libcrypto.EVP_PKEY_keygen_init(ctx) <=0 :
+ raise PKeyError("keygen_init")
+ for oper in kwargs:
+ rv=libcrypto.EVP_PKEY_CTX_ctrl_str(ctx,oper,kwargs[oper])
+ if rw=-2:
+ raise PKeyError("Parameter %s is not supported by key"%(oper))
+ if rv<1:
+ raise PKeyError("Error setting parameter %s"(oper))
+ key=c_void_p(None)
+ if libcrypto.EVP_PKEY_keygen(ctx,byref(key))<=0:
+ raise PKeyError("Error generating key")
+ libcrypto.EVP_PKEY_CTX_free(ctx)
+ return PKey(key,True)
+
+class X509:
+ def __init__(self,ptr):
+ self.cert = ptr
+ def __del__(self):
+ libcrypto.X509_free(self.cert)
+ def __str__(self):
+ """ Returns der string of the certificate """
+ def pubkey(self):
+ """ Returns EVP PKEy object of certificate public key"""
+ return PKey(libcrypto.X509_get_pubkey(self.cert,False)
+ def verify(self,key):
+ """ Verify self on given issuer key """
+ def frompem(s):
+ """ Create X509 object from pem string """
+ def fromder(s):
+ """ Create X509 object from der string """
+
+class Verifier:
+ def __init__(self,filename):
+
+ def verify_cert(self,cert):
+
+class Signer:
+ def __init__(self,key):
+ self.key = key
+ def sign(self,digest):
+ if not self.key.cansign:
+ raise ValueError("Current PKey doesn't contain private part")
+ def verify(self,signature,digest):
+
--- /dev/null
+"""
+ Interface to the OpenSSL pseudo-random generator
+"""
+
+from ctypes import create_string_buffer, c_char_p, c_int, c_double
+from ctypescrypto import libcrypto
+from ctypescrypto.exception import LibCryptoError
+
+class RandError(LibCryptoError):
+ pass
+
+def bytes( num, check_result=False):
+ """
+ Returns num bytes of cryptographically strong pseudo-random
+ bytes. If checkc_result is True, raises error if PRNG is not
+ seeded enough
+ """
+
+ if num <= 0 :
+ raise ValueError, "'num' should be > 0"
+ buffer = create_string_buffer(num)
+ result = libcrypto.RAND_bytes(buffer, num)
+ if check_result and result == 0:
+ raise RandError, "Random Number Generator not seeded sufficiently"
+ return buffer.raw[:num]
+
+def pseudo_bytes(num):
+ """
+ Returns num bytes of pseudo random data. Pseudo- random byte
+ sequences generated by pseudo_bytes() will be unique if
+ they are of sufficient length, but are not necessarily
+ unpredictable. They can be used for non-cryptographic purposes
+ and for certain purposes in cryptographic protocols, but usually
+ not for key generation etc.
+ """
+ if num <= 0 :
+ raise ValueError, "'num' should be > 0"
+ buffer = create_string_buffer(num)
+ libcrypto.RAND_pseudo_bytes(buffer, num)
+ return buffer.raw[:num]
+
+def seed(data, entropy=None):
+ """
+ Seeds random generator with data.
+ If entropy is not None, it should be floating point(double)
+ value estimating amount of entropy in the data (in bytes).
+ """
+ if type(data) != type(""):
+ raise TypeError, "A string is expected"
+ ptr = c_char_p(data)
+ size = len(data)
+ if entropy is None:
+ libcrypto.RAND_seed(ptr, size)
+ else :
+ libcrypto.RAND_add(ptr, size, entropy)
+
+def status():
+ """
+ Returns 1 if random generator is sufficiently seeded and 0
+ otherwise
+ """
+
+ return libcrypto.RAND_status()
+
+libcrypto.RAND_add.argtypes=(c_char_p,c_int,c_double)
+libcrypto.RAND_seed.argtypes=(c_char_p,c_int)
+libcrypto.RAND_pseudo_bytes.argtypes=(c_char_p,c_int)
+libcrypto.RAND_bytes.argtypes=(c_char_p,c_int)