+ # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
+ global utc
+ asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter
+ b=Membio()
+ libcrypto.ASN1_TIME_print(b.bio,asn1date)
+ return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
+ def check_ca(self):
+ """ Returns True if certificate is CA certificate """
+ return libcrypto.X509_check_ca(self.cert)>0
+class X509Store(object):
+ """
+ Represents trusted certificate store. Can be used to lookup CA
+ certificates to verify
+
+ @param file - file with several certificates and crls
+ to load into store
+ @param dir - hashed directory with certificates and crls
+ @param default - if true, default verify location (directory)
+ is installed
+
+ """
+ def __init__(self,file=None,dir=None,default=False):
+ """
+ Creates X509 store and installs lookup method. Optionally initializes
+ by certificates from given file or directory.
+ """
+ #
+ # Todo - set verification flags
+ #
+ self.store=libcrypto.X509_STORE_new()
+ if self.store is None:
+ raise X509Error("allocating store")
+ lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
+ if lookup is None:
+ raise X509Error("error installing file lookup method")
+ if (file is not None):
+ if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0:
+ raise X509Error("error loading trusted certs from file "+file)
+ lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
+ if lookup is None:
+ raise X509Error("error installing hashed lookup method")
+ if dir is not None:
+ if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0:
+ raise X509Error("error adding hashed trusted certs dir "+dir)
+ if default:
+ if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0:
+ raise X509Error("error adding default trusted certs dir ")
+ def add_cert(self,cert):
+ """
+ Explicitely adds certificate to set of trusted in the store
+ @param cert - X509 object to add
+ """
+ if not isinstance(cert,X509):
+ raise TypeError("cert should be X509")
+ libcrypto.X509_STORE_add_cert(self.store,cert.cert)
+ def add_callback(self,callback):
+ """
+ Installs callback function, which would receive detailed information
+ about verified ceritificates
+ """
+ raise NotImplementedError
+ def setflags(self,flags):
+ """
+ Set certificate verification flags.
+ @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
+ """
+ libcrypto.X509_STORE_set_flags(self.store,flags)
+ def setpurpose(self,purpose):
+ """
+ Sets certificate purpose which verified certificate should match
+ @param purpose - number from 1 to 9 or standard strind defined in Openssl
+ possible strings - sslcient,sslserver, nssslserver, smimesign,smimeencrypt, crlsign, any,ocsphelper
+ """
+ if isinstance(purpose,str):
+ purp_no=X509_PURPOSE_get_by_sname(purpose)
+ if purp_no <=0:
+ raise X509Error("Invalid certificate purpose '"+purpose+"'")
+ elif isinstance(purpose,int):
+ purp_no = purpose
+ if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
+ raise X509Error("cannot set purpose")
+ def setdepth(self,depth):
+ """
+ Sets the verification depth i.e. max length of certificate chain
+ which is acceptable
+ """
+ libcrypto.X509_STORE_set_depth(self.store,depth)
+ def settime(self, time):
+ """
+ Set point in time used to check validity of certificates for
+ Time can be either python datetime object or number of seconds
+ sinse epoch
+ """
+ if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
+ d=int(time.strftime("%s"))
+ elif isinstance(time,int):
+ pass
+ else:
+ raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
+ raise NotImplementedError
+class StackOfX509(object):
+ """
+ Implements OpenSSL STACK_OF(X509) object.
+ It looks much like python container types
+ """
+ def __init__(self,certs=None,ptr=None,disposable=True):
+ """
+ Create stack
+ @param certs - list of X509 objects. If specified, read-write
+ stack is created and populated by these certificates
+ @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
+ some functions
+ @param disposable - if True, stack created from object, returned
+ by function is copy, and can be modified and need to be
+ freeid. If false, it is just pointer into another
+ structure i.e. CMS_ContentInfo
+ """
+ if ptr is None:
+ self.need_free = True
+ self.ptr=libcrypto.sk_new_null()
+ if certs is not None:
+ for crt in certs:
+ self.append(crt)
+ elif certs is not None:
+ raise ValueError("cannot handle certs an ptr simultaneously")
+ else:
+ self.need_free = disposable
+ self.ptr=ptr
+ def __len__(self):
+ return libcrypto.sk_num(self.ptr)
+ def __getitem__(self,index):
+ if index <0 or index>=len(self):
+ raise IndexError
+ p=libcrypto.sk_value(self.ptr,index)
+ return X509(ptr=libcrypto.X509_dup(p))
+ def __setitem__(self,index,value):
+ if not self.need_free:
+ raise ValueError("Stack is read-only")
+ if index <0 or index>=len(self):
+ raise IndexError
+ if not isinstance(value,X509):
+ raise TypeError('StackOfX508 can contain only X509 objects')
+ p=libcrypto.sk_value(self.ptr,index)
+ libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
+ libcrypto.X509_free(p)
+ def __delitem__(self,index):
+ if not self.need_free:
+ raise ValueError("Stack is read-only")
+ if index <0 or index>=len(self):
+ raise IndexError
+ p=libcrypto.sk_delete(self.ptr,index)
+ libcrypto.X509_free(p)
+ def __del__(self):
+ if self.need_free:
+ libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
+ def append(self,value):
+ if not self.need_free:
+ raise ValueError("Stack is read-only")
+ if not isinstance(value,X509):
+ raise TypeError('StackOfX508 can contain only X509 objects')
+ libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
+libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
+libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
+libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p)
+libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,)
+libcrypto.ASN1_INTEGER_get.restype=c_long
+libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
+libcrypto.X509_get_serialNumber.restype=c_void_p
+libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
+libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,)
+libcrypto.OBJ_obj2nid.argtypes=(c_void_p,)
+libcrypto.X509_NAME_get_entry.restype=c_void_p
+libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int)
+libcrypto.X509_STORE_new.restype=c_void_p
+libcrypto.X509_STORE_add_lookup.restype=c_void_p
+libcrypto.X509_STORE_add_lookup.argtypes=(c_void_p,c_void_p)
+libcrypto.X509_LOOKUP_file.restype=c_void_p
+libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
+libcrypto.X509_LOOKUP_ctrl.restype=c_int
+libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
+libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,)
+libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext)
+libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
+libcrypto.X509_get_ext.restype=c_void_p
+libcrypto.X509_get_ext.argtypes=(c_void_p,c_int)
+libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
+libcrypto.sk_set.argtypes=(c_void_p,c_int,c_void_p)
+libcrypto.sk_set.restype=c_void_p
+libcrypto.sk_value.argtypes=(c_void_p,c_int)
+libcrypto.sk_value.restype=c_void_p
+libcrypto.X509_dup.restype=c_void_p
+libcrypto.sk_new_null.restype=c_void_p
+libcrypto.X509_dup.argtypes=(c_void_p,)