try:
from pytz import utc
except ImportError:
- from datetime import timedelta
+ from datetime import timedelta,tzinfo
ZERO=timedelta(0)
- class UTC(datetime.tzinfo):
+ class UTC(tzinfo):
"""tzinfo object for UTC.
If no pytz is available, we would use it.
"""
utc=UTC()
-__all__ = ['X509Error','X509Name','X509Store','StackOfX509']
+__all__ = ['X509','X509Error','X509Name','X509Store','StackOfX509']
class _validity(Structure):
""" ctypes representation of X509_VAL structure
('validity',POINTER(_validity)),
('subject',c_void_p),
('pubkey',c_void_p),
+ ('issuerUID',c_void_p),
+ ('subjectUID',c_void_p),
+ ('extensions',c_void_p),
]
class _x509(Structure):
]
_px509 = POINTER(_x509)
-# X509_extlist is not exported yet, because is not implemented
class X509Error(LibCryptoError):
"""
Exception, generated when some openssl function fail
pass
-class X509Name:
+class X509Name(object):
"""
Class which represents X.509 distinguished name - typically
a certificate subject name or an issuer name.
# Return first matching field
idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1)
if idx<0:
- raise KeyError("Key not found "+repr(Oid))
+ raise KeyError("Key not found "+str(Oid))
entry=libcrypto.X509_NAME_get_entry(self.ptr,idx)
s=libcrypto.X509_NAME_ENTRY_get_data(entry)
b=Membio()
libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
return unicode(b)
- elif isinstance(key,int):
+ elif isinstance(key,(int,long)):
# Return OID, string tuple
entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
if entry is None:
raise IndexError("name entry index out of range")
- obj=libcrypto.X509_NAME_ENTRY_get_object(entry)
- nid=libcrypto.OBJ_obj2nid(obj)
- if nid==0:
- buf=create_string_buffer(80)
- len=libcrypto.OBJ_obj2txt(buf,80,obj,1)
- oid=Oid(buf[0:len])
- else:
- oid=Oid(nid)
+ oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
s=libcrypto.X509_NAME_ENTRY_get_data(entry)
b=Membio()
libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
return (oid,unicode(b))
+ else:
+ raise TypeError("X509 NAME can be indexed by Oids or integers only")
def __setitem__(self,key,val):
if not self.writable:
raise ValueError("Attempt to modify constant X509 object")
-class X509_extlist:
- def __init__(self,ptr):
- self.ptr=ptr
+ else:
+ raise NotImplementedError
+ def __delitem__(self,key):
+ if not self.writable:
+ raise ValueError("Attempt to modify constant X509 object")
+ else:
+ raise NotImplementedError
+ def __hash__(self):
+ return libcrypto.X509_NAME_hash(self.ptr)
+
+class _x509_ext(Structure):
+ """ Represens C structure X509_EXTENSION """
+ _fields_=[("object",c_void_p),
+ ("critical",c_int),
+ ("value",c_void_p)]
+
+class X509_EXT(object):
+ """ Python object which represents a certificate extension """
+ def __init__(self,ptr,copy=False):
+ """ Initializes from the pointer to X509_EXTENSION.
+ If copy is True, creates a copy, otherwise just
+ stores pointer.
+ """
+ if copy:
+ self.ptr=libcrypto.X509_EXTENSION_dup(ptr)
+ else:
+ self.ptr=cast(ptr,POINTER(_x509_ext))
def __del__(self):
- libcrypto.X509_NAME_free(self.ptr)
+ libcrypto.X509_EXTENSION_free(self.ptr)
def __str__(self):
- raise NotImplementedError
+ b=Membio()
+ libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
+ return str(b)
+ def __unicode__(self):
+ b=Membio()
+ libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
+ return unicode(b)
+ @property
+ def oid(self):
+ return Oid.fromobj(self.ptr[0].object)
+ @property
+ def critical(self):
+ return self.ptr[0].critical >0
+class _X509extlist(object):
+ """
+ Represents list of certificate extensions
+ """
+ def __init__(self,cert):
+ self.cert=cert
def __len__(self):
- return libcrypto.X509_NAME_entry_count(self.ptr)
-
- def __getattr__(self,key):
- raise NotImplementedError
- def __setattr__(self,key,val):
- raise NotImplementedError
-
-
-
+ return libcrypto.X509_get_ext_count(self.cert.cert)
+ def __getitem__(self,item):
+ p=libcrypto.X509_get_ext(self.cert.cert,item)
+ if p is None:
+ raise IndexError
+ return X509_EXT(p,True)
+ def find(self,oid):
+ """
+ Return list of extensions with given Oid
+ """
+ if not isinstance(oid,Oid):
+ raise TypeError("Need crytypescrypto.oid.Oid as argument")
+ found=[]
+ l=-1
+ end=len(self)
+ while True:
+ l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l)
+ if l>=end or l<0:
+ break
+ found.append(self[l])
+ return found
+ def find_critical(self,crit=True):
+ """
+ Return list of critical extensions (or list of non-cricital, if
+ optional second argument is False
+ """
+ if crit:
+ flag=1
+ else:
+ flag=0
+ found=[]
+ end=len(self)
+ l=-1
+ while True:
+ l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l)
+ if l>=end or l<0:
+ break
+ found.append(self[l])
+ return found
-class X509:
+class X509(object):
"""
Represents X.509 certificate.
"""
self.cert=libcrypto.d2i_X509_bio(b.bio,None)
if self.cert is None:
raise X509Error("error reading certificate")
+ self.extensions=_X509extlist(self)
def __del__(self):
"""
Frees certificate object
def pubkey(self):
"""EVP PKEy object of certificate public key"""
return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
+ def pem(self):
+ """ Returns PEM represntation of the certificate """
+ b=Membio()
+ if libcrypto.PEM_write_bio_X509(b.bio,self.cert)==0:
+ raise X509Error("error serializing certificate")
+ return str(b)
def verify(self,store=None,chain=[],key=None):
"""
Verify self. Supports verification on both X509 store object
b=Membio()
libcrypto.ASN1_TIME_print(b.bio,asn1date)
return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
- def extensions(self):
- """ Returns list of extensions """
- raise NotImplementedError
def check_ca(self):
""" Returns True if certificate is CA certificate """
return libcrypto.X509_check_ca(self.cert)>0
-class X509Store:
+class X509Store(object):
"""
Represents trusted certificate store. Can be used to lookup CA
certificates to verify
libcrypto.X509_STORE_add_cert(self.store,cert.cert)
def add_callback(self,callback):
"""
- Installs callbac function, which would receive detailed information
+ Installs callback function, which would receive detailed information
about verified ceritificates
"""
raise NotImplementedError
if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
raise X509Error("cannot set purpose")
def setdepth(self,depth):
+ """
+ Sets the verification depth i.e. max length of certificate chain
+ which is acceptable
+ """
libcrypto.X509_STORE_set_depth(self.store,depth)
def settime(self, time):
"""
Set point in time used to check validity of certificates for
+ Time can be either python datetime object or number of seconds
+ sinse epoch
"""
if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
d=int(time.strftime("%s"))
else:
raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
raise NotImplementedError
-class StackOfX509:
+class StackOfX509(object):
"""
Implements OpenSSL STACK_OF(X509) object.
It looks much like python container types
"""
if ptr is None:
self.need_free = True
- self.ptr=libcrypt.sk_new_null()
+ self.ptr=libcrypto.sk_new_null()
if certs is not None:
for crt in certs:
self.append(crt)
- elif not certs is None:
+ elif certs is not None:
raise ValueError("cannot handle certs an ptr simultaneously")
else:
self.need_free = disposable
raise IndexError
p=libcrypto.sk_value(self.ptr,index)
return X509(ptr=libcrypto.X509_dup(p))
- def __putitem__(self,index,value):
+ def __setitem__(self,index,value):
if not self.need_free:
raise ValueError("Stack is read-only")
if index <0 or index>=len(self):
raise IndexError
- p=libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
+ if not isinstance(value,X509):
+ raise TypeError('StackOfX508 can contain only X509 objects')
+ p=libcrypto.sk_value(self.ptr,index)
+ libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
libcrypto.X509_free(p)
def __delitem__(self,index):
if not self.need_free:
def append(self,value):
if not self.need_free:
raise ValueError("Stack is read-only")
+ if not isinstance(value,X509):
+ raise TypeError('StackOfX508 can contain only X509 objects')
libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
+libcrypto.PEM_read_bio_X509.restype=c_void_p
+libcrypto.PEM_read_bio_X509.argtypes=(c_void_p,POINTER(c_void_p),c_void_p,c_void_p)
+libcrypto.PEM_write_bio_X509.restype=c_int
+libcrypto.PEM_write_bio_X509.argtypes=(c_void_p,c_void_p)
libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p)
libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,)
libcrypto.ASN1_INTEGER_get.restype=c_long
libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
libcrypto.X509_LOOKUP_ctrl.restype=c_int
libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
-
+libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,)
+libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext)
+libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
+libcrypto.X509_get_ext.restype=c_void_p
+libcrypto.X509_get_ext.argtypes=(c_void_p,c_int)
+libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
+libcrypto.sk_set.argtypes=(c_void_p,c_int,c_void_p)
+libcrypto.sk_set.restype=c_void_p
+libcrypto.sk_value.argtypes=(c_void_p,c_int)
+libcrypto.sk_value.restype=c_void_p
+libcrypto.X509_dup.restype=c_void_p
+libcrypto.sk_new_null.restype=c_void_p
+libcrypto.X509_dup.argtypes=(c_void_p,)
+libcrypto.X509_NAME_hash.restype=c_long
+libcrypto.X509_NAME_hash.argtypes=(c_void_p,)