class SignedData(CMSBase):
@staticmethod
- def create(data,cert,pkey,flags=Flags.BINARY):
+ def create(data,cert,pkey,flags=Flags.BINARY,certs=[]):
"""
Creates SignedData message by signing data with pkey and
certificate.
@param data - data to sign
@param pkey - pkey object with private key to sign
+ @param flags - OReed combination of Flags constants
+ @param certs - list of X509 objects to include into CMS
"""
if not pkey.cansign:
raise ValueError("Specified keypair has no private part")
if cert.pubkey!=pkey:
raise ValueError("Certificate doesn't match public key")
b=Membio(data)
- ptr=libcrypto.CMS_sign(cert.cert,pkey.ptr,None,b.bio,flags)
+ if certs is not None and len(certs)>0:
+ certstack=StackOfX509(certs)
+ else:
+ certstack=None
+ ptr=libcrypto.CMS_sign(cert.cert,pkey.ptr,certstack,b.bio,flags)
if ptr is None:
raise CMSError("signing message")
return SignedData(ptr)
res= libcrypto.CMS_final(self.ptr,biodata,None,flags)
if res<=0:
raise CMSError
- def verify(self,store,flags,data=None):
+ def verify(self,store,flags,data=None,certs=[]):
"""
Verifies signature under CMS message using trusted cert store
@param store - X509Store object with trusted certs
@param flags - OR-ed combination of flag consants
@param data - message data, if messge has detached signature
+ param certs - list of certificates to use during verification
+ If Flags.NOINTERN is specified, these are only
+ sertificates to search for signing certificates
@returns True if signature valid, False otherwise
"""
bio=None
if data!=None:
b=Membio(data)
bio=b.bio
- res=libcrypto.CMS_verify(self.ptr,None,store.store,bio,None,flags)
+ if certs is not None and len(certs)>0:
+ certstack=StackOfX509(certs)
+ else:
+ certstack=None
+ res=libcrypto.CMS_verify(self.ptr,certstack,store.store,bio,None,flags)
return res>0
@property
def signers(self,store=None):
"""
Return list of signer's certificates
"""
- raise NotImplementedError
+ p=libcrypto.CMS_get0_signers(self.ptr)
+ if p is None:
+ raise CMSError
+ return StackOfX509(ptr=p,disposable=False)
@property
def data(self):
"""
Adds a certificate (probably intermediate CA) to the SignedData
structure
"""
- raise NotImplementedError
+ if libcrypto.CMS_add1_cert(self.ptr,cert.cert)<=0:
+ raise CMSError("adding cert")
def addcrl(self,crl):
"""
Adds a CRL to the signed data structure
"""
List of the certificates contained in the structure
"""
- raise NotImplementedError
+ p=CMS_get1_certs(self.ptr)
+ if p is None:
+ raise CMSError("getting certs")
+ return StackOfX509(ptr=p,disposable=True)
@property
def crls(self):
"""
@param cipher - CipherType object
@param flags - flag
"""
- # Need to be able to handle OPENSSL stacks
- raise NotImplementedError
+ recp=StackOfX509(recipients)
+ b=Membio(data)
+ p=libcrypto.CMS_encrypt(recp.ptr,b.bio,cipher.cipher_type,flags)
+ if p is None:
+ raise CMSError("encrypt EnvelopedData")
+ return EnvelopedData(p)
def decrypt(self,pkey,cert,flags=0):
"""
Decrypts message
def pubkey(self):
"""EVP PKEy object of certificate public key"""
return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
- def verify(self,store=None,key=None):
+ def verify(self,store=None,chain=[],key=None):
"""
Verify self. Supports verification on both X509 store object
or just public issuer key
@param store X509Store object.
+ @param chain - list of X509 objects to add into verification
+ context.These objects are untrusted, but can be used to
+ build certificate chain up to trusted object in the store
@param key - PKey object
- parameters are mutually exclusive. If neither is specified, attempts to verify
+ parameters stora and key are mutually exclusive. If neither is specified, attempts to verify
+
itself as self-signed certificate
"""
if store is not None and key is not None:
ctx=libcrypto.X509_STORE_CTX_new()
if ctx is None:
raise X509Error("Error allocating X509_STORE_CTX")
- if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,None) < 0:
+ if chain is not None and len(chain)>0:
+ ch=StackOfX509(chain)
+ else:
+ ch=None
+ if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0:
raise X509Error("Error allocating X509_STORE_CTX")
res= libcrypto.X509_verify_cert(ctx)
libcrypto.X509_STORE_CTX_free(ctx)
purp_no = purpose
if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
raise X509Error("cannot set purpose")
+ def setdepth(self,depth):
+ libcrypto.X509_STORE_set_depth(self.store,depth)
+ def settime(self, time):
+ """
+ Set point in time used to check validity of certificates for
+ """
+ if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
+ d=int(time.strftime("%s"))
+ elif isinstance(time,int):
+ pass
+ else:
+ raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
+ raise NotImplementedError
+class StackOfX509:
+ """
+ Implements OpenSSL STACK_OF(X509) object.
+ It looks much like python container types
+ """
+ def __init__(self,certs=None,ptr=None,disposable=True):
+ """
+ Create stack
+ @param certs - list of X509 objects. If specified, read-write
+ stack is created and populated by these certificates
+ @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
+ some functions
+ @param disposable - if True, stack created from object, returned
+ by function is copy, and can be modified and need to be
+ freeid. If false, it is just pointer into another
+ structure i.e. CMS_ContentInfo
+ """
+ if ptr is None:
+ self.need_free = True
+ self.ptr=libcrypt.sk_new_null()
+ if certs is not None:
+ for crt in certs:
+ self.append(crt)
+ elif not certs is None:
+ raise ValueError("cannot handle certs an ptr simultaneously")
+ else:
+ self.need_free = disposable
+ self.ptr=ptr
+ def __len__(self):
+ return libcrypto.sk_num(self.ptr)
+ def __getitem__(self,index):
+ if index <0 or index>=len(self):
+ raise IndexError
+ p=libcrypto.sk_value(self.ptr,index)
+ return X509(ptr=libcrypto.X509_dup(p))
+ def __putitem__(self,index,value):
+ if not self.need_free:
+ raise ValueError("Stack is read-only")
+ if index <0 or index>=len(self):
+ raise IndexError
+ p=libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
+ libcrypto.X509_free(p)
+ def __delitem__(self,index):
+ if not self.need_free:
+ raise ValueError("Stack is read-only")
+ if index <0 or index>=len(self):
+ raise IndexError
+ p=libcrypto.sk_delete(self.ptr,index)
+ libcrypto.X509_free(p)
+ def __del__(self):
+ if self.need_free:
+ libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
+ def append(self,value):
+ if not self.need_free:
+ raise ValueError("Stack is read-only")
+ libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)