]> wagner.pp.ru Git - oss/ctypescrypto.git/commitdiff
Implemented minimal cert extension support
authorVictor Wagner <vitus@wagner.pp.ru>
Sat, 20 Dec 2014 11:58:54 +0000 (14:58 +0300)
committerVictor Wagner <vitus@wagner.pp.ru>
Sat, 20 Dec 2014 11:58:54 +0000 (14:58 +0300)
ctypescrypto/oid.py
ctypescrypto/x509.py
tests/testx509.py

index 7d4fc21439206c0d196239755f6a2874ea9c9a4c..6941ce4b399b6b353026dfa99aac02238b656e5e 100644 (file)
@@ -12,7 +12,7 @@ from ctypes import c_char_p, c_void_p, c_int, create_string_buffer
 
 __all__ = ['Oid','create','cleanup']
 
-class Oid:
+class Oid(object):
        """
                Represents an OID. It can be consturucted by textual
                representation like Oid("commonName") or Oid("CN"),
@@ -58,11 +58,25 @@ class Oid:
                " Returns logn name if any "
                return  libcrypto.OBJ_nid2ln(self.nid)
        def dotted(self):
-               " Returns dotted-decimal reperesntation "
+               " Returns dotted-decimal reperesentation "
                obj=libcrypto.OBJ_nid2obj(self.nid)
                buf=create_string_buffer(256)
                libcrypto.OBJ_obj2txt(buf,256,obj,1)
                return buf.value
+       @staticmethod
+       def fromobj(obj):
+               """
+               Creates an OID object from the pointer to ASN1_OBJECT c structure.
+               Strictly for internal use
+               """
+               nid=libcrypto.OBJ_obj2nid(obj)
+               if nid==0:
+                       buf=create_string_buffer(80)
+                       l=libcrypto.OBJ_obj2txt(buf,80,obj,1)
+                       oid=create(buf[0:l],buf[0:l],buf[0:l])
+               else:
+                       oid=Oid(nid)
+               return oid
 
 def create(dotted,shortname,longname):
        """
index bd056d32f6227bcba4593e26fd79d06504ee5097..358ce7d81d7d244ca1d44b2dfc34769e2c1e0f9d 100644 (file)
@@ -57,6 +57,9 @@ class _cinf(Structure):
                ('validity',POINTER(_validity)),
                ('subject',c_void_p),
                ('pubkey',c_void_p),
+               ('issuerUID',c_void_p),
+               ('subjectUID',c_void_p),
+               ('extensions',c_void_p),
                ]
 
 class _x509(Structure):
@@ -72,7 +75,6 @@ class _x509(Structure):
                                ]
 _px509 = POINTER(_x509)
 
-# X509_extlist is not exported yet, because is not implemented 
 class X509Error(LibCryptoError):
        """
        Exception, generated when some openssl function fail
@@ -157,14 +159,7 @@ class X509Name:
                        entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
                        if entry is None:
                                raise IndexError("name entry index out of range")
-                       obj=libcrypto.X509_NAME_ENTRY_get_object(entry)
-                       nid=libcrypto.OBJ_obj2nid(obj)
-                       if nid==0:
-                               buf=create_string_buffer(80)
-                               len=libcrypto.OBJ_obj2txt(buf,80,obj,1)
-                               oid=Oid(buf[0:len])
-                       else:
-                               oid=Oid(nid)
+                       oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
                        s=libcrypto.X509_NAME_ENTRY_get_data(entry)
                        b=Membio()
                        libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
@@ -173,25 +168,89 @@ class X509Name:
        def __setitem__(self,key,val):
                if not self.writable:
                        raise ValueError("Attempt to modify constant X509 object")
-class X509_extlist:
-       def __init__(self,ptr):
-               self.ptr=ptr
+
+class _x509_ext(Structure):
+       """ Represens C structure X509_EXTENSION """
+       _fields_=[("object",c_void_p),
+                       ("critical",c_int),
+                       ("value",c_void_p)]
+
+class X509_EXT(object):
+       """ Python object which represents a certificate extension """
+       def __init__(self,ptr,copy=False):
+               """ Initializes from the pointer to X509_EXTENSION.
+                       If copy is True, creates a copy, otherwise just
+                       stores pointer.
+               """
+               if copy:
+                       self.ptr=libcrypto.X509_EXTENSION_dup(ptr)
+               else:
+                       self.ptr=cast(ptr,POINTER(_x509_ext))
        def __del__(self):
-               libcrypto.X509_NAME_free(self.ptr)
+               libcrypto.X509_EXTENSION_free(self.ptr)
        def __str__(self):
-               raise NotImplementedError
+               b=Membio()
+               libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
+               libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
+               return str(b)
+       def __unicode__(self):
+               b=Membio()
+               libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
+               return unicode(b)
+       @property
+       def oid(self):
+               return Oid.fromobj(self.ptr[0].object)
+       @property
+       def critical(self):     
+               return self.ptr[0].critical >0
+class _X509extlist(object):    
+       """
+       Represents list of certificate extensions
+       """
+       def __init__(self,cert):
+               self.cert=cert
        def __len__(self):
-               return libcrypto.X509_NAME_entry_count(self.ptr)
-
-       def __getattr__(self,key):
-               raise NotImplementedError
-       def __setattr__(self,key,val):
-               raise NotImplementedError
-
-       
-
+               return libcrypto.X509_get_ext_count(self.cert.cert)
+       def __getitem__(self,item):
+               p=libcrypto.X509_get_ext(self.cert.cert,item)
+               if p is None:
+                       raise IndexError
+               return X509_EXT(p,True)
+       def find(self,oid):
+               """
+               Return list of extensions with given Oid
+               """
+               if not isinstance(oid,Oid):
+                       raise TypeError("Need crytypescrypto.oid.Oid as argument")
+               found=[]
+               l=-1
+               end=len(self)
+               while True:
+                       l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l)
+                       if l>=end or l<0:
+                                break
+                       found.append(self[l])
+               return found
+       def find_critical(self,crit=True):
+               """
+               Return list of critical extensions (or list of non-cricital, if
+               optional second argument is False
+               """
+               if crit:
+                       flag=1
+               else:
+                       flag=0
+               found=[]
+               end=len(self)
+               l=-1
+               while True:
+                       l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l)
+                       if l>=end or l<0:
+                                break
+                       found.append(self[l])
+               return found                    
 
-class X509:
+class X509(object):
        """
        Represents X.509 certificate. 
        """
@@ -217,6 +276,7 @@ class X509:
                                self.cert=libcrypto.d2i_X509_bio(b.bio,None)
                        if self.cert is None:
                                raise X509Error("error reading certificate")
+               self.extensions=_X509extlist(self)              
        def __del__(self):
                """
                Frees certificate object
@@ -313,9 +373,6 @@ class X509:
                b=Membio()
                libcrypto.ASN1_TIME_print(b.bio,asn1date)
                return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
-       def extensions(self):
-               """ Returns list of extensions """
-               raise NotImplementedError
        def check_ca(self):
                """ Returns True if certificate is CA certificate """
                return libcrypto.X509_check_ca(self.cert)>0
@@ -367,7 +424,7 @@ class X509Store:
                libcrypto.X509_STORE_add_cert(self.store,cert.cert)
        def add_callback(self,callback):
                """
-               Installs callbac function, which would receive detailed information
+               Installs callback function, which would receive detailed information
                about verified ceritificates
                """
                raise NotImplementedError
@@ -392,10 +449,16 @@ class X509Store:
                if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
                        raise X509Error("cannot set purpose")
        def setdepth(self,depth):
+               """
+               Sets the verification depth i.e. max length of certificate chain
+               which is acceptable
+               """
                libcrypto.X509_STORE_set_depth(self.store,depth)
        def settime(self, time):
                """
                Set point in time used to check validity of certificates for
+               Time can be either python datetime object or number of seconds
+               sinse epoch
                """
                if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
                        d=int(time.strftime("%s"))
@@ -479,4 +542,8 @@ libcrypto.X509_LOOKUP_file.restype=c_void_p
 libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
 libcrypto.X509_LOOKUP_ctrl.restype=c_int
 libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
-
+libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,)
+libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext)
+libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
+libcrypto.X509_get_ext.restype=c_void_p
+libcrypto.X509_get_ext.argtypes=(c_void_p,c_int)
index 011d487e770c78e9dee3d0d74f64cf3880e38ced..a54417ba5733cb0e966f92bc7c196f8b624c2c0b 100644 (file)
@@ -148,6 +148,40 @@ zVMSW4SOwg/H7ZMZ2cn6j1g0djIvruFQFGHUqFijyDATI+/GJYw2jxyA
                self.assertTrue(ca.check_ca())
                notca=X509(self.cert1)
                self.assertFalse(notca.check_ca())
+       def test_extension_count(self):
+               cert=X509(self.cert1)
+               self.assertTrue(len(cert.extensions),4)
+               ca_cert=X509(self.ca_cert)
+               self.assertEqual(len(ca_cert.extensions),3)
+       def test_extension_outofrange(self):
+               cert=X509(self.cert1)
+               with self.assertRaises(IndexError):
+                       cert.extensions[4]
+               with self.assertRaises(IndexError):
+                       cert.extensions[-1]
+       def test_extension_oid(self):
+               cert=X509(self.cert1)
+               ext=cert.extensions[0]
+               ext_id=ext.oid
+               self.assertTrue(isinstance(ext_id,Oid))
+               self.assertEqual(ext_id,Oid('basicConstraints'))
+       def text_extension_text(self):
+               cert=X509(self.cert1)
+               ext=cert.extensions[0]
+               self.assertEqual(str(ext),'CA:FALSE')
+       def test_extenson_find(self):
+               cert=X509(self.cert1)
+               exts=cert.extensions.find(Oid('subjectAltName'))
+               self.assertEqual(len(exts),1)
+               self.assertEqual(exts[0].oid,Oid('subjectAltName'))
+       def test_extenson_critical(self):
+               cert=X509(self.digicert_cert)
+               crit_exts=cert.extensions.find_critical()
+               self.assertEqual(len(crit_exts),2)
+               other_exts=cert.extensions.find_critical(False)
+               self.assertEqual(len(crit_exts)+len(other_exts),len(cert.extensions))
+               self.assertEqual(crit_exts[0].critical,True)
+               self.assertEqual(other_exts[0].critical,False)
        def test_verify_by_key(self):
                ca=X509(self.ca_cert)
                pubkey=ca.pubkey
@@ -181,7 +215,6 @@ zVMSW4SOwg/H7ZMZ2cn6j1g0djIvruFQFGHUqFijyDATI+/GJYw2jxyA
                # signed by some commercial CA should be rejected too
                self.assertFalse(gitcert.verify(store))
                trusted.close()
-               pass
        def test_verify_by_dirstore(self):
                pass
 if __name__ == '__main__':