From 39458a66c05b88ba2c08164d238691a59e2708c8 Mon Sep 17 00:00:00 2001 From: Victor Wagner Date: Fri, 24 Oct 2014 17:50:59 +0400 Subject: [PATCH] Partially implemented X509 object. Added unicode support to BIO --- ctypescrypto/bio.py | 32 +++++- ctypescrypto/x509.py | 245 ++++++++++++++++++++++++++++++++++++++----- tests/testbio.py | 20 +++- 3 files changed, 267 insertions(+), 30 deletions(-) diff --git a/ctypescrypto/bio.py b/ctypescrypto/bio.py index 642b96b..f8150f7 100644 --- a/ctypescrypto/bio.py +++ b/ctypescrypto/bio.py @@ -3,12 +3,12 @@ from ctypes import c_char_p, c_void_p, c_int, string_at, c_long,POINTER,byref, c class Membio: """ Provides interface to OpenSSL memory bios - use str() to get contents of writable bio + use str() or unicode() to get contents of writable bio use bio member to pass to libcrypto function """ def __init__(self,data=None): """ If data is specified, creates read-only BIO. If data is - None, creates writable BIO + None, creates writable BIO, contents of which can be retrieved by str() or unicode() """ if data is None: method=libcrypto.BIO_s_mem() @@ -16,13 +16,28 @@ class Membio: else: self.bio=libcrypto.BIO_new_mem_buf(c_char_p(data),len(data)) def __del__(self): + """ + Cleans up memory used by bio + """ libcrypto.BIO_free(self.bio) del(self.bio) def __str__(self): + """ + Returns current contents of buffer as byte string + """ p=c_char_p(None) l=libcrypto.BIO_ctrl(self.bio,3,0,byref(p)) return string_at(p,l) + def __unicode__(self): + """ + Attempts to interpret current contents of buffer as UTF-8 string and convert it to unicode + """ + return str(self).decode("utf-8") def read(self,length=None): + """ + Reads data from readble BIO. For test purposes. + @param length - if specifed, limits amount of data read. If not BIO is read until end of buffer + """ if not length is None: if type(length)!=type(0): raise TypeError("length to read should be number") @@ -50,13 +65,21 @@ class Membio: return out def write(self,data): + """ + Writes data to writable bio. For test purposes + """ + if isinstance(data,unicode): + data=data.encode("utf-8") r=libcrypto.BIO_write(self.bio,data,len(data)) if r==-2: raise NotImplementedError("Function not supported by this BIO") if r 0x80 + Probably it is not what you want, unless your native language is English + """ b=Membio() - libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,PRING_FLAG) - return str(b).decode("utf-8") - + libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB) + return str(b) + def __unicode__(self): + """ + Produces unicode representation of the name. + """ + b=Membio() + libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG) + return unicode(b) def __len__(self): + """ + return number of components in the name + """ return libcrypto.X509_NAME_entry_count(self.ptr) + def __cmp__(self,other): + """ + Compares X509 names + """ + return libcrypto.X509_NAME_cmp(self.ptr,other.ptr) + def __eq__(self,other): + return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)==0 - def __getattr__(self,key): + def __getitem__(self,key): if isinstance(key,Oid): - # Return list of strings - raise NotImpemented + # Return first matching field + idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1) + if idx<0: + raise KeyError("Key not found "+repr(Oid)) + entry=libcrypto.X509_NAME_get_entry(self.ptr,idx) + s=libcrypto.X509_NAME_ENTRY_get_data(entry) + b=Membio() + libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG) + return unicode(b) elif isinstance(key,int): - # Return OID, sting tuple - raise NotImplemented - else: - raise TypeError("X509 name can be indexed with oids and numbers only") + # Return OID, string tuple + entry=libcrypto.X509_NAME_get_entry(self.ptr,key) + if entry is None: + raise IndexError("name entry index out of range") + obj=libcrypto.X509_NAME_ENTRY_get_object(entry) + nid=libcrypto.OBJ_obj2nid(obj) + if nid==0: + buf=create_string_buffer(80) + len=libcrypto.OBJ_obj2txt(buf,80,obj,1) + oid=Oid(buf[0:len]) + else: + oid=Oid(nid) + s=libcrypto.X509_NAME_ENTRY_get_data(entry) + b=Membio() + libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG) + return (oid,unicode(b)) - def __setattr__(self,key,val): - pass + def __setitem__(self,key,val): + if not self.writable: + raise ValueError("Attempt to modify constant X509 object") class X509_extlist: def __init__(self,ptr): self.ptr=ptr def __del__(self): libcrypto.X509_NAME_free(self.ptr) def __str__(self): - raise NotImplemented + raise NotImplementedError def __len__(self): return libcrypto.X509_NAME_entry_count(self.ptr) def __getattr__(self,key): - raise NotImplemented + raise NotImplementedError def __setattr__(self,key,val): - raise NotImplemented + raise NotImplementedError class X509: + """ + Represents X.509 certificate. + """ def __init__(self,data=None,ptr=None,format="PEM"): + """ + Initializes certificate + @param data - serialized certificate in PEM or DER format. + @param ptr - pointer to X509, returned by some openssl function. + mutually exclusive with data + @param format - specifies data format. "PEM" or "DER", default PEM + """ if ptr is not None: if data is not None: raise TypeError("Cannot use data and ptr simultaneously") @@ -69,18 +146,54 @@ class X509: if self.cert is None: raise X509Error("error reading certificate") def __del__(self): + """ + Frees certificate object + """ libcrypto.X509_free(self.cert) def __str__(self): """ Returns der string of the certificate """ b=Membio() if libcrypto.i2d_X509_bio(b.bio,self.cert)==0: raise X509Error("error serializing certificate") + return str(b) + def __repr__(self): + """ Returns valid call to the constructor """ + return "X509(data="+repr(str(self))+",format='DER')" @property def pubkey(self): """EVP PKEy object of certificate public key""" return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False)) - def verify(self,key): - """ Verify self on given issuer key """ + def verify(self,store=None,key=None): + """ + Verify self. Supports verification on both X509 store object + or just public issuer key + @param store X509Store object. + @param key - PKey object + parameters are mutually exclusive. If neither is specified, attempts to verify + itself as self-signed certificate + """ + if store is not None and key is not None: + raise X509Error("key and store cannot be specified simultaneously") + if store is not None: + ctx=libcrypto.X509_STORE_CTX_new() + if ctx is None: + raise X509Error("Error allocating X509_STORE_CTX") + if libcrypt.X509_STORE_CTX_init(ctx,store.ptr,self.cert,None) < 0: + raise X509Error("Error allocating X509_STORE_CTX") + res= libcrypto.X509_verify_cert(ctx)>0 + libcrypto.X509_STORE_CTX_free(ctx) + return res + else: + if key is None: + if self.issuer != self.subject: + # Not a self-signed certificate + return False + key = self.pubkey + res = libcrypto.X509_verify(self.cert,key.ptr) + if res < 0: + raise X509Error("X509_verify failed") + return res>0 + @property def subject(self): """ X509Name for certificate subject name """ @@ -92,14 +205,96 @@ class X509: @property def serial(self): """ Serial number of certificate as integer """ - return + asnint=libcrypto.X509_get_serialNumber(self.cert) + b=Membio() + libcrypto.i2a_ASN1_INTEGER(b.bio,asnint) + return int(str(b),16) @property def startDate(self): """ Certificate validity period start date """ - raise NotImplemented + # Need deep poke into certificate structure (x)->cert_info->validity->notBefore + raise NotImplementedError @property def endDate(self): """ Certificate validity period end date """ - raise NotImplemented + # Need deep poke into certificate structure (x)->cert_info->validity->notAfter + raise NotImplementedError def extensions(self): - raise NotImplemented + raise NotImplementedError +class X509Store: + """ + Represents trusted certificate store. Can be used to lookup CA certificates to verify + + @param file - file with several certificates and crls to load into store + @param dir - hashed directory with certificates and crls + @param default - if true, default verify location (directory) is installed + + """ + def __init__(self,file=None,dir=None,default=False): + """ + Creates X509 store and installs lookup method. Optionally initializes + by certificates from given file or directory. + """ + # + # Todo - set verification flags + # + self.store=libcrypto.X509_STORE_new() + lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file()) + if lookup is None: + raise X509Error("error installing file lookup method") + if (file is not None): + if not libcrypto.X509_LOOKUP_loadfile(lookup,file,1): + raise X509Error("error loading trusted certs from file "+file) + + lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir()) + if lookup is None: + raise X509Error("error installing hashed lookup method") + if dir is not None: + if not libcrypto.X509_LOOKUP_add_dir(lookup,dir,1): + raise X509Error("error adding hashed trusted certs dir "+dir) + if default: + if not libcrypto.X509_LOOKUP.add_dir(lookup,None,3): + raise X509Error("error adding default trusted certs dir ") + def add_cert(self,cert): + """ + Explicitely adds certificate to set of trusted in the store + @param cert - X509 object to add + """ + if not isinstance(cert,X509): + raise TypeError("cert should be X509") + libcrypto.X509_STORE_add_cert(self.store,cert.cert) + def add_callback(self,callback): + """ + Installs callbac function, which would receive detailed information + about verified ceritificates + """ + raise NotImplementedError + def setflags(self,flags): + """ + Set certificate verification flags. + @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants + """ + libcrypto.X509_STORE_set_flags(self.store,flags) + def setpurpose(self,purpose): + """ + Sets certificate purpose which verified certificate should match + @param purpose - number from 1 to 9 or standard strind defined in Openssl + possible strings - sslcient,sslserver, nssslserver, smimesign,smimeencrypt, crlsign, any,ocsphelper + """ + if isinstance(purpose,str): + purp_no=X509_PURPOSE_get_by_sname(purpose) + if purp_no <=0: + raise X509Error("Invalid certificate purpose '"+purpose+"'") + elif isinstance(purpose,int): + purp_no = purpose + if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0: + raise X509Error("cannot set purpose") +libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p) +libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long) +libcrypto.X509_get_serialNumber.argtypes=(c_void_p,) +libcrypto.X509_get_serialNumber.restype=c_void_p +libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p +libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,) +libcrypto.OBJ_obj2nid.argtypes=(c_void_p,) +libcrypto.X509_NAME_get_entry.restype=c_void_p +libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int) diff --git a/tests/testbio.py b/tests/testbio.py index d174a83..1bd2e1f 100644 --- a/tests/testbio.py +++ b/tests/testbio.py @@ -8,6 +8,15 @@ class TestRead(unittest.TestCase): data=bio.read() del bio self.assertEqual(data,s) + def test_reset(self): + s="A quick brown fox jumps over a lazy dog" + bio=Membio(s) + data=bio.read() + bio.reset() + data2=bio.read() + del bio + self.assertEqual(data,data2) + self.assertEqual(data,s) def test_readlongstr(self): poem='''Eyes of grey--a sodden quay, Driving rain and falling tears, @@ -83,6 +92,15 @@ class TestWrite(unittest.TestCase): b.write("the lazy dog.") self.assertEqual(str(b),"A quick brown fox jumps over the lazy dog.") - + def test_unicode(self): + b=Membio() + s='\xd0\xba\xd0\xb0\xd0\xba \xd1\x8d\xd1\x82\xd0\xbe \xd0\xbf\xd0\xbe-\xd1\x80\xd1\x83\xd1\x81\xd1\x81\xd0\xba\xd0\xb8' + b.write(s) + self.assertEqual(unicode(b),u'\u043a\u0430\u043a \u044d\u0442\u043e \u043f\u043e-\u0440\u0443\u0441\u0441\u043a\u0438') + def test_unicode2(self): + b=Membio() + u=u'\u043a\u0430\u043a \u044d\u0442\u043e \u043f\u043e-\u0440\u0443\u0441\u0441\u043a\u0438' + b.write(u) + self.assertEqual(unicode(b),u) if __name__ == '__main__': unittest.main() -- 2.39.5