From: Victor Wagner Date: Wed, 4 Jun 2014 20:14:26 +0000 (+0400) Subject: Initial commit of modules X-Git-Url: http://wagner.pp.ru/gitweb/?a=commitdiff_plain;h=1ff9b1899959673512927b6afa317855908b7073;p=oss%2Fctypescrypto.git Initial commit of modules --- diff --git a/README.md b/README.md index b3f6602..91336ba 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,20 @@ ctypescrypto ============ Python interface to some openssl function based on ctypes module + +This module is based on works from + +http://code.google.com/p/ctypescrypto/ + +It is aimed to provide Python interface to OpenSSL libcrypto function + +Now supported: + +Digests +Ciphers +Low-level private key operations (like pkey and pkeyutl command line ops) +(all via algorithm-agnostic EVP interface). +Engine loading +OpenSSL error stack to python exception conversion +X509 certificates partially + diff --git a/ctypescrypto/__init__.py b/ctypescrypto/__init__.py new file mode 100644 index 0000000..b6d00ca --- /dev/null +++ b/ctypescrypto/__init__.py @@ -0,0 +1,9 @@ +""" + Interface to some libcrypto functions + +""" + +from ctypes import CDLL + +libcrypto = CDLL("libcrypto.so.1.0.0") +libcrypto.OPENSSL_config(None) diff --git a/ctypescrypto/bio.py b/ctypescrypto/bio.py new file mode 100644 index 0000000..1263ce6 --- /dev/null +++ b/ctypescrypto/bio.py @@ -0,0 +1,30 @@ +from ctypescrypto import libcrypto +from ctypes import c_char_p, c_int, string_at +class Membio: + """ + Provides interface to OpenSSL memory bios + use str() to get contents of writable bio + use bio member to pass to libcrypto function + """ + def __init__(self,data=None): + """ If data is specified, creates read-only BIO. If data is + None, creates writable BIO + """ + if data is None: + method=libcrypto.BIO_s_mem() + self.bio=libcrypto.BIO_new(method) + else: + self.bio=libcrypto.BIO_new_mem_buf(c_char_p(data),len(data))q + def __del__(self): + libcrypto.BIO_free(self.bio) + del(self.bio) + def __str__(self): + p=c_char_p(None) + l=BIO_get_mem_data(self.bio,byref(p)) + return string_at(p,l) +#FIXME TODO - BIO should have stream-like interface +libcrypto.BIO_s_mem.restype=c_void_p +libcrypto.BIO_new.restype=c_void_p +libcrypto.BIO_new.argtypes=(c_void_p,) +libcrypto.BIO_get_mem_data.restype=c_long +libcrypto.BIO_get_mem_data.argtypes=(c_void_p,POINTER(c_char_p)) diff --git a/ctypescrypto/cipher.py b/ctypescrypto/cipher.py new file mode 100644 index 0000000..73a14ce --- /dev/null +++ b/ctypescrypto/cipher.py @@ -0,0 +1,102 @@ +from ctypes import * + +CIPHER_ALGORITHMS = ("DES", "DES-EDE3", "BF", "AES-128", "AES-192", "AES-256") +CIPHER_MODES = ("CBC", "CFB", "OFB", "ECB") + +class CipherError(Exception): + pass + +class CipherType: + + def __init__(self, libcrypto, cipher_algo, cipher_mode): + self.libcrypto = libcrypto + self.cipher_algo = cipher_algo + self.cipher_mode = cipher_mode + cipher_name = "-".join([self.cipher_algo, self.cipher_mode]) + self.cipher = self.libcrypto.EVP_get_cipherbyname(cipher_name) + if self.cipher == 0: + raise CipherError, "Unknown cipher: %s" % cipher_name + + def __del__(self): + pass + + def algo(self): + return self.cipher_algo + + def mode(self): + return self.cipher_mode + +class Cipher: + + def __init__(self, libcrypto, cipher_type, key, iv, encrypt=True): + self.libcrypto = libcrypto + self._clean_ctx() + key_ptr = c_char_p(key) + iv_ptr = c_char_p(iv) + self.ctx = self.libcrypto.EVP_CIPHER_CTX_new(cipher_type.cipher, None, key_ptr, iv_ptr) + if self.ctx == 0: + raise CipherError, "Unable to create cipher context" + self.encrypt = encrypt + if encrypt: + enc = 1 + else: + enc = 0 + result = self.libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, key_ptr, iv_ptr, c_int(enc)) + self.cipher_type = cipher_type + if result == 0: + self._clean_ctx() + raise CipherError, "Unable to initialize cipher" + + def __del__(self): + self._clean_ctx() + + def enable_padding(self, padding=True): + if padding: + padding_flag = 1 + else: + padding_flag = 0 + self.libcrypto.EVP_CIPHER_CTX_set_padding(self.ctx, padding_flag) + + def update(self, data): + if self.cipher_finalized : + raise CipherError, "No updates allowed" + if type(data) != type(""): + raise TypeError, "A string is expected" + if len(data) <= 0: + return "" + self.data = self.data + data + + def finish(self, data=None): + if data is not None: + self.update(data) + return self._finish() + + def _finish(self): + if self.cipher_finalized : + raise CipherError, "Cipher operation is already completed" + self.cipher_out = create_string_buffer(len(self.data) + 32) + result = self.libcrypto.EVP_CipherUpdate(self.ctx, byref(self.cipher_out), byref(self.cipher_out_len), c_char_p(self.data), len(self.data)) + if result == 0: + self._clean_ctx() + raise CipherError, "Unable to update cipher" + self.cipher_finalized = True + update_data = self.cipher_out.raw[:self.cipher_out_len.value] + result = self.libcrypto.EVP_CipherFinal_ex(self.ctx, byref(self.cipher_out), byref(self.cipher_out_len)) + if result == 0: + self._clean_ctx() + raise CipherError, "Unable to finalize cipher" + final_data = self.cipher_out.raw[:self.cipher_out_len.value] + return update_data + final_data + + def _clean_ctx(self): + try: + if self.ctx is not None: + self.libcrypto.EVP_CIPHER_CTX_cleanup(self.ctx) + self.libcrypto.EVP_CIPHER_CTX_free(self.ctx) + del(self.ctx) + except AttributeError: + pass + self.cipher_out = None + self.cipher_out_len = c_long(0) + self.data = "" + self.cipher_finalized = False \ No newline at end of file diff --git a/ctypescrypto/digest.py b/ctypescrypto/digest.py new file mode 100644 index 0000000..4b0eeb5 --- /dev/null +++ b/ctypescrypto/digest.py @@ -0,0 +1,135 @@ +""" + Implmenets interface to OpenSSL EVP_Digest* functions. + Interface made as close to hashlib as possible +""" +from ctypes import c_int, c_char_p, c_void_p, POINTER, c_long_long +from ctypescrypto import libcrypto +from ctypescrypto.exception import LibCryptoError +DIGEST_ALGORITHMS = ("MD5", "SHA1", "SHA224", "SHA256", "SHA384", "SHA512") + + +class DigestError(LibCryptoError): + pass + +def new(algname): + md=DigestType(algname) + return Digest(md) + +class DigestType: + """ + + Represents EVP_MD object - constant structure which describes + digest algorithm + + """ + def __init__(self, digest_name): + """ + Finds digest by its name + """ + self.digest_name = digest_name + self.digest = libcrypto.EVP_get_digestbyname(self.digest_name) + if self.digest == 0: + raise DigestError, "Unknown digest: %s" % self.digest_name + + def __del__(self): + pass + def digest_size(self): + return libcrypto.EVP_MD_size(self.digest) + def block_size(self): + return libcrypto.EVP_MD_block_size(self.digest) + def oid(self): + pass + #FIXME TBD + # return Oid(nid=libcrypto.EVP_MD_type(self.digest) +class Digest: + """ + Represents EVP_MD_CTX object which actually used to calculate + digests. + + """ + def __init__(self, digest_type): + """ + Initializes digest using given type. + """ + self._clean_ctx() + self.ctx = libcrypto.EVP_MD_CTX_create() + if self.ctx == 0: + raise DigestError, "Unable to create digest context" + result = libcrypto.EVP_DigestInit_ex(self.ctx, digest_type.digest, None) + if result == 0: + self._clean_ctx() + raise DigestError, "Unable to initialize digest" + self.digest_type = digest_type + self.digest_size = self.digest_type.digest_size() + self.block_size = self.digest_type.block_size() + + def __del__(self): + self._clean_ctx() + + def update(self, data): + """ + Hashes given byte string as data + """ + if self.digest_finalized: + raise DigestError, "No updates allowed" + if type(data) != type(""): + raise TypeError, "A string is expected" + result = libcrypto.EVP_DigestUpdate(self.ctx, c_char_p(data), len(data)) + if result != 1: + raise DigestError, "Unable to update digest" + + def digest(self, data=None): + """ + Finalizes digest operation and return digest value + Optionally hashes data before finalizing + """ + if self.digest_finalized: + return self.digest_out.raw[:self.digest_size] + if data is not None: + self.update(data) + self.digest_out = create_string_buffer(256) + length = c_long(0) + result = libcrypto.EVP_DigestFinal_ex(self.ctx, self.digest_out, byref(length)) + if result != 1 : + raise DigestError, "Unable to finalize digest" + self.digest_finalized = True + return self.digest_out.raw[:self.digest_size] + def copy(self): + """ + Creates copy of the digest CTX to allow to compute digest + while being able to hash more data + """ + new_digest=Digest(self.digest_type) + libcrypto.EVP_MD_CTX_copy(new_digest.ctx,self.ctx) + return new_digest + + def _clean_ctx(self): + try: + if self.ctx is not None: + libcrypto.EVP_MD_CTX_destroy(self.ctx) + del(self.ctx) + except AttributeError: + pass + self.digest_out = None + self.digest_finalized = False + + def hexdigest(self,data=None): + """ + Returns digest in the hexadecimal form. For compatibility + with hashlib + """ + from base64 import b16encode + return b16encode(self.digest(data) + + +# Declare function result and argument types +libcrypto.EVP_get_digestbyname.restype = c_void_p +libcrypto.EVP_MD_CTX_create.restype = c_void_p +libcrypto.EVP_DigestInit_ex.argtypes = (c_void_p,c_void_p,c_void_p) +libcrypto.EVP_DigestUpdate.argtypes = (c_void_p,c_char_p,c_longlong) +libcrypto.EVP_DigestFinal_ex.argtypes = (c_void_p,c_char_p,POINTER(c_long)) +libcrypto.EVP_MD_CTX_destroy.argtypes = (c_void_p,) +libcrypto.EVP_MD_CTX_copy.argtypes=(c_void_p, c_void_p) +libcrypto.EVP_MD_type.argtypes=(c_void_p,) +libcrypto.EVP_MD_size.argtypes=(c_void_p,) +libcrypto.EVP_MD_block_size.argtypes=(c_void_p,) diff --git a/ctypescrypto/engine.py b/ctypescrypto/engine.py new file mode 100644 index 0000000..47f4878 --- /dev/null +++ b/ctypescrypto/engine.py @@ -0,0 +1,27 @@ +from ctypes import * +from ctypescrypto import libcrypto +from ctypescrypto.exception import LibCryptoError +default=None + +def set_default(engine): + global default + e=libcrypto.ENGINE_by_id(engine) + if e is None: + # Try load engine + e = libcrypto.ENGINE_by_id("dynamic") + if e is None: + raise LibCryptoError("Cannot get 'dynamic' engine") + if not libcrypto.ENGINE_ctrl_cmd_string(e,"SO_PATH",engine,0): + raise LibCryptoError("Cannot execute ctrl cmd SO_PATH") + if not libcrypto.ENGINE_ctrl_cmd_string(e,"LOAD",None,0): + raise LibCryptoError("Cannot execute ctrl cmd LOAD") + if e is None: + raise ValueError("Cannot find engine "+engine) + libcrypto.ENGINE_set_default(e,c_int(0xFFFF)) + default=e + +# Declare function result and arguments for used functions +libcrypto.ENGINE_by_id.restype=c_void_p +libcrypto.ENGINE_by_id.argtypes=(c_char_p,) +libcrypto.ENGINE_set_default.argtypes=(c_void_p,c_int) +libcrypto.ENGINE_ctrl_cmd_string.argtypes=(c_void_p,c_char_p,c_char_p,c_int) diff --git a/ctypescrypto/exception.py b/ctypescrypto/exception.py new file mode 100644 index 0000000..f0f3c53 --- /dev/null +++ b/ctypescrypto/exception.py @@ -0,0 +1,35 @@ +from ctypes import * +from ctypescrypto import libcrypto +strings_loaded=False +class LibCryptoError(Exception): + """ + Exception for libcrypto errors. Adds all the info, which can be + extracted from internal (per-thread) libcrypto error stack to the message, + passed to the constructor. + """ + def __init__(self,msg): + global strings_loaded + if not strings_loaded: + libcrypto.ERR_load_crypto_strings() + strings_loaded = True + e=libcrypto.ERR_get_error() + m = msg + while e != 0: + m+="\n\t"+libcrypto.ERR_lib_error_string(e)+":"+\ + libcrypto.ERR_func_error_string(e)+":"+\ + libcrypto.ERR_reason_error_string(e) + e=libcrypto.ERR_get_error() + self.args=(m,) + +def clear_err_stack(): + """ + Clears internal libcrypto err stack. Call it if you've checked + return code and processed exceptional situation, so subsequent + raising of the LibCryptoError wouldn't list already handled errors + """ + libcrypto.ERR_clear_error() + + +libcrypto.ERR_lib_error_string.restype=c_char_p +libcrypto.ERR_func_error_string.restype=c_char_p +libcrypto.ERR_reason_error_string.restype=c_char_p diff --git a/ctypescrypto/oid.py b/ctypescrypto/oid.py new file mode 100644 index 0000000..81bea92 --- /dev/null +++ b/ctypescrypto/oid.py @@ -0,0 +1,29 @@ +""" + Interface to OpenSSL object identifier database +""" +from ctypescrypto import libcrypto +class Oid: + def __init__(self,value): + if type(value) == type(""): + self.nid=libcrypto.OBJ_txt2nid(value) + if self.nid==0: + raise LibCryptoError("Cannot find object %s in the + database"%(value)) + elif type(value) == type(0): + self.nid=value + else: + raise TypeError("Cannot convert this type to object identifier") + def __cmp__(self,other): + return self.nid-other.nid + def __str__(self): + return self.dotted() + def shorttname(self): + return libcrypto.OBJ_nid2sn(self.nid) + def longname(self): + return libcrypto.OBJ_nid2ln(self.nid) + def dotted(self) + obj=libcrypto.OBJ_nid2obj(self.nid) + buf=create_string_buffer(256) + libcrypto.OBJ_obj2txt(buf,256,obj,1) + return buf.value + diff --git a/ctypescrypto/pkey.py b/ctypescrypto/pkey.py new file mode 100644 index 0000000..63bdfd8 --- /dev/null +++ b/ctypescrypto/pkey.py @@ -0,0 +1,151 @@ +from ctypes import byref,c_int,c_long, c_longlong, create_string_buffer +from ctypescrypto import libcrypto +from ctypescrypto.exception import LibCryptoErrors,clear_err_stack +from ctypescrypto.bio import Membio + +class PKeyError(LibCryptoError): + pass + +class PKey: + def __init__(self,ptr,cansign) + self.key=ptr: + self.cansign=cansign + def __del__(self): + libcrypto.EVP_PKEY_free(self.key) + def __eq__(self,other): + """ Compares two public keys. If one has private key and other + doesn't it doesn't affect result of comparation + """ + return libcrypto.EVP_PKEY_cmp(self.key,other.key)==1 + def __ne__(self,other): + return not self.__eq__(other) + def __str__(self): + """ printable representation of public key """ + b=Membio() + libcrypto.EVP_PKEY_print_public(b.bio,self.key,0,NULL) + return str(b) + def privpem(s,password=None): + """ Class method for load from the pem string of private key """ + b=Membio(s) + return PKey(libcrypto.PEM_read_bio_PrivateKey(b.bio,NULL,cb,c_char_p(password)) + + def privder(s): + """ Class method for load from the binary ASN1 structure of private key """ + b=Membio(s) + return PKey(libcrypto.d2i_PrivateKey_bio(b.bio,NULL),True) + def pubpem(s): + """ Class method for load from public key pem string""" + b=Membio(s) + return PKey(libcrypto.PEM_read_bio_PUBKEY(b.bio,NULL,cb,c_char_p(password)),False) + def pubder(s): + """ Class method for load from the binary ASN1 structure """ + b=Membio(s) + return PKey(libcrypto.d2i_PUBKEY_bio(b.bio,NULL),False) + def sign(self,digest,**kwargs): + """ + Signs given digest and retirns signature + Keyword arguments allows to set various algorithm-specific + parameters. See pkeyutl(1) manual. + """ + ctx=libcrypto.EVP_PKEY_CTX_new(self.key,None) + if ctx is None: + raise PkeyError("Initailizing sign context") + if libcrypto.EVP_PKEY_sign_init(ctx)<1: + raise PkeyError("sign_init") + for oper in kwargs: + rv=libcrypto.EVP_PKEY_CTX_ctrl_str(ctx,oper,kwargs[oper]) + if rw=-2: + raise PKeyError("Parameter %s is not supported by key"%(oper)) + if rv<1: + raise PKeyError("Error setting parameter %s"(oper)) + # Find out signature size + siglen=c_long(0) + if libcrypto.EVP_PKEY_sign(ctx,None,byref(siglen),digest,len(digest))<1: + raise PkeyError("signing") + sig=create_string_buffer(siglen.value) + libcrypto.EVP_PKEY_sign(ctx,sig,byref(signlen),digest,len(digest) + libcrypto.EVP_PKEY_CTX_free(ctx) + return sig.value[:siglen.value] + + def verify(self,digest,signature,**kwargs): + """ + Verifies given signature on given digest + Returns True if Ok, False if don't match + """ + ctx=libcrypto.EVP_PKEY_CTX_new(self.key,None) + if ctx is None: + raise PkeyError("Initailizing verify context") + if libcrypto.EVP_PKEY_verify_init(ctx)<1: + raise PkeyError("verify_init") + for oper in kwargs: + rv=libcrypto.EVP_PKEY_CTX_ctrl_str(ctx,oper,kwargs[oper]) + if rw=-2: + raise PKeyError("Parameter %s is not supported by key"%(oper)) + if rv<1: + raise PKeyError("Error setting parameter %s"(oper)) + rv=libcrypto.EVP_PKEY_verify(ctx,signature,len(signature),digest,len(digest)) + if rv<0: + raise PKeyError("Signature verification") + libcrypto=EVP_PKEY_CTX_free(ctx) + return rv>0 + def generate(algorithm,**kwargs): + """ + Generates new private-public key pair for given algorithm + (string like 'rsa','ec','gost2001') and algorithm-specific + parameters + """ + tmpeng=c_void_p(None) + ameth=libcrypto.EVP_PKEY_asn1_find_str(byref(tmpeng),algorithm,-1) + if ameth is None: + raise PKeyError("Algorithm %s not foind\n"%(algname)) + clear_err_stack() + pkey_id=c_int(0) + libcrypto.EVP_PKEY_asn1_get0_info(byref(pkey_id),None,None,None,None,ameth) + libcrypto.ENGINE_finish(tmpeng) + ctx=libcrypto.EVP_PKEY_CTX_new_id(pkey_id) + if ctx is None: + raise PKeyError("Creating context for key type %d"%(pkey_id.value)) + if libcrypto.EVP_PKEY_keygen_init(ctx) <=0 : + raise PKeyError("keygen_init") + for oper in kwargs: + rv=libcrypto.EVP_PKEY_CTX_ctrl_str(ctx,oper,kwargs[oper]) + if rw=-2: + raise PKeyError("Parameter %s is not supported by key"%(oper)) + if rv<1: + raise PKeyError("Error setting parameter %s"(oper)) + key=c_void_p(None) + if libcrypto.EVP_PKEY_keygen(ctx,byref(key))<=0: + raise PKeyError("Error generating key") + libcrypto.EVP_PKEY_CTX_free(ctx) + return PKey(key,True) + +class X509: + def __init__(self,ptr): + self.cert = ptr + def __del__(self): + libcrypto.X509_free(self.cert) + def __str__(self): + """ Returns der string of the certificate """ + def pubkey(self): + """ Returns EVP PKEy object of certificate public key""" + return PKey(libcrypto.X509_get_pubkey(self.cert,False) + def verify(self,key): + """ Verify self on given issuer key """ + def frompem(s): + """ Create X509 object from pem string """ + def fromder(s): + """ Create X509 object from der string """ + +class Verifier: + def __init__(self,filename): + + def verify_cert(self,cert): + +class Signer: + def __init__(self,key): + self.key = key + def sign(self,digest): + if not self.key.cansign: + raise ValueError("Current PKey doesn't contain private part") + def verify(self,signature,digest): + diff --git a/ctypescrypto/rand.py b/ctypescrypto/rand.py new file mode 100644 index 0000000..5d51eed --- /dev/null +++ b/ctypescrypto/rand.py @@ -0,0 +1,68 @@ +""" + Interface to the OpenSSL pseudo-random generator +""" + +from ctypes import create_string_buffer, c_char_p, c_int, c_double +from ctypescrypto import libcrypto +from ctypescrypto.exception import LibCryptoError + +class RandError(LibCryptoError): + pass + +def bytes( num, check_result=False): + """ + Returns num bytes of cryptographically strong pseudo-random + bytes. If checkc_result is True, raises error if PRNG is not + seeded enough + """ + + if num <= 0 : + raise ValueError, "'num' should be > 0" + buffer = create_string_buffer(num) + result = libcrypto.RAND_bytes(buffer, num) + if check_result and result == 0: + raise RandError, "Random Number Generator not seeded sufficiently" + return buffer.raw[:num] + +def pseudo_bytes(num): + """ + Returns num bytes of pseudo random data. Pseudo- random byte + sequences generated by pseudo_bytes() will be unique if + they are of sufficient length, but are not necessarily + unpredictable. They can be used for non-cryptographic purposes + and for certain purposes in cryptographic protocols, but usually + not for key generation etc. + """ + if num <= 0 : + raise ValueError, "'num' should be > 0" + buffer = create_string_buffer(num) + libcrypto.RAND_pseudo_bytes(buffer, num) + return buffer.raw[:num] + +def seed(data, entropy=None): + """ + Seeds random generator with data. + If entropy is not None, it should be floating point(double) + value estimating amount of entropy in the data (in bytes). + """ + if type(data) != type(""): + raise TypeError, "A string is expected" + ptr = c_char_p(data) + size = len(data) + if entropy is None: + libcrypto.RAND_seed(ptr, size) + else : + libcrypto.RAND_add(ptr, size, entropy) + +def status(): + """ + Returns 1 if random generator is sufficiently seeded and 0 + otherwise + """ + + return libcrypto.RAND_status() + +libcrypto.RAND_add.argtypes=(c_char_p,c_int,c_double) +libcrypto.RAND_seed.argtypes=(c_char_p,c_int) +libcrypto.RAND_pseudo_bytes.argtypes=(c_char_p,c_int) +libcrypto.RAND_bytes.argtypes=(c_char_p,c_int)