2 Implements interface to openssl X509 and X509Store structures,
3 I.e. it allows loading, analyzing and verifying certificates.
5 X509Store objects are also used to verify other signed documents,
6 such as CMS, OCSP and timestamps.
11 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
21 from datetime import timedelta
23 class UTC(datetime.tzinfo):
24 """tzinfo object for UTC.
25 If no pytz is available, we would use it.
28 def utcoffset(self, dt):
39 __all__ = ['X509Error','X509Name','X509Store','StackOfX509']
class _validity(Structure):
    """
    ctypes mirror of the OpenSSL X509_VAL structure.

    It is needed to read the certificate validity period, because openssl
    doesn't provide functions for it - only macros.
    """
    # Both members are kept as opaque pointers.
    _fields_ = [
        ('notBefore', c_void_p),
        ('notAfter', c_void_p),
    ]
48 class _cinf(Structure):
49 """ ctypes representation of X509_CINF structure
50 needed to access certificate data, which are accessible only
53 _fields_ = [('version',c_void_p),
54 ('serialNumber',c_void_p),
55 ('sign_alg',c_void_p),
57 ('validity',POINTER(_validity)),
62 class _x509(Structure):
64 ctypes representation of X509 structure needed
65 to access certificate data which are accessible only via
68 _fields_ = [('cert_info',POINTER(_cinf)),
70 ('signature',c_void_p),
71 # There are a lot of parsed extension fields there
73 _px509 = POINTER(_x509)
75 # X509_extlist is not exported yet, because it is not implemented
76 class X509Error(LibCryptoError):
78 Exception, generated when some openssl function fails
86 Class which represents X.509 distinguished name - typically
87 a certificate subject name or an issuer name.
89 Now used only to represent information, extracted from the
90 certificate. Potentially can be also used to build DN when creating
91 certificate signing request
93 # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
96 def __init__(self,ptr=None,copy=False):
98 Creates a X509Name object
99 @param ptr - pointer to X509_NAME C structure (as returned by some OpenSSL functions)
100 @param copy - indicates that this structure has to be freed upon object destruction
107 self.ptr=libcrypto.X509_NAME_new()
115 libcrypto.X509_NAME_free(self.ptr)
118 Produces an ascii representation of the name, escaping all symbols > 0x80
119 Probably it is not what you want, unless your native language is English
122 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB)
124 def __unicode__(self):
126 Produces unicode representation of the name.
129 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG)
133 return number of components in the name
135 return libcrypto.X509_NAME_entry_count(self.ptr)
136 def __cmp__(self,other):
140 return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)
def __eq__(self,other):
    """
    Equality test for two names: True when X509_NAME_cmp
    returns 0 for the underlying structures.
    """
    cmp_result=libcrypto.X509_NAME_cmp(self.ptr,other.ptr)
    return cmp_result==0
144 def __getitem__(self,key):
145 if isinstance(key,Oid):
146 # Return first matching field
147 idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1)
149 raise KeyError("Key not found "+repr(Oid))
150 entry=libcrypto.X509_NAME_get_entry(self.ptr,idx)
151 s=libcrypto.X509_NAME_ENTRY_get_data(entry)
153 libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
155 elif isinstance(key,int):
156 # Return OID, string tuple
157 entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
159 raise IndexError("name entry index out of range")
160 obj=libcrypto.X509_NAME_ENTRY_get_object(entry)
161 nid=libcrypto.OBJ_obj2nid(obj)
163 buf=create_string_buffer(80)
164 len=libcrypto.OBJ_obj2txt(buf,80,obj,1)
168 s=libcrypto.X509_NAME_ENTRY_get_data(entry)
170 libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
171 return (oid,unicode(b))
173 def __setitem__(self,key,val):
174 if not self.writable:
175 raise ValueError("Attempt to modify constant X509 object")
177 def __init__(self,ptr):
180 libcrypto.X509_NAME_free(self.ptr)
182 raise NotImplementedError
184 return libcrypto.X509_NAME_entry_count(self.ptr)
def __getattr__(self,key):
    """
    Attribute lookup hook - not implemented, always raises
    NotImplementedError.
    """
    raise NotImplementedError()
def __setattr__(self,key,val):
    """
    Attribute assignment hook - not implemented, always raises
    NotImplementedError.
    """
    raise NotImplementedError()
196 Represents X.509 certificate.
198 def __init__(self,data=None,ptr=None,format="PEM"):
200 Initializes certificate
201 @param data - serialized certificate in PEM or DER format.
202 @param ptr - pointer to X509, returned by some openssl function.
203 mutually exclusive with data
204 @param format - specifies data format. "PEM" or "DER", default PEM
208 raise TypeError("Cannot use data and ptr simultaneously")
211 raise TypeError("data argument is required")
215 self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None)
217 self.cert=libcrypto.d2i_X509_bio(b.bio,None)
218 if self.cert is None:
219 raise X509Error("error reading certificate")
222 Frees certificate object
224 libcrypto.X509_free(self.cert)
226 """ Returns der string of the certificate """
228 if libcrypto.i2d_X509_bio(b.bio,self.cert)==0:
229 raise X509Error("error serializing certificate")
232 """ Returns valid call to the constructor """
233 return "X509(data="+repr(str(self))+",format='DER')"
236 """EVP PKEy object of certificate public key"""
237 return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
238 def verify(self,store=None,chain=[],key=None):
240 Verify self. Supports verification on both X509 store object
241 or just public issuer key
242 @param store X509Store object.
243 @param chain - list of X509 objects to add into verification
244 context. These objects are untrusted, but can be used to
245 build certificate chain up to trusted object in the store
246 @param key - PKey object with open key to validate signature
248 parameters store and key are mutually exclusive. If neither
249 is specified, attempts to verify self as self-signed certificate
251 if store is not None and key is not None:
252 raise X509Error("key and store cannot be specified simultaneously")
253 if store is not None:
254 ctx=libcrypto.X509_STORE_CTX_new()
256 raise X509Error("Error allocating X509_STORE_CTX")
257 if chain is not None and len(chain)>0:
258 ch=StackOfX509(chain)
261 if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0:
262 raise X509Error("Error allocating X509_STORE_CTX")
263 res= libcrypto.X509_verify_cert(ctx)
264 libcrypto.X509_STORE_CTX_free(ctx)
268 if self.issuer != self.subject:
269 # Not a self-signed certificate
272 res = libcrypto.X509_verify(self.cert,key.key)
274 raise X509Error("X509_verify failed")
279 """ X509Name for certificate subject name """
280 return X509Name(libcrypto.X509_get_subject_name(self.cert))
283 """ X509Name for certificate issuer name """
284 return X509Name(libcrypto.X509_get_issuer_name(self.cert))
287 """ Serial number of certificate as integer """
288 asnint=libcrypto.X509_get_serialNumber(self.cert)
290 libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
291 return int(str(b),16)
294 """ certificate version as integer. Really certificate stores 0 for
295 version 1 and 2 for version 3, but we return 1 and 3 """
296 asn1int=cast(self.cert,_px509)[0].cert_info[0].version
297 return libcrypto.ASN1_INTEGER_get(asn1int)+1
300 """ Certificate validity period start date """
301 # Need deep poke into certificate structure (x)->cert_info->validity->notBefore
303 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore
305 libcrypto.ASN1_TIME_print(b.bio,asn1date)
306 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
309 """ Certificate validity period end date """
310 # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
312 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter
314 libcrypto.ASN1_TIME_print(b.bio,asn1date)
315 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
def extensions(self):
    """
    Returns list of extensions.

    Not implemented yet - always raises NotImplementedError.
    """
    raise NotImplementedError()
320 """ Returns True if certificate is CA certificate """
321 return libcrypto.X509_check_ca(self.cert)>0
324 Represents trusted certificate store. Can be used to lookup CA
325 certificates to verify
327 @param file - file with several certificates and crls
329 @param dir - hashed directory with certificates and crls
330 @param default - if true, default verify location (directory)
334 def __init__(self,file=None,dir=None,default=False):
336 Creates X509 store and installs lookup method. Optionally initializes
337 by certificates from given file or directory.
340 # Todo - set verification flags
342 self.store=libcrypto.X509_STORE_new()
343 if self.store is None:
344 raise X509Error("allocating store")
345 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
347 raise X509Error("error installing file lookup method")
348 if (file is not None):
349 if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0:
350 raise X509Error("error loading trusted certs from file "+file)
351 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
353 raise X509Error("error installing hashed lookup method")
355 if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0:
356 raise X509Error("error adding hashed trusted certs dir "+dir)
358 if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0:
359 raise X509Error("error adding default trusted certs dir ")
def add_cert(self,cert):
    """
    Explicitly adds a certificate to the set of trusted ones in the store.
    @param cert - X509 object to add

    Raises TypeError if cert is not an X509 object.
    """
    if isinstance(cert,X509):
        libcrypto.X509_STORE_add_cert(self.store,cert.cert)
    else:
        raise TypeError("cert should be X509")
def add_callback(self,callback):
    """
    Installs callback function, which would receive detailed information
    about verified certificates.

    Not implemented yet - always raises NotImplementedError.
    """
    raise NotImplementedError()
def setflags(self,flags):
    """
    Sets certificate verification flags on the store.
    @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
    """
    store_ptr=self.store
    libcrypto.X509_STORE_set_flags(store_ptr,flags)
380 def setpurpose(self,purpose):
382 Sets certificate purpose which verified certificate should match
383 @param purpose - number from 1 to 9 or standard string defined in OpenSSL
384 possible strings - sslclient, sslserver, nssslserver, smimesign, smimeencrypt, crlsign, any, ocsphelper
386 if isinstance(purpose,str):
387 purp_no=X509_PURPOSE_get_by_sname(purpose)
389 raise X509Error("Invalid certificate purpose '"+purpose+"'")
390 elif isinstance(purpose,int):
392 if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
393 raise X509Error("cannot set purpose")
def setdepth(self,depth):
    """
    Passes the given depth to X509_STORE_set_depth for this store.
    @param depth - value handed to OpenSSL unchanged
    """
    store_ptr=self.store
    libcrypto.X509_STORE_set_depth(store_ptr,depth)
396 def settime(self, time):
398 Set point in time used to check validity of certificates for
400 if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
401 d=int(time.strftime("%s"))
402 elif isinstance(time,int):
405 raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
406 raise NotImplementedError
409 Implements OpenSSL STACK_OF(X509) object.
410 It looks much like python container types
412 def __init__(self,certs=None,ptr=None,disposable=True):
415 @param certs - list of X509 objects. If specified, read-write
416 stack is created and populated by these certificates
417 @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
419 @param disposable - if True, stack created from object, returned
420 by function is a copy, can be modified and needs to be
421 freed. If False, it is just a pointer into another
422 structure i.e. CMS_ContentInfo
425 self.need_free = True
426 self.ptr=libcrypt.sk_new_null()
427 if certs is not None:
430 elif not certs is None:
431 raise ValueError("cannot handle certs an ptr simultaneously")
433 self.need_free = disposable
436 return libcrypto.sk_num(self.ptr)
437 def __getitem__(self,index):
438 if index <0 or index>=len(self):
440 p=libcrypto.sk_value(self.ptr,index)
441 return X509(ptr=libcrypto.X509_dup(p))
442 def __putitem__(self,index,value):
443 if not self.need_free:
444 raise ValueError("Stack is read-only")
445 if index <0 or index>=len(self):
447 p=libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
448 libcrypto.X509_free(p)
449 def __delitem__(self,index):
450 if not self.need_free:
451 raise ValueError("Stack is read-only")
452 if index <0 or index>=len(self):
454 p=libcrypto.sk_delete(self.ptr,index)
455 libcrypto.X509_free(p)
458 libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
def append(self,value):
    """
    Appends a copy of the certificate to the end of the stack.
    @param value - X509 object; its certificate is duplicated with
        X509_dup, so the stack holds its own copy

    Raises ValueError if the stack is read-only and X509Error if
    OpenSSL fails to duplicate the certificate or to push it.
    """
    if not self.need_free:
        raise ValueError("Stack is read-only")
    dup=libcrypto.X509_dup(value.cert)
    # A NULL result shows up as None or 0 depending on the declared restype
    if not dup:
        raise X509Error("error duplicating certificate")
    # sk_push reports failure with 0; the copy was not stored then,
    # so it has to be released here to avoid a leak
    if libcrypto.sk_push(self.ptr,dup)<=0:
        libcrypto.X509_free(dup)
        raise X509Error("error adding certificate to stack")
# ctypes prototypes for the libcrypto functions used by this module.
# Pointer-returning functions get restype=c_void_p explicitly, since
# ctypes otherwise treats a foreign function result as a C int.

# ASN.1 helpers
libcrypto.i2a_ASN1_INTEGER.argtypes = (c_void_p, c_void_p)
libcrypto.ASN1_STRING_print_ex.argtypes = (c_void_p, c_void_p, c_long)
libcrypto.ASN1_TIME_print.argtypes = (c_void_p, c_void_p)
libcrypto.ASN1_INTEGER_get.restype = c_long
libcrypto.ASN1_INTEGER_get.argtypes = (c_void_p,)
libcrypto.OBJ_obj2nid.argtypes = (c_void_p,)

# Certificate and name accessors
libcrypto.X509_get_serialNumber.restype = c_void_p
libcrypto.X509_get_serialNumber.argtypes = (c_void_p,)
libcrypto.X509_NAME_get_entry.restype = c_void_p
libcrypto.X509_NAME_get_entry.argtypes = (c_void_p, c_int)
libcrypto.X509_NAME_ENTRY_get_object.restype = c_void_p
libcrypto.X509_NAME_ENTRY_get_object.argtypes = (c_void_p,)

# Certificate store and lookup methods
libcrypto.X509_STORE_new.restype = c_void_p
libcrypto.X509_STORE_add_lookup.restype = c_void_p
libcrypto.X509_STORE_add_lookup.argtypes = (c_void_p, c_void_p)
libcrypto.X509_LOOKUP_file.restype = c_void_p
libcrypto.X509_LOOKUP_hash_dir.restype = c_void_p
libcrypto.X509_LOOKUP_ctrl.restype = c_int
libcrypto.X509_LOOKUP_ctrl.argtypes = (
    c_void_p, c_int, c_char_p, c_long, POINTER(c_char_p))