X-Git-Url: http://wagner.pp.ru/gitweb/?a=blobdiff_plain;f=ctypescrypto%2Fx509.py;h=dd4cba2b1c13b403359e98c4e1ca02b36158e7f0;hb=92df3e73921ba7b8756bfab1af4189dab7cc610e;hp=bd056d32f6227bcba4593e26fd79d06504ee5097;hpb=6ecf699650b17d6c8d8a8670d51b33631824fe46;p=oss%2Fctypescrypto.git diff --git a/ctypescrypto/x509.py b/ctypescrypto/x509.py index bd056d3..dd4cba2 100644 --- a/ctypescrypto/x509.py +++ b/ctypescrypto/x509.py @@ -18,9 +18,9 @@ from datetime import datetime try: from pytz import utc except ImportError: - from datetime import timedelta + from datetime import timedelta,tzinfo ZERO=timedelta(0) - class UTC(datetime.tzinfo): + class UTC(tzinfo): """tzinfo object for UTC. If no pytz is available, we would use it. """ @@ -57,6 +57,9 @@ class _cinf(Structure): ('validity',POINTER(_validity)), ('subject',c_void_p), ('pubkey',c_void_p), + ('issuerUID',c_void_p), + ('subjectUID',c_void_p), + ('extensions',c_void_p), ] class _x509(Structure): @@ -72,7 +75,6 @@ class _x509(Structure): ] _px509 = POINTER(_x509) -# X509_extlist is not exported yet, because is not implemented class X509Error(LibCryptoError): """ Exception, generated when some openssl function fail @@ -81,7 +83,7 @@ class X509Error(LibCryptoError): pass -class X509Name: +class X509Name(object): """ Class which represents X.509 distinguished name - typically a certificate subject name or an issuer name. @@ -146,52 +148,117 @@ class X509Name: # Return first matching field idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1) if idx<0: - raise KeyError("Key not found "+repr(Oid)) + raise KeyError("Key not found "+str(Oid)) entry=libcrypto.X509_NAME_get_entry(self.ptr,idx) s=libcrypto.X509_NAME_ENTRY_get_data(entry) b=Membio() libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG) return unicode(b) - elif isinstance(key,int): + elif isinstance(key,(int,long)): # Return OID, string tuple entry=libcrypto.X509_NAME_get_entry(self.ptr,key) if entry is None: raise IndexError("name entry index out of range") - obj=libcrypto.X509_NAME_ENTRY_get_object(entry) - nid=libcrypto.OBJ_obj2nid(obj) - if nid==0: - buf=create_string_buffer(80) - len=libcrypto.OBJ_obj2txt(buf,80,obj,1) - oid=Oid(buf[0:len]) - else: - oid=Oid(nid) + oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry)) s=libcrypto.X509_NAME_ENTRY_get_data(entry) b=Membio() libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG) return (oid,unicode(b)) + else: + raise TypeError("X509 NAME can be indexed by Oids or integers only") def __setitem__(self,key,val): if not self.writable: raise ValueError("Attempt to modify constant X509 object") -class X509_extlist: - def __init__(self,ptr): - self.ptr=ptr + else: + raise NotImplementedError + def __delitem__(self,key): + if not self.writable: + raise ValueError("Attempt to modify constant X509 object") + else: + raise NotImplementedError + +class _x509_ext(Structure): + """ Represens C structure X509_EXTENSION """ + _fields_=[("object",c_void_p), + ("critical",c_int), + ("value",c_void_p)] + +class X509_EXT(object): + """ Python object which represents a certificate extension """ + def __init__(self,ptr,copy=False): + """ Initializes from the pointer to X509_EXTENSION. + If copy is True, creates a copy, otherwise just + stores pointer. + """ + if copy: + self.ptr=libcrypto.X509_EXTENSION_dup(ptr) + else: + self.ptr=cast(ptr,POINTER(_x509_ext)) def __del__(self): - libcrypto.X509_NAME_free(self.ptr) + libcrypto.X509_EXTENSION_free(self.ptr) def __str__(self): - raise NotImplementedError + b=Membio() + libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0) + return str(b) + def __unicode__(self): + b=Membio() + libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0) + return unicode(b) + @property + def oid(self): + return Oid.fromobj(self.ptr[0].object) + @property + def critical(self): + return self.ptr[0].critical >0 +class _X509extlist(object): + """ + Represents list of certificate extensions + """ + def __init__(self,cert): + self.cert=cert def __len__(self): - return libcrypto.X509_NAME_entry_count(self.ptr) - - def __getattr__(self,key): - raise NotImplementedError - def __setattr__(self,key,val): - raise NotImplementedError - - - + return libcrypto.X509_get_ext_count(self.cert.cert) + def __getitem__(self,item): + p=libcrypto.X509_get_ext(self.cert.cert,item) + if p is None: + raise IndexError + return X509_EXT(p,True) + def find(self,oid): + """ + Return list of extensions with given Oid + """ + if not isinstance(oid,Oid): + raise TypeError("Need crytypescrypto.oid.Oid as argument") + found=[] + l=-1 + end=len(self) + while True: + l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l) + if l>=end or l<0: + break + found.append(self[l]) + return found + def find_critical(self,crit=True): + """ + Return list of critical extensions (or list of non-cricital, if + optional second argument is False + """ + if crit: + flag=1 + else: + flag=0 + found=[] + end=len(self) + l=-1 + while True: + l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l) + if l>=end or l<0: + break + found.append(self[l]) + return found -class X509: +class X509(object): """ Represents X.509 certificate. """ @@ -217,6 +284,7 @@ class X509: self.cert=libcrypto.d2i_X509_bio(b.bio,None) if self.cert is None: raise X509Error("error reading certificate") + self.extensions=_X509extlist(self) def __del__(self): """ Frees certificate object @@ -313,13 +381,10 @@ class X509: b=Membio() libcrypto.ASN1_TIME_print(b.bio,asn1date) return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc) - def extensions(self): - """ Returns list of extensions """ - raise NotImplementedError def check_ca(self): """ Returns True if certificate is CA certificate """ return libcrypto.X509_check_ca(self.cert)>0 -class X509Store: +class X509Store(object): """ Represents trusted certificate store. Can be used to lookup CA certificates to verify @@ -367,7 +432,7 @@ class X509Store: libcrypto.X509_STORE_add_cert(self.store,cert.cert) def add_callback(self,callback): """ - Installs callbac function, which would receive detailed information + Installs callback function, which would receive detailed information about verified ceritificates """ raise NotImplementedError @@ -392,10 +457,16 @@ class X509Store: if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0: raise X509Error("cannot set purpose") def setdepth(self,depth): + """ + Sets the verification depth i.e. max length of certificate chain + which is acceptable + """ libcrypto.X509_STORE_set_depth(self.store,depth) def settime(self, time): """ Set point in time used to check validity of certificates for + Time can be either python datetime object or number of seconds + sinse epoch """ if isinstance(time,datetime.datetime) or isinstance(time,datetime.date): d=int(time.strftime("%s")) @@ -404,7 +475,7 @@ class X509Store: else: raise TypeError("datetime.date, datetime.datetime or integer is required as time argument") raise NotImplementedError -class StackOfX509: +class StackOfX509(object): """ Implements OpenSSL STACK_OF(X509) object. It looks much like python container types @@ -423,11 +494,11 @@ class StackOfX509: """ if ptr is None: self.need_free = True - self.ptr=libcrypt.sk_new_null() + self.ptr=libcrypto.sk_new_null() if certs is not None: for crt in certs: self.append(crt) - elif not certs is None: + elif certs is not None: raise ValueError("cannot handle certs an ptr simultaneously") else: self.need_free = disposable @@ -439,12 +510,15 @@ class StackOfX509: raise IndexError p=libcrypto.sk_value(self.ptr,index) return X509(ptr=libcrypto.X509_dup(p)) - def __putitem__(self,index,value): + def __setitem__(self,index,value): if not self.need_free: raise ValueError("Stack is read-only") if index <0 or index>=len(self): raise IndexError - p=libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert)) + if not isinstance(value,X509): + raise TypeError('StackOfX508 can contain only X509 objects') + p=libcrypto.sk_value(self.ptr,index) + libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert)) libcrypto.X509_free(p) def __delitem__(self,index): if not self.need_free: @@ -459,6 +533,8 @@ class StackOfX509: def append(self,value): if not self.need_free: raise ValueError("Stack is read-only") + if not isinstance(value,X509): + raise TypeError('StackOfX508 can contain only X509 objects') libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert)) libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p) libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long) @@ -479,4 +555,16 @@ libcrypto.X509_LOOKUP_file.restype=c_void_p libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p libcrypto.X509_LOOKUP_ctrl.restype=c_int libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p)) - +libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,) +libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext) +libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int) +libcrypto.X509_get_ext.restype=c_void_p +libcrypto.X509_get_ext.argtypes=(c_void_p,c_int) +libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int) +libcrypto.sk_set.argtypes=(c_void_p,c_int,c_void_p) +libcrypto.sk_set.restype=c_void_p +libcrypto.sk_value.argtypes=(c_void_p,c_int) +libcrypto.sk_value.restype=c_void_p +libcrypto.X509_dup.restype=c_void_p +libcrypto.sk_new_null.restype=c_void_p +libcrypto.X509_dup.argtypes=(c_void_p,)