"""
Interface to OpenSSL BIO library
"""
-from ctypescrypto import libcrypto,pyver, inttype
+from ctypescrypto import libcrypto,pyver, inttype, chartype
from ctypes import c_char_p, c_void_p, c_int, string_at, c_long
from ctypes import POINTER, byref, create_string_buffer
class Membio(object):
Provides interface to OpenSSL memory bios
use str() or unicode() to get contents of writable bio
use bio member to pass to libcrypto function
+
"""
- def __init__(self, data=None):
+ def __init__(self, data=None, clone=False):
"""
- If data is specified, creates read-only BIO. If data is
+ If data is specified, creates read-only BIO.
+ If clone is True, makes copy of data in the instance member
+ If data is
None, creates writable BIO, contents of which can be retrieved
by str() or unicode()
+
"""
if data is None:
method = libcrypto.BIO_s_mem()
self.bio = libcrypto.BIO_new(method)
else:
- self.bio = libcrypto.BIO_new_mem_buf(c_char_p(data), len(data))
-
+ if isinstance(data, chartype):
+ data = data.encode("utf-8")
+ clone = True
+ if clone :
+ self.data = data
+ self.bio = libcrypto.BIO_new_mem_buf(c_char_p(self.data), len(data))
+ else:
+ self.bio = libcrypto.BIO_new_mem_buf(c_char_p(data), len(data))
+
def __del__(self):
"""
Cleans up memory used by bio
"""
from ctypes import c_char_p, c_int, c_void_p, create_string_buffer
-from ctypescrypto import libcrypto
+from ctypescrypto import libcrypto, chartype
from ctypescrypto.digest import DigestType
from ctypescrypto.exception import LibCryptoError
"""
dgst = DigestType(digesttype)
out = create_string_buffer(outlen)
- res = libcrypto.PKCS5_PBKDF2_HMAC(password, len(password), salt, len(salt),
+ if isinstance(password,chartype):
+ pwd = password.encode("utf-8")
+ else:
+ pwd = password
+ res = libcrypto.PKCS5_PBKDF2_HMAC(pwd, len(pwd), salt, len(salt),
iterations, dgst.digest, outlen, out)
if res <= 0:
raise LibCryptoError("error computing PBKDF2")
from ctypes import c_char, c_char_p, c_void_p, c_int, c_long, POINTER
from ctypes import create_string_buffer, byref, memmove, CFUNCTYPE
-from ctypescrypto import libcrypto
+from ctypescrypto import libcrypto,pyver,bintype,chartype
from ctypescrypto.exception import LibCryptoError, clear_err_stack
from ctypescrypto.bio import Membio
if c is None:
return PW_CALLBACK_FUNC(0)
if callable(c):
- def __cb(buf, length, rwflag, userdata):
- pwd = c(rwflag)
- cnt = min(len(pwd),length)
- memmove(buf,pwd, cnt)
- return cnt
+ if pyver ==2 :
+ def __cb(buf, length, rwflag, userdata):
+ pwd = c(rwflag)
+ cnt = min(len(pwd),length)
+ memmove(buf,pwd, cnt)
+ return cnt
+ else:
+ def __cb(buf, length, rwflag, userdata):
+ pwd = c(rwflag).encode("utf-8")
+ cnt = min(len(pwd),length)
+ memmove(buf,pwd, cnt)
+ return cnt
else:
+ if pyver > 2:
+ c=c.encode("utf-8")
def __cb(buf,length,rwflag,userdata):
cnt=min(len(c),length)
memmove(buf,c,cnt)
return cnt
return PW_CALLBACK_FUNC(__cb)
+def _keybio(blob, format):
+ # But DER string should be binary
+ if format == "PEM" and isinstance(blob,chartype):
+ return Membio(blob.encode("ascii"),clone=True)
+ elif isinstance(blob,bintype):
+ return Membio(blob)
+ else:
+ raise TypeError("Key should be either blob or PEM string")
class PKey(object):
"""
if not pubkey is None:
raise TypeError("Just one of ptr, pubkey or privkey can " +
"be specified")
- bio = Membio(privkey)
+ bio=_keybio(privkey,format)
self.cansign = True
if format == "PEM":
self.key = libcrypto.PEM_read_bio_PrivateKey(bio.bio, None,
if self.key is None:
raise PKeyError("error parsing private key")
elif not pubkey is None:
- bio = Membio(pubkey)
+ bio = _keybio(pubkey,format)
self.cansign = False
if format == "PEM":
self.key = libcrypto.PEM_read_bio_PUBKEY(bio.bio, None,
def __del__(self):
""" Frees EVP_PKEY object (note, it is reference counted) """
- libcrypto.EVP_PKEY_free(self.key)
+ if hasattr(self,"key"):
+ libcrypto.EVP_PKEY_free(self.key)
def __eq__(self, other):
""" Compares two public keys. If one has private key and other
paramsfrom does work too
"""
tmpeng = c_void_p(None)
- ameth = libcrypto.EVP_PKEY_asn1_find_str(byref(tmpeng), algorithm, -1)
+ if isinstance(algorithm, chartype):
+ alg = algorithm.encode("ascii")
+ else:
+ alg = algorithm
+ ameth = libcrypto.EVP_PKEY_asn1_find_str(byref(tmpeng), alg, -1)
if ameth is None:
raise PKeyError("Algorithm %s not foind\n"%(algorithm))
clear_err_stack()
evp_cipher, None, 0,
_password_callback(password),
None)
+ if ret ==0:
+ raise PKeyError("error serializing private key")
+ return str(bio)
else:
ret = libcrypto.i2d_PKCS8PrivateKey_bio(bio.bio, self.key,
evp_cipher, None, 0,
_password_callback(password),
None)
- if ret == 0:
- raise PKeyError("error serializing private key")
- return str(bio)
+ if ret ==0:
+ raise PKeyError("error serializing private key")
+ return bintype(bio)
@staticmethod
def _configure_context(ctx, opts, skip=()):
for oper in opts:
if oper in skip:
continue
- ret = libcrypto.EVP_PKEY_CTX_ctrl_str(ctx, oper, str(opts[oper]))
+ if isinstance(oper,chartype):
+ op = oper.encode("ascii")
+ else:
+ op = oper
+ if isinstance(opts[oper],chartype):
+ value = opts[oper].encode("ascii")
+ elif isinstance(opts[oper],bintype):
+ value = opts[oper]
+ else:
+ if pyver == 2:
+ value = str(opts[oper])
+ else:
+ value = str(opts[oper]).encode('ascii')
+ ret = libcrypto.EVP_PKEY_CTX_ctrl_str(ctx, op, value)
if ret == -2:
raise PKeyError("Parameter %s is not supported by key" % oper)
if ret < 1:
from ctypescrypto.pkey import PKey
from ctypescrypto.oid import Oid
from ctypescrypto.exception import LibCryptoError
-from ctypescrypto import libcrypto
+from ctypescrypto import libcrypto, pyver, chartype, inttype, bintype
from datetime import datetime
-
+import sys
try:
from pytz import utc
except ImportError:
"""
if self.need_free:
libcrypto.X509_NAME_free(self.ptr)
- def __str__(self):
+ def __bytes__(self):
"""
Produces an ascii representation of the name, escaping all
symbols > 0x80. Probably it is not what you want, unless
bio = Membio()
libcrypto.X509_NAME_print_ex(bio.bio, self.ptr, 0,
self.PRINT_FLAG | self.ESC_MSB)
- return str(bio)
+ return bio.__bytes__()
def __unicode__(self):
"""
"""
bio = Membio()
libcrypto.X509_NAME_print_ex(bio.bio, self.ptr, 0, self.PRINT_FLAG)
- return unicode(bio)
+ return bio.__unicode__()
+ if pyver == 2:
+ __str__ = __bytes__
+ else:
+ __str__ = __unicode__
+
+
def __len__(self):
"""
return number of components in the name
return libcrypto.X509_NAME_cmp(self.ptr, other.ptr)
def __eq__(self, other):
return libcrypto.X509_NAME_cmp(self.ptr, other.ptr) == 0
+ def __gt__(self, other):
+ return libcrypto.X509_NAME_cmp(self.ptr, other.ptr) > 0
+ def __lt__(self, other):
+ return libcrypto.X509_NAME_cmp(self.ptr, other.ptr) < 0
def __getitem__(self, key):
if isinstance(key, Oid):
value = libcrypto.X509_NAME_ENTRY_get_data(entry)
bio = Membio()
libcrypto.ASN1_STRING_print_ex(bio.bio, value, self.PRINT_FLAG)
- return unicode(bio)
- elif isinstance(key, (int, long)):
+ return chartype(bio)
+ elif isinstance(key, inttype):
# Return OID, string tuple
entry = libcrypto.X509_NAME_get_entry(self.ptr, key)
if entry is None:
value = libcrypto.X509_NAME_ENTRY_get_data(entry)
bio = Membio()
libcrypto.ASN1_STRING_print_ex(bio.bio, value, self.PRINT_FLAG)
- return (oid, unicode(bio))
+ return (oid, chartype(bio))
else:
raise TypeError("X509 NAME can be indexed by Oids or integers only")
self.ptr = cast(ptr, POINTER(_x509_ext))
def __del__(self):
libcrypto.X509_EXTENSION_free(self.ptr)
- def __str__(self):
+ def __bytes__(self):
bio = Membio()
libcrypto.X509V3_EXT_print(bio.bio, self.ptr, 0x20010, 0)
- return str(bio)
+ return bintype(bio)
def __unicode__(self):
bio = Membio()
libcrypto.X509V3_EXT_print(bio.bio, self.ptr, 0x20010, 0)
- return unicode(bio)
+ return chartype(bio)
+ if pyver == 2:
+ __str__ = __bytes__
+ else:
+ __str__ = __unicode__
@property
def oid(self):
"Returns OID of the extension"
Frees certificate object
"""
libcrypto.X509_free(self.cert)
- def __str__(self):
+ def __bytes__(self):
""" Returns der string of the certificate """
bio = Membio()
if libcrypto.i2d_X509_bio(bio.bio, self.cert) == 0:
raise X509Error("error serializing certificate")
return str(bio)
+ if pyver == 2:
+ __str__ = __bytes__
def __repr__(self):
""" Returns valid call to the constructor """
- return "X509(data=" + repr(str(self)) + ",format='DER')"
+ return "X509(data=" + repr(self.pem()) + ",format='PEM')"
@property
def pubkey(self):
"""EVP PKEy object of certificate public key"""
if lookup is None:
raise X509Error("error installing file lookup method")
if file is not None:
- if not libcrypto.X509_LOOKUP_ctrl(lookup, 1, file, 1, None) > 0:
+ if pyver == 2:
+ fn = file
+ else:
+ fn = file.encode(sys.getfilesystemencoding())
+ if not libcrypto.X509_LOOKUP_ctrl(lookup, 1, fn, 1, None) > 0:
raise X509Error("error loading trusted certs from file "+file)
lookup = libcrypto.X509_STORE_add_lookup(self.store,
libcrypto.X509_LOOKUP_hash_dir())
if lookup is None:
raise X509Error("error installing hashed lookup method")
if dir is not None:
- if not libcrypto.X509_LOOKUP_ctrl(lookup, 2, dir, 1, None) > 0:
+ if pyver == 2:
+ dr = dir
+ else:
+ dr = dir.encode(sys.getfilesystemencoding())
+ if not libcrypto.X509_LOOKUP_ctrl(lookup, 2, dr, 1, None) > 0:
raise X509Error("error adding hashed trusted certs dir "+dir)
if default:
if not libcrypto.X509_LOOKUP_ctrl(lookup, 2, None, 3, None) > 0:
+from ctypescrypto import pyver
from ctypescrypto.oid import Oid
from ctypescrypto.ec import create
from base64 import b16decode
"""
return Popen(["openssl","pkey","-text_pub","-noout"],stdin=PIPE,stdout=PIPE).communicate(key)[0]
class TestEcCreation(unittest.TestCase):
- ec1priv="""-----BEGIN PRIVATE KEY-----
+ ec1priv=b"""-----BEGIN PRIVATE KEY-----
MIGEAgEAMBAGByqGSM49AgEGBSuBBAAKBG0wawIBAQQgKnG6neqZvB98EEuuxnHs
fv+L/5abuNNG20wzUqRpncOhRANCAARWKXWeUZ6WiCKZ2kHx87jmJyx0G3ZB1iQC
+Gp2AJYswbQPhGPigKolzIbZYfwnn7QOca6N8QDhPAn3QQK8trZI
-----END PRIVATE KEY-----
"""
- bigkey="""-----BEGIN PRIVATE KEY-----
+ bigkey=b"""-----BEGIN PRIVATE KEY-----
MIGEAgEAMBAGByqGSM49AgEGBSuBBAAKBG0wawIBAQQgAAAAAAAAAAAAAAAAAAAA
AUVRIxlQt1/EQC2hcy/Jvr6hRANCAASRZsKJufkF5V+ePfn2nX81a0oiCV+JT0cV
cUqktWYGr/GB65Zr5Ky1z/nha2bYCb6U4hTwbJP9CRCZr5hJklXn
key=create(Oid("secp256k1"),b16decode("2A71BA9DEA99BC1F7C104BAEC671EC7EFF8BFF969BB8D346DB4C3352A4699DC3",True))
out=key.exportpriv()
+ if pyver > 2:
+ out=out.encode("ascii")
self.assertEqual(dump_key(out),dump_key(self.ec1priv))
- self.assertEqual(str(key),dump_pub_key(self.ec1priv))
+ if pyver == 2:
+ self.assertEqual(str(key),dump_pub_key(self.ec1priv))
+ else:
+ self.assertEqual(str(key).encode("ascii"),dump_pub_key(self.ec1priv))
def test_bignum(self):
- keyval='\xff'*32
+ keyval=b'\xff'*32
key=create(Oid("secp256k1"),keyval)
- self.assertEqual(dump_key(key.exportpriv()),dump_key(self.bigkey))
- self.assertEqual(str(key),dump_pub_key(self.bigkey))
+ keyblob = key.exportpriv()
+ if pyver > 2:
+ keyblob = keyblob.encode("ascii")
+ self.assertEqual(dump_key(keyblob),dump_key(self.bigkey))
+ keyblob2 = str(key)
+ if pyver > 2:
+ keyblob2 = keyblob2.encode('ascii')
+ self.assertEqual(keyblob2,dump_pub_key(self.bigkey))
if __name__ == "__main__":
unittest.main()
import unittest
class TestPBKDF2(unittest.TestCase):
- answersha1='\xc13\xb3\xc8\x80\xc2\t\x01\xdaR]\x08\x03\xaf>\x85\xed\x9bU\xf0\x89\n\x81Ctu\xee\xe3\xfe\xd9\xfd\x85\xe2"\x8c\xfbQ\xfeb4\x8f(ZF\xfd\xc3w\x13'
- answersha256='oY\xaf\xf7\xfeB7@\xa80%\t\'\xd5r0\xbe\xb4\xf7\xe6TQ\xd2|Tx\xc0e\xff[0a\xe56\xec\xff\xda\xcd\xed~\xbde\xad"\xe8\t\x01o'
- answersha1_1000='\xe9\xfe\xbf\xf5K\xfc\xe6h\xfd\xe3\x01\xac\xc8Uc\xcc\x9d\xc7\x1e\xf6\xf8\xd7\xaa\xef\x06se\xbe\x0e^e"\xefa\xba\xe1\xb0\x0b\xc1;\xcd\x05G<\xcc\rE\xfb'
+ answersha1=b'\xc13\xb3\xc8\x80\xc2\t\x01\xdaR]\x08\x03\xaf>\x85\xed\x9bU\xf0\x89\n\x81Ctu\xee\xe3\xfe\xd9\xfd\x85\xe2"\x8c\xfbQ\xfeb4\x8f(ZF\xfd\xc3w\x13'
+ answersha256=b'oY\xaf\xf7\xfeB7@\xa80%\t\'\xd5r0\xbe\xb4\xf7\xe6TQ\xd2|Tx\xc0e\xff[0a\xe56\xec\xff\xda\xcd\xed~\xbde\xad"\xe8\t\x01o'
+ answersha1_1000=b'\xe9\xfe\xbf\xf5K\xfc\xe6h\xfd\xe3\x01\xac\xc8Uc\xcc\x9d\xc7\x1e\xf6\xf8\xd7\xaa\xef\x06se\xbe\x0e^e"\xefa\xba\xe1\xb0\x0b\xc1;\xcd\x05G<\xcc\rE\xfb'
def test_defaults(self):
- d=pbkdf2("password","saltsalt",48)
+ d=pbkdf2("password",b"saltsalt",48)
self.assertEqual(d,self.answersha1)
def test_sha1(self):
- d=pbkdf2("password","saltsalt",48,digesttype="sha1",iterations=2000)
+ d=pbkdf2("password",b"saltsalt",48,digesttype="sha1",iterations=2000)
self.assertEqual(d,self.answersha1)
def test_1000iter(self):
- d=pbkdf2("password","saltsalt",48,digesttype="sha1",iterations=1000)
+ d=pbkdf2("password",b"saltsalt",48,digesttype="sha1",iterations=1000)
self.assertEqual(d,self.answersha1_1000)
def test_sha256(self):
- d=pbkdf2("password","\01\02\03\04\0abc",48,digesttype="sha256")
+ d=pbkdf2("password",b"\01\02\03\04\0abc",48,digesttype="sha256")
self.assertEqual(d,self.answersha256)
if __name__ == "__main__":
from ctypescrypto.pkey import PKey
+from ctypescrypto import pyver
import unittest,re
from base64 import b64decode, b16decode
from subprocess import Popen,PIPE,CalledProcessError
return b64decode(data)
def runopenssl(args,indata):
- p=Popen(['openssl']+args,stdin=PIPE,stdout=PIPE,stderr=PIPE,universal_newlines=True)
+ p=Popen(['openssl']+args,stdin=PIPE,stdout=PIPE,stderr=PIPE)
(out,err)=p.communicate(indata)
if p.returncode:
raise CalledProcessError(p.returncode," ".join(['openssl']+args)+":"+err)
+ if pyver > 2:
+ out = out.decode("utf-8")
return out
from ctypescrypto.cipher import CipherType
key=PKey(privkey=self.rsa)
pem=key.exportpriv(password='2222',cipher=CipherType("aes256"))
+ if pyver >2:
+ pem = pem.encode("ascii")
self.assertEqual(runopenssl(["pkey","-text_pub","-noout","-passin","pass:2222"],
pem),self.rsakeytext)
def test_export_priv_der(self):
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
+from ctypescrypto import chartype, bintype, inttype
from ctypescrypto.x509 import X509,X509Store,utc,StackOfX509
from ctypescrypto.oid import Oid
from tempfile import NamedTemporaryFile
self.assertEqual(c.pem(),self.cert1)
def test_subject(self):
c=X509(self.cert1)
- self.assertEqual(unicode(c.subject),u'C=RU,ST=Москва,L=Москва,O=Частное лицо,CN=Виктор Вагнер')
+ self.assertEqual(chartype(c.subject),u'C=RU,ST=Москва,L=Москва,O=Частное лицо,CN=Виктор Вагнер')
def test_subject_str(self):
c=X509(self.cert1)
- self.assertEqual(str(c.subject),b'C=RU,ST=\\D0\\9C\\D0\\BE\\D1\\81\\D0\\BA\\D0\\B2\\D0\\B0,L=\\D0\\9C\\D0\\BE\\D1\\81\\D0\\BA\\D0\\B2\\D0\\B0,O=\\D0\\A7\\D0\\B0\\D1\\81\\D1\\82\\D0\\BD\\D0\\BE\\D0\\B5 \\D0\\BB\\D0\\B8\\D1\\86\\D0\\BE,CN=\\D0\\92\\D0\\B8\\D0\\BA\\D1\\82\\D0\\BE\\D1\\80 \\D0\\92\\D0\\B0\\D0\\B3\\D0\\BD\\D0\\B5\\D1\\80')
+ self.assertEqual(bintype(c.subject),b'C=RU,ST=\\D0\\9C\\D0\\BE\\D1\\81\\D0\\BA\\D0\\B2\\D0\\B0,L=\\D0\\9C\\D0\\BE\\D1\\81\\D0\\BA\\D0\\B2\\D0\\B0,O=\\D0\\A7\\D0\\B0\\D1\\81\\D1\\82\\D0\\BD\\D0\\BE\\D0\\B5 \\D0\\BB\\D0\\B8\\D1\\86\\D0\\BE,CN=\\D0\\92\\D0\\B8\\D0\\BA\\D1\\82\\D0\\BE\\D1\\80 \\D0\\92\\D0\\B0\\D0\\B3\\D0\\BD\\D0\\B5\\D1\\80')
def test_subject_len(self):
c=X509(self.cert1)
self.assertEqual(len(c.subject),5)
def test_issuer(self):
c=X509(self.cert1)
- self.assertEqual(unicode(c.issuer),u'O=Удостоверяющий центр,CN=Виктор Вагнер,C=RU,ST=Москва')
+ self.assertEqual(chartype(c.issuer),u'O=Удостоверяющий центр,CN=Виктор Вагнер,C=RU,ST=Москва')
def test_subjectfields(self):
c=X509(self.cert1)
self.assertEqual(c.subject[Oid("C")],"RU")
self.assertEqual(ca.issuer,ca.subject)
def test_serial(self):
c=X509(self.cert1)
- self.assertEqual(c.serial,0xf941addf6362d979L)
+ self.assertEqual(c.serial,int("f941addf6362d979",16))
def test_version(self):
c=X509(self.cert1)
self.assertEqual(c.version,3)
def test_extension_text(self):
cert=X509(self.cert1)
ext=cert.extensions[0]
- self.assertEqual(str(ext),'CA:FALSE')
- self.assertEqual(unicode(ext),u'CA:FALSE')
+ self.assertEqual(bintype(ext),b'CA:FALSE')
+ self.assertEqual(chartype(ext),u'CA:FALSE')
def test_extenson_find(self):
cert=X509(self.cert1)
exts=cert.extensions.find(Oid('subjectAltName'))
c2=X509(self.digicert_cert)
self.assertTrue(c2.verify(store))
def test_verify_by_filestore(self):
- trusted=NamedTemporaryFile(delete=False)
+ trusted=NamedTemporaryFile(delete=False,mode="w")
trusted.write(self.ca_cert)
trusted.close()
goodcert=X509(self.cert1)
def test_certstack1(self):
l=[]
l.append(X509(self.cert1))
- self.assertEqual(unicode(l[0].subject[Oid('CN')]),u'Виктор Вагнер')
+ self.assertEqual(chartype(l[0].subject[Oid('CN')]),u'Виктор Вагнер')
l.append(X509(self.ca_cert))
l.append(X509(self.digicert_cert))
stack=StackOfX509(certs=l)
self.assertEqual(len(stack),3)
self.assertTrue(isinstance(stack[1],X509))
- self.assertEqual(unicode(stack[0].subject[Oid('CN')]),u'Виктор Вагнер')
+ self.assertEqual(chartype(stack[0].subject[Oid('CN')]),u'Виктор Вагнер')
with self.assertRaises(IndexError):
c=stack[-1]
with self.assertRaises(IndexError):
c=stack[3]
del stack[1]
self.assertEqual(len(stack),2)
- self.assertEqual(unicode(stack[0].subject[Oid('CN')]),u'Виктор Вагнер')
- self.assertEqual(unicode(stack[1].subject[Oid('CN')]),u'DigiCert High Assurance EV CA-1')
+ self.assertEqual(chartype(stack[0].subject[Oid('CN')]),u'Виктор Вагнер')
+ self.assertEqual(chartype(stack[1].subject[Oid('CN')]),u'DigiCert High Assurance EV CA-1')
def test_certstack2(self):
stack=StackOfX509()
stack.append(X509(self.cert1))
c=stack[1]
stack[1]=X509(self.digicert_cert)
self.assertEqual(len(stack),2)
- self.assertEqual(unicode(stack[1].subject[Oid('CN')]),u'DigiCert High Assurance EV CA-1')
+ self.assertEqual(chartype(stack[1].subject[Oid('CN')]),u'DigiCert High Assurance EV CA-1')
with self.assertRaises(IndexError):
stack[-1]=c
with self.assertRaises(IndexError):
def test_certstack3(self):
l=[]
l.append(X509(self.cert1))
- self.assertEqual(unicode(l[0].subject[Oid('CN')]),u'Виктор Вагнер')
+ self.assertEqual(chartype(l[0].subject[Oid('CN')]),u'Виктор Вагнер')
l.append(X509(self.ca_cert))
l.append(X509(self.digicert_cert))
stack=StackOfX509(certs=l)