]> wagner.pp.ru Git - oss/ctypescrypto.git/blobdiff - ctypescrypto/x509.py
Added X509Name.__hash__
[oss/ctypescrypto.git] / ctypescrypto / x509.py
index e2c97c669d114ddcbcb1e2f2707ecedc7e7c3751..e2ddea88f508f336f822f43ea9ad5c640e6751e7 100644 (file)
@@ -8,15 +8,73 @@ such as CMS, OCSP and timestamps.
 
 
 
 
 
 
-from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p
+from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
 from ctypescrypto.bio import Membio
 from ctypescrypto.pkey import PKey
 from ctypescrypto.oid import Oid
 from ctypescrypto.exception import LibCryptoError
 from ctypescrypto import libcrypto
 from ctypescrypto.bio import Membio
 from ctypescrypto.pkey import PKey
 from ctypescrypto.oid import Oid
 from ctypescrypto.exception import LibCryptoError
 from ctypescrypto import libcrypto
+from datetime import datetime
+try:
+       from pytz import utc
+except ImportError:
+       from datetime import timedelta,tzinfo
+       ZERO=timedelta(0)
+       class UTC(tzinfo):
+               """tzinfo object for UTC. 
+                       If no pytz is available, we would use it.
+               """
+
+               def utcoffset(self, dt):
+                       return ZERO
+
+               def tzname(self, dt):
+                       return "UTC"
+
+               def dst(self, dt):
+                       return ZERO
+
+       utc=UTC()
 
 __all__ = ['X509Error','X509Name','X509Store','StackOfX509']
 
 __all__ = ['X509Error','X509Name','X509Store','StackOfX509']
-# X509_extlist is not exported yet, because is not implemented 
+
+class _validity(Structure):
+       """ ctypes representation of X509_VAL structure 
+               needed to access certificate validity period, because openssl
+               doesn't provide fuctions for it - only macros
+       """
+       _fields_ =      [('notBefore',c_void_p),('notAfter',c_void_p)]
+
+class _cinf(Structure):
+       """ ctypes representtion of X509_CINF structure 
+           neede to access certificate data, which are accessable only
+               via macros
+       """
+       _fields_ = [('version',c_void_p),
+               ('serialNumber',c_void_p),
+               ('sign_alg',c_void_p),
+               ('issuer',c_void_p),
+               ('validity',POINTER(_validity)),
+               ('subject',c_void_p),
+               ('pubkey',c_void_p),
+               ('issuerUID',c_void_p),
+               ('subjectUID',c_void_p),
+               ('extensions',c_void_p),
+               ]
+
+class _x509(Structure):
+       """
+       ctypes represntation of X509 structure needed
+       to access certificate data which are accesable only via
+       macros, not functions
+       """
+       _fields_ = [('cert_info',POINTER(_cinf)),
+                               ('sig_alg',c_void_p),
+                               ('signature',c_void_p),
+                               # There are a lot of parsed extension fields there
+                               ]
+_px509 = POINTER(_x509)
+
 class X509Error(LibCryptoError):
        """
        Exception, generated when some openssl function fail
 class X509Error(LibCryptoError):
        """
        Exception, generated when some openssl function fail
@@ -25,7 +83,7 @@ class X509Error(LibCryptoError):
        pass
 
 
        pass
 
 
-class X509Name:
+class X509Name(object):
        """
        Class which represents X.509 distinguished name - typically 
        a certificate subject name or an issuer name.
        """
        Class which represents X.509 distinguished name - typically 
        a certificate subject name or an issuer name.
@@ -90,52 +148,119 @@ class X509Name:
                        # Return first matching field
                        idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1)
                        if idx<0:
                        # Return first matching field
                        idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1)
                        if idx<0:
-                               raise KeyError("Key not found "+repr(Oid))
+                               raise KeyError("Key not found "+str(Oid))
                        entry=libcrypto.X509_NAME_get_entry(self.ptr,idx)
                        s=libcrypto.X509_NAME_ENTRY_get_data(entry)
                        b=Membio()
                        libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
                        return unicode(b)
                        entry=libcrypto.X509_NAME_get_entry(self.ptr,idx)
                        s=libcrypto.X509_NAME_ENTRY_get_data(entry)
                        b=Membio()
                        libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
                        return unicode(b)
-               elif isinstance(key,int):
+               elif isinstance(key,(int,long)):
                        # Return OID, string tuple
                        entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
                        if entry is None:
                                raise IndexError("name entry index out of range")
                        # Return OID, string tuple
                        entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
                        if entry is None:
                                raise IndexError("name entry index out of range")
-                       obj=libcrypto.X509_NAME_ENTRY_get_object(entry)
-                       nid=libcrypto.OBJ_obj2nid(obj)
-                       if nid==0:
-                               buf=create_string_buffer(80)
-                               len=libcrypto.OBJ_obj2txt(buf,80,obj,1)
-                               oid=Oid(buf[0:len])
-                       else:
-                               oid=Oid(nid)
+                       oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
                        s=libcrypto.X509_NAME_ENTRY_get_data(entry)
                        b=Membio()
                        libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
                        return (oid,unicode(b))
                        s=libcrypto.X509_NAME_ENTRY_get_data(entry)
                        b=Membio()
                        libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
                        return (oid,unicode(b))
+               else:
+                       raise TypeError("X509 NAME can be indexed by Oids or integers only")
 
        def __setitem__(self,key,val):
                if not self.writable:
                        raise ValueError("Attempt to modify constant X509 object")
 
        def __setitem__(self,key,val):
                if not self.writable:
                        raise ValueError("Attempt to modify constant X509 object")
-class X509_extlist:
-       def __init__(self,ptr):
-               self.ptr=ptr
+               else:
+                       raise NotImplementedError
+       def __delitem__(self,key):
+               if not self.writable:
+                       raise ValueError("Attempt to modify constant X509 object")
+               else:
+                       raise NotImplementedError
+       def __hash__(self):
+               return libcrypto.X509_NAME_hash(self.ptr)
+
+class _x509_ext(Structure):
+       """ Represens C structure X509_EXTENSION """
+       _fields_=[("object",c_void_p),
+                       ("critical",c_int),
+                       ("value",c_void_p)]
+
+class X509_EXT(object):
+       """ Python object which represents a certificate extension """
+       def __init__(self,ptr,copy=False):
+               """ Initializes from the pointer to X509_EXTENSION.
+                       If copy is True, creates a copy, otherwise just
+                       stores pointer.
+               """
+               if copy:
+                       self.ptr=libcrypto.X509_EXTENSION_dup(ptr)
+               else:
+                       self.ptr=cast(ptr,POINTER(_x509_ext))
        def __del__(self):
        def __del__(self):
-               libcrypto.X509_NAME_free(self.ptr)
+               libcrypto.X509_EXTENSION_free(self.ptr)
        def __str__(self):
        def __str__(self):
-               raise NotImplementedError
+               b=Membio()
+               libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
+               return str(b)
+       def __unicode__(self):
+               b=Membio()
+               libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
+               return unicode(b)
+       @property
+       def oid(self):
+               return Oid.fromobj(self.ptr[0].object)
+       @property
+       def critical(self):     
+               return self.ptr[0].critical >0
+class _X509extlist(object):    
+       """
+       Represents list of certificate extensions
+       """
+       def __init__(self,cert):
+               self.cert=cert
        def __len__(self):
        def __len__(self):
-               return libcrypto.X509_NAME_entry_count(self.ptr)
-
-       def __getattr__(self,key):
-               raise NotImplementedError
-       def __setattr__(self,key,val):
-               raise NotImplementedError
-
-       
-
+               return libcrypto.X509_get_ext_count(self.cert.cert)
+       def __getitem__(self,item):
+               p=libcrypto.X509_get_ext(self.cert.cert,item)
+               if p is None:
+                       raise IndexError
+               return X509_EXT(p,True)
+       def find(self,oid):
+               """
+               Return list of extensions with given Oid
+               """
+               if not isinstance(oid,Oid):
+                       raise TypeError("Need crytypescrypto.oid.Oid as argument")
+               found=[]
+               l=-1
+               end=len(self)
+               while True:
+                       l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l)
+                       if l>=end or l<0:
+                                break
+                       found.append(self[l])
+               return found
+       def find_critical(self,crit=True):
+               """
+               Return list of critical extensions (or list of non-cricital, if
+               optional second argument is False
+               """
+               if crit:
+                       flag=1
+               else:
+                       flag=0
+               found=[]
+               end=len(self)
+               l=-1
+               while True:
+                       l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l)
+                       if l>=end or l<0:
+                                break
+                       found.append(self[l])
+               return found                    
 
 
-class X509:
+class X509(object):
        """
        Represents X.509 certificate. 
        """
        """
        Represents X.509 certificate. 
        """
@@ -161,6 +286,7 @@ class X509:
                                self.cert=libcrypto.d2i_X509_bio(b.bio,None)
                        if self.cert is None:
                                raise X509Error("error reading certificate")
                                self.cert=libcrypto.d2i_X509_bio(b.bio,None)
                        if self.cert is None:
                                raise X509Error("error reading certificate")
+               self.extensions=_X509extlist(self)              
        def __del__(self):
                """
                Frees certificate object
        def __del__(self):
                """
                Frees certificate object
@@ -233,23 +359,34 @@ class X509:
                b=Membio()
                libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
                return int(str(b),16)
                b=Membio()
                libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
                return int(str(b),16)
+       @property 
+       def version(self):
+               """ certificate version as integer. Really certificate stores 0 for
+               version 1 and 2 for version 3, but we return 1 and 3 """
+               asn1int=cast(self.cert,_px509)[0].cert_info[0].version
+               return libcrypto.ASN1_INTEGER_get(asn1int)+1
        @property
        def startDate(self):
                """ Certificate validity period start date """
                # Need deep poke into certificate structure (x)->cert_info->validity->notBefore 
        @property
        def startDate(self):
                """ Certificate validity period start date """
                # Need deep poke into certificate structure (x)->cert_info->validity->notBefore 
-               raise NotImplementedError
+               global utc
+               asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore
+               b=Membio()
+               libcrypto.ASN1_TIME_print(b.bio,asn1date)
+               return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
        @property
        def endDate(self):
                """ Certificate validity period end date """
                # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
        @property
        def endDate(self):
                """ Certificate validity period end date """
                # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
-               raise NotImplementedError
-       def extensions(self):
-               """ Returns list of extensions """
-               raise NotImplementedError
+               global utc
+               asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter
+               b=Membio()
+               libcrypto.ASN1_TIME_print(b.bio,asn1date)
+               return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
        def check_ca(self):
                """ Returns True if certificate is CA certificate """
                return libcrypto.X509_check_ca(self.cert)>0
        def check_ca(self):
                """ Returns True if certificate is CA certificate """
                return libcrypto.X509_check_ca(self.cert)>0
-class X509Store:
+class X509Store(object):
        """
                Represents trusted certificate store. Can be used to lookup CA 
                certificates to verify
        """
                Represents trusted certificate store. Can be used to lookup CA 
                certificates to verify
@@ -297,7 +434,7 @@ class X509Store:
                libcrypto.X509_STORE_add_cert(self.store,cert.cert)
        def add_callback(self,callback):
                """
                libcrypto.X509_STORE_add_cert(self.store,cert.cert)
        def add_callback(self,callback):
                """
-               Installs callbac function, which would receive detailed information
+               Installs callback function, which would receive detailed information
                about verified ceritificates
                """
                raise NotImplementedError
                about verified ceritificates
                """
                raise NotImplementedError
@@ -322,10 +459,16 @@ class X509Store:
                if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
                        raise X509Error("cannot set purpose")
        def setdepth(self,depth):
                if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
                        raise X509Error("cannot set purpose")
        def setdepth(self,depth):
+               """
+               Sets the verification depth i.e. max length of certificate chain
+               which is acceptable
+               """
                libcrypto.X509_STORE_set_depth(self.store,depth)
        def settime(self, time):
                """
                Set point in time used to check validity of certificates for
                libcrypto.X509_STORE_set_depth(self.store,depth)
        def settime(self, time):
                """
                Set point in time used to check validity of certificates for
+               Time can be either python datetime object or number of seconds
+               sinse epoch
                """
                if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
                        d=int(time.strftime("%s"))
                """
                if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
                        d=int(time.strftime("%s"))
@@ -334,7 +477,7 @@ class X509Store:
                else:
                        raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
                raise NotImplementedError
                else:
                        raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
                raise NotImplementedError
-class StackOfX509:
+class StackOfX509(object):
        """
        Implements OpenSSL STACK_OF(X509) object.
        It looks much like python container types
        """
        Implements OpenSSL STACK_OF(X509) object.
        It looks much like python container types
@@ -353,11 +496,11 @@ class StackOfX509:
                """
                if  ptr is None:
                        self.need_free = True
                """
                if  ptr is None:
                        self.need_free = True
-                       self.ptr=libcrypt.sk_new_null()
+                       self.ptr=libcrypto.sk_new_null()
                        if certs is not None:
                                for crt in certs:
                                        self.append(crt)
                        if certs is not None:
                                for crt in certs:
                                        self.append(crt)
-               elif not certs is None:
+               elif certs is not None:
                                raise ValueError("cannot handle certs an ptr simultaneously")
                else:
                        self.need_free = disposable
                                raise ValueError("cannot handle certs an ptr simultaneously")
                else:
                        self.need_free = disposable
@@ -369,12 +512,15 @@ class StackOfX509:
                        raise IndexError
                p=libcrypto.sk_value(self.ptr,index)
                return X509(ptr=libcrypto.X509_dup(p))
                        raise IndexError
                p=libcrypto.sk_value(self.ptr,index)
                return X509(ptr=libcrypto.X509_dup(p))
-       def __putitem__(self,index,value):
+       def __setitem__(self,index,value):
                if not self.need_free:
                        raise ValueError("Stack is read-only")
                if index <0 or index>=len(self):
                        raise IndexError
                if not self.need_free:
                        raise ValueError("Stack is read-only")
                if index <0 or index>=len(self):
                        raise IndexError
-               p=libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
+               if not isinstance(value,X509):
+                       raise TypeError('StackOfX508 can contain only X509 objects')
+               p=libcrypto.sk_value(self.ptr,index)
+               libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
                libcrypto.X509_free(p)
        def __delitem__(self,index):    
                if not self.need_free:
                libcrypto.X509_free(p)
        def __delitem__(self,index):    
                if not self.need_free:
@@ -389,9 +535,14 @@ class StackOfX509:
        def append(self,value):
                if not self.need_free:
                        raise ValueError("Stack is read-only")
        def append(self,value):
                if not self.need_free:
                        raise ValueError("Stack is read-only")
+               if not isinstance(value,X509):
+                       raise TypeError('StackOfX508 can contain only X509 objects')
                libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
 libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
 libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
                libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
 libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
 libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
+libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p)
+libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,)
+libcrypto.ASN1_INTEGER_get.restype=c_long
 libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
 libcrypto.X509_get_serialNumber.restype=c_void_p
 libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
 libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
 libcrypto.X509_get_serialNumber.restype=c_void_p
 libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
@@ -406,3 +557,18 @@ libcrypto.X509_LOOKUP_file.restype=c_void_p
 libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
 libcrypto.X509_LOOKUP_ctrl.restype=c_int
 libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
 libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
 libcrypto.X509_LOOKUP_ctrl.restype=c_int
 libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
+libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,)
+libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext)
+libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
+libcrypto.X509_get_ext.restype=c_void_p
+libcrypto.X509_get_ext.argtypes=(c_void_p,c_int)
+libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
+libcrypto.sk_set.argtypes=(c_void_p,c_int,c_void_p)
+libcrypto.sk_set.restype=c_void_p
+libcrypto.sk_value.argtypes=(c_void_p,c_int)
+libcrypto.sk_value.restype=c_void_p
+libcrypto.X509_dup.restype=c_void_p
+libcrypto.sk_new_null.restype=c_void_p
+libcrypto.X509_dup.argtypes=(c_void_p,)
+libcrypto.X509_NAME_hash.restype=c_long
+libcrypto.X509_NAME_hash.argtypes=(c_void_p,)