X-Git-Url: http://wagner.pp.ru/gitweb/?a=blobdiff_plain;f=ctypescrypto%2Fcms.py;h=2c038985aba8ae9488f37083115dc5dd69bb4e6b;hb=3af16f1cbc516e55cfc31af5dee0bc367a9ecbae;hp=9defb1d5b5c1403b1b6c16e6f4b9d8ebc76f71b4;hpb=d817f7ee1103370ab5355871e744dfb5c15bf2b4;p=oss%2Fctypescrypto.git diff --git a/ctypescrypto/cms.py b/ctypescrypto/cms.py index 9defb1d..2c03898 100644 --- a/ctypescrypto/cms.py +++ b/ctypescrypto/cms.py @@ -10,8 +10,9 @@ create it from raw data and neccessary certificates. """ - +from ctypes import c_int, c_void_p, c_char_p, c_int from ctypescrypto.exception import LibCryptoError +from ctypescrypto import libcrypto from ctypescrypto.bio import Membio from ctypescrypto.oid import Oid @@ -62,30 +63,60 @@ def CMS(data,format="PEM"): return SignedData(ptr) elif typeoid.shortname()=="pkcs7-envelopedData": return EnvelopedData(ptr) + elif typeoid.shortname()=="pkcs7-encryptedData": + return EncryptedData(ptr) else: raise NotImplementedError("cannot handle "+typeoid.shortname()) +class CMSBase: + """ + Common ancessor for all CMS types. + Implements serializatio/deserialization + """ + def __init__(self,ptr=None): + self.ptr=ptr + def __str__(self): + """ + Serialize in DER format + """ + b=Membio() + if not libcrypto.i2d_CMS_bio(b.bio,self.ptr): + raise CMSError("writing CMS to PEM") + return str(b) + def pem(self): + """ + Serialize in PEM format + """ + b=Membio() + if not libcrypto.PEM_write_bio_CMS(b.bio,self.ptr): + raise CMSError("writing CMS to PEM") + return str(b) + -class SignedData: - def __init__(self,ptr=None): - self.ptr=ptr +class SignedData(CMSBase): @staticmethod - def create(data,cert,pkey,flags=Flags.BINARY): + def create(data,cert,pkey,flags=Flags.BINARY,certs=[]): """ Creates SignedData message by signing data with pkey and certificate. @param data - data to sign @param pkey - pkey object with private key to sign + @param flags - OReed combination of Flags constants + @param certs - list of X509 objects to include into CMS """ if not pkey.cansign: raise ValueError("Specified keypair has no private part") if cert.pubkey!=pkey: raise ValueError("Certificate doesn't match public key") b=Membio(data) - ptr=libcrypto.CMS_sign(cert.cert,pkey.ptr,None,b.bio,flags) + if certs is not None and len(certs)>0: + certstack=StackOfX509(certs) + else: + certstack=None + ptr=libcrypto.CMS_sign(cert.cert,pkey.ptr,certstack,b.bio,flags) if ptr is None: raise CMSError("signing message") return SignedData(ptr) @@ -94,10 +125,11 @@ class SignedData: Adds another signer to already signed message @param cert - signer's certificate @param pkey - signer's private key - @param data - data to sign (if detached) - @param md - message digest to use as DigestType object (if None - default for key - would be used) - @param flags - flags + @param md - message digest to use as DigestType object + (if None - default for key would be used) + @param data - data to sign (if detached and + Flags.REUSE_DIGEST is not specified) + @param flags - ORed combination of Flags consants """ if not pkey.cansign: raise ValueError("Specified keypair has no private part") @@ -116,49 +148,53 @@ class SignedData: res= libcrypto.CMS_final(self.ptr,biodata,None,flags) if res<=0: raise CMSError - def verify(self,store,flags,data=None): + def verify(self,store,flags,data=None,certs=[]): + """ + Verifies signature under CMS message using trusted cert store + + @param store - X509Store object with trusted certs + @param flags - OR-ed combination of flag consants + @param data - message data, if messge has detached signature + param certs - list of certificates to use during verification + If Flags.NOINTERN is specified, these are only + sertificates to search for signing certificates + @returns True if signature valid, False otherwise + """ bio=None if data!=None: b=Membio(data) bio=b.bio - res=libcrypto.CMS_verify(self.ptr,store.store,bio,None,flags) + if certs is not None and len(certs)>0: + certstack=StackOfX509(certs) + else: + certstack=None + res=libcrypto.CMS_verify(self.ptr,certstack,store.store,bio,None,flags) return res>0 - def __str__(self): - """ - Serialize in DER format - """ - b=Membio() - if not libcrypto.i2d_CMS_bio(b.bio,self.ptr): - raise CMSError("writing CMS to PEM") - return str(b) - - def pem(self): - """ - Serialize in PEM format - """ - b=Membio() - if not libcrypto.PEM_write_bio_CMS(b.bio,self.ptr): - raise CMSError("writing CMS to PEM") - return str(b) - @property def signers(self,store=None): """ Return list of signer's certificates """ - raise NotImplementedError + p=libcrypto.CMS_get0_signers(self.ptr) + if p is None: + raise CMSError + return StackOfX509(ptr=p,disposable=False) @property def data(self): """ - Returns signed data if present + Returns signed data if present in the message """ - raise NotImplementedError + b=Membio() + if not libcrypto.CMS_verify(self.ptr,None,None,None,b.bio,Flags.NO_VERIFY): + raise CMSError("extract data") + return str(b) def addcert(self,cert): """ Adds a certificate (probably intermediate CA) to the SignedData structure """ - raise NotImplementedError + if libcrypto.CMS_add1_cert(self.ptr,cert.cert)<=0: + raise CMSError("adding cert") def addcrl(self,crl): """ Adds a CRL to the signed data structure @@ -169,7 +205,10 @@ class SignedData: """ List of the certificates contained in the structure """ - raise NotImplementedError + p=CMS_get1_certs(self.ptr) + if p is None: + raise CMSError("getting certs") + return StackOfX509(ptr=p,disposable=True) @property def crls(self): """ @@ -177,29 +216,7 @@ class SignedData: """ raise NotImplementedError -class EnvelopedData: - def __init__(self,ptr): - """ - Initializes an object. For internal use - """ - self.ptr=ptr - def __str__(self): - """ - Serialize in DER format - """ - b=Membio() - if not libcrypto.i2d_CMS_bio(b.bio,self.ptr): - raise CMSError("writing CMS to PEM") - return str(b) - - def pem(self): - """ - Serialize in PEM format - """ - b=Membio() - if not libcrypto.PEM_write_bio_CMS(b.bio,self.ptr): - raise CMSError("writing CMS to PEM") - return str(b) +class EnvelopedData(CMSBase): @staticmethod def create(recipients,data,cipher,flags=0): """ @@ -209,8 +226,12 @@ class EnvelopedData: @param cipher - CipherType object @param flags - flag """ - # Need to be able to handle OPENSSL stacks - raise NotImplementedError + recp=StackOfX509(recipients) + b=Membio(data) + p=libcrypto.CMS_encrypt(recp.ptr,b.bio,cipher.cipher_type,flags) + if p is None: + raise CMSError("encrypt EnvelopedData") + return EnvelopedData(p) def decrypt(self,pkey,cert,flags=0): """ Decrypts message @@ -229,3 +250,36 @@ class EnvelopedData: if res<=0: raise CMSError("decrypting CMS") return str(b) + +class EncryptedData(CMSBase): + @staticmethod + def create(data,cipher,key,flags=0): + """ + Creates an EncryptedData message. + @param data data to encrypt + @param cipher cipher.CipherType object represening required + cipher type + @param key - byte array used as simmetic key + @param flags - OR-ed combination of Flags constant + """ + b=Membio(data) + ptr=libcrypto.CMS_EncryptedData_encrypt(b.bio,cipher.cipher_type,key,len(key),flags) + if ptr is None: + raise CMSError("encrypt data") + return EncryptedData(ptr) + def decrypt(self,key,flags=0): + """ + Decrypts encrypted data message + @param key - symmetic key to decrypt + @param flags - OR-ed combination of Flags constant + """ + b=Membio() + if libcrypto.CMS_EncryptedData_decrypt(self.ptr,key,len(key),None, + b.bio,flags)<=0: + raise CMSError("decrypt data") + return str(b) + + + +libcrypto.CMS_verify.restype=c_int +libcrypto.CMS_verify.argtypes=(c_void_p,c_void_p,c_void_p,c_void_p,c_void_p,c_int)