This requires changes in the engine and exception modules as well.
Some common infrastructure to make import-time decision is placed in __init__.py
libcrypto = CDLL(__libname__)
libcrypto.OPENSSL_config.argtypes = (c_char_p, )
+pyver=int(sys.version[0])
+if pyver == 2:
+ bintype = str
+ chartype = unicode
+ inttype = (int, long)
+else:
+ bintype = bytes
+ chartype = str
+ inttype = int
if hasattr(libcrypto,'OPENSSL_init_crypto'):
libcrypto.OPENSSL_init_crypto.argtypes = (c_uint64,c_void_p)
"""
Interface to OpenSSL BIO library
"""
-from ctypescrypto import libcrypto
+from ctypescrypto import libcrypto,pyver, inttype
from ctypes import c_char_p, c_void_p, c_int, string_at, c_long
from ctypes import POINTER, byref, create_string_buffer
class Membio(object):
"""
Cleans up memory used by bio
"""
- libcrypto.BIO_free(self.bio)
- del self.bio
+ if hasattr(self,'bio'):
+ libcrypto.BIO_free(self.bio)
+ del self.bio
- def __str__(self):
+ def __bytes__(self):
"""
Returns current contents of buffer as byte string
"""
Attempts to interpret current contents of buffer as UTF-8 string
and convert it to unicode
"""
- return str(self).decode("utf-8")
-
+ return self.__bytes__().decode("utf-8")
+ if pyver == 2:
+ __str__ = __bytes__
+ else:
+ __str__ = __unicode__
def read(self, length=None):
"""
Reads data from readble BIO. For test purposes.
If not BIO is read until end of buffer
"""
if not length is None:
- if not isinstance(length, (int, long)):
+ if not isinstance(length, inttype) :
raise TypeError("length to read should be number")
buf = create_string_buffer(length)
readbytes = libcrypto.BIO_read(self.bio, buf, length)
if readbytes == -1:
raise IOError
if readbytes == 0:
- return ""
+ return b""
return buf.raw[:readbytes]
else:
buf = create_string_buffer(1024)
- out = ""
+ out = b""
readbytes = 1
while readbytes > 0:
readbytes = libcrypto.BIO_read(self.bio, buf, 1024)
"""
Writes data to writable bio. For test purposes
"""
- if isinstance(data, unicode):
- data = data.encode("utf-8")
+ if pyver == 2:
+ if isinstance(data, unicode):
+ data = data.encode("utf-8")
+ else:
+ data = str(data)
else:
- data = str(data)
+ if not isinstance(data, bytes):
+ data=str(data).encode("utf-8")
written = libcrypto.BIO_write(self.bio, data, len(data))
if written == -2:
"""
from ctypes import create_string_buffer, c_char_p, c_void_p, c_int
from ctypes import byref, POINTER
-from ctypescrypto import libcrypto
+from ctypescrypto import libcrypto, pyver, bintype
from ctypescrypto.exception import LibCryptoError
from ctypescrypto.oid import Oid
Constructs cipher algortihm using textual name as in openssl
command line
"""
+ if pyver > 2:
+ cipher_name = cipher_name.encode('utf-8')
self.cipher = libcrypto.EVP_get_cipherbyname(cipher_name)
if self.cipher is None:
raise CipherError("Unknown cipher: %s" % cipher_name)
"""
if self.cipher_finalized:
raise CipherError("No updates allowed")
- if not isinstance(data, str):
- raise TypeError("A string is expected")
+ if not isinstance(data, bintype):
+ raise TypeError("A byte string is expected")
if len(data) == 0:
return ""
outbuf = create_string_buffer(self.block_size+len(data))
if outlen.value > 0:
return outbuf.raw[:int(outlen.value)]
else:
- return ""
+ return b""
def _clean_ctx(self):
"""
"""
from ctypes import c_int, c_char_p, c_void_p, POINTER, c_long, c_longlong
from ctypes import create_string_buffer, byref
-from ctypescrypto import libcrypto
+from ctypescrypto import libcrypto,pyver, bintype
from ctypescrypto.exception import LibCryptoError
from ctypescrypto.oid import Oid
DIGEST_ALGORITHMS = ("MD5", "SHA1", "SHA224", "SHA256", "SHA384", "SHA512")
self.digest_name = digest_name.longname()
else:
self.digest_name = str(digest_name)
- self.digest = libcrypto.EVP_get_digestbyname(self.digest_name)
+ self.digest = libcrypto.EVP_get_digestbyname(self.digest_name.encode('us-ascii'))
if self.digest is None:
raise DigestError("Unknown digest: %s" % self.digest_name)
"""
if self.digest_finalized:
raise DigestError("No updates allowed")
- if not isinstance(data, str):
- raise TypeError("A string is expected")
+ if not isinstance(data, bintype):
+ raise TypeError("A byte string is expected")
if length is None:
length = len(data)
elif length > len(data):
with hashlib
"""
from base64 import b16encode
- return b16encode(self.digest(data))
+ if pyver == 2:
+ return b16encode(self.digest(data))
+ else:
+ return b16encode(self.digest(data)).decode('us-ascii')
# Declare function result and argument types
engine loading and configuration
"""
from ctypes import c_void_p, c_char_p, c_int
-from ctypescrypto import libcrypto
+from ctypescrypto import libcrypto,pyver
from ctypescrypto.exception import LibCryptoError
__all__ = ['default', 'set_default', 'Engine']
in the token, accessed by engine
"""
def __init__(self, engine_id, **kwargs):
+ if pyver > 2 or isinstance(engine_id, unicode):
+ engine_id = engine_id.encode('utf-8')
eng = libcrypto.ENGINE_by_id(engine_id)
if eng is None:
# Try load engine
Exception which extracts libcrypto error information
"""
from ctypes import c_ulong, c_char_p, create_string_buffer
-from ctypescrypto import libcrypto, strings_loaded
+from ctypescrypto import libcrypto, strings_loaded, pyver
__all__ = ['LibCryptoError', 'clear_err_stack']
+if pyver == 2:
+ def _get_error_str(err_code,buf):
+ return libcrypto.ERR_error_string(err_code,buf)
+else:
+ def _get_error_str(err_code,buf):
+ return libcrypto.ERR_error_string(err_code,buf).decode('utf-8')
class LibCryptoError(Exception):
"""
Exception for libcrypto errors. Adds all the info, which can be
mesg = msg
buf = create_string_buffer(128)
while err_code != 0:
- mesg += "\n\t" + libcrypto.ERR_error_string(err_code, buf)
+ mesg += "\n\t" + _get_error_str(err_code, buf)
err_code = libcrypto.ERR_get_error()
super(LibCryptoError, self).__init__(mesg)
This module provides Oid object which represents entry to OpenSSL
OID database.
"""
-from ctypescrypto import libcrypto
+from ctypescrypto import libcrypto, pyver,bintype,chartype,inttype
from ctypes import c_char_p, c_void_p, c_int, create_string_buffer
from ctypescrypto.exception import LibCryptoError
by some libcrypto function or extracted from some libcrypto
structure
"""
- if isinstance(value, unicode):
+ if isinstance(value, chartype):
value = value.encode('ascii')
- if isinstance(value, str):
+ if isinstance(value, bintype):
self.nid = libcrypto.OBJ_txt2nid(value)
if self.nid == 0:
raise ValueError("Cannot find object %s in the database" %
value)
- elif isinstance(value, (int, long)):
+ elif isinstance(value, inttype):
short = libcrypto.OBJ_nid2sn(value)
if short is None:
raise ValueError("No such nid %d in the database" % value)
def __hash__(self):
" Hash of object is equal to nid because Oids with same nid are same"
return self.nid
- def __cmp__(self, other):
- " Compares NIDs of two objects "
- return self.nid - other.nid
+ def __eq__ (self, other):
+ return self.nid == other.nid
+ def __hash__(self):
+ """ Returns NID of object as hash value. Should make Oids with
+ identical NID compare equal and also let use Oids as
+ dictionary keys"""
+ return self.nid
def __str__(self):
" Default string representation of Oid is dotted-decimal "
return self.dotted()
def __repr__(self):
" Returns constructor call of Oid with dotted representation "
return "Oid('%s')" % (self.dotted())
- def shortname(self):
- " Returns short name if any "
- return libcrypto.OBJ_nid2sn(self.nid)
- def longname(self):
- " Returns logn name if any "
- return libcrypto.OBJ_nid2ln(self.nid)
+ if pyver == 2:
+ def shortname(self):
+ " Returns short name if any "
+ return libcrypto.OBJ_nid2sn(self.nid)
+ def longname(self):
+ " Returns long name if any "
+ return libcrypto.OBJ_nid2ln(self.nid)
+ else:
+ def shortname(self):
+ " Returns short name if any "
+ return libcrypto.OBJ_nid2sn(self.nid).decode('utf-8')
+ def longname(self):
+ " Returns long name if any "
+ return libcrypto.OBJ_nid2ln(self.nid).decode('utf-8')
+
def dotted(self):
" Returns dotted-decimal reperesentation "
obj = libcrypto.OBJ_nid2obj(self.nid)
buf = create_string_buffer(256)
libcrypto.OBJ_obj2txt(buf, 256, obj, 1)
- return buf.value
+ if pyver == 2:
+ return buf.value
+ else:
+ return buf.value.decode('ascii')
@staticmethod
def fromobj(obj):
"""
Oid alredy in database are undefined
"""
+ if pyver > 2:
+ dotted = dotted.encode('ascii')
+ shortname = shortname.encode('utf-8')
+ longname = longname.encode('utf-8')
nid = libcrypto.OBJ_create(dotted, shortname, longname)
if nid == 0:
raise LibCryptoError("Problem adding new OID to the database")
"""
from ctypes import create_string_buffer, c_char_p, c_int, c_double
-from ctypescrypto import libcrypto
+from ctypescrypto import libcrypto, bintype
from ctypescrypto.exception import LibCryptoError
__all__ = ['RandError', 'bytes', 'pseudo_bytes', 'seed', 'status']
If entropy is not None, it should be floating point(double)
value estimating amount of entropy in the data (in bytes).
"""
- if not isinstance(data, str):
+ if not isinstance(data, bintype):
raise TypeError("A string is expected")
ptr = c_char_p(data)
size = len(data)
import unittest
result = unittest.TextTestResult(sys.stdout, True, True)
suite = unittest.defaultTestLoader.discover("./tests")
- print "Discovered %d test cases" % suite.countTestCases()
+ print ("Discovered %d test cases" % suite.countTestCases())
result.buffer = True
suite.run(result)
- print ""
+ print ("")
if not result.wasSuccessful():
if len(result.errors):
- print "============ Errors disovered ================="
+ print ("============ Errors disovered =================")
for res in result.errors:
- print res[0], ":", res[1]
+ print (res[0], ":", res[1])
if len(result.failures):
- print "============ Failures disovered ================="
+ print ("============ Failures disovered =================")
for res in result.failures:
- print res[0], ":", res[1]
+ print (res[0], ":", res[1])
sys.exit(1)
else:
- print "All tests successful"
+ print ("All tests successful")
setup(
name="ctypescrypto",
- version="0.4.1",
+ version="0.4.2",
description="CTypes-based interface for some OpenSSL libcrypto features",
author="Victor Wagner",
author_email="vitus@wagner.pp.ru",
from ctypescrypto.bio import Membio
import unittest
+import sys
+if sys.version[0]>'2':
+ def unicode(b):
+ return str(b)
+
class TestRead(unittest.TestCase):
def test_readshortstr(self):
- s="A quick brown fox jumps over a lazy dog"
+ s=b"A quick brown fox jumps over a lazy dog"
bio=Membio(s)
data=bio.read()
self.assertEqual(data,s)
data2=bio.read()
- self.assertEqual(data2,"")
+ self.assertEqual(data2,b"")
del bio
def test_readwithlen(self):
- s="A quick brown fox jumps over a lazy dog"
+ s=b"A quick brown fox jumps over a lazy dog"
bio=Membio(s)
data=bio.read(len(s))
self.assertEqual(data,s)
data2=bio.read(5)
- self.assertEqual(data2,"")
+ self.assertEqual(data2,b"")
def test_readwrongtype(self):
- s="A quick brown fox jumps over a lazy dog"
+ s=b"A quick brown fox jumps over a lazy dog"
bio=Membio(s)
with self.assertRaises(TypeError):
data=bio.read("5")
def test_reset(self):
- s="A quick brown fox jumps over a lazy dog"
+ s=b"A quick brown fox jumps over a lazy dog"
bio=Membio(s)
data=bio.read()
bio.reset()
self.assertEqual(data,data2)
self.assertEqual(data,s)
def test_readlongstr(self):
- poem='''Eyes of grey--a sodden quay,
+ poem=b'''Eyes of grey--a sodden quay,
Driving rain and falling tears,
As the steamer wears to sea
In a parting storm of cheers.
self.assertEqual(data,poem)
del bio
def test_readparts(self):
- s="A quick brown fox jumps over the lazy dog"
+ s=b"A quick brown fox jumps over the lazy dog"
bio=Membio(s)
a=bio.read(10)
self.assertEqual(a,s[0:10])
c=bio.read()
self.assertEqual(c,s[19:])
d=bio.read()
- self.assertEqual(d,"")
+ self.assertEqual(d,b"")
class TestWrite(unittest.TestCase):
def test_write(self):
b=Membio()
- b.write("A quick brown ")
- b.write("fox jumps over ")
- b.write("the lazy dog.")
+ b.write(b"A quick brown ")
+ b.write(b"fox jumps over ")
+ b.write(b"the lazy dog.")
self.assertEqual(str(b),"A quick brown fox jumps over the lazy dog.")
def test_unicode(self):
b=Membio()
- s='\xd0\xba\xd0\xb0\xd0\xba \xd1\x8d\xd1\x82\xd0\xbe \xd0\xbf\xd0\xbe-\xd1\x80\xd1\x83\xd1\x81\xd1\x81\xd0\xba\xd0\xb8'
+ s=b'\xd0\xba\xd0\xb0\xd0\xba \xd1\x8d\xd1\x82\xd0\xbe \xd0\xbf\xd0\xbe-\xd1\x80\xd1\x83\xd1\x81\xd1\x81\xd0\xba\xd0\xb8'
b.write(s)
self.assertEqual(unicode(b),u'\u043a\u0430\u043a \u044d\u0442\u043e \u043f\u043e-\u0440\u0443\u0441\u0441\u043a\u0438')
def test_unicode2(self):
with self.assertRaises(ValueError):
c=cipher.Cipher(ct,None,None)
def test_blockcipher(self):
- data="sdfdsddf"
- key='abcdabcd'
+ data=b"sdfdsddf"
+ key=b'abcdabcd'
c=cipher.new("DES-CBC",key)
enc=c.update(data)+c.finish()
# See if padding is added by default
dec=d.update(enc)+d.finish()
self.assertEqual(data,dec)
def test_blockcipher_nopadding(self):
- data="sdfdsddf"
- key='abcdabcd'
+ data=b"sdfdsddf"
+ key=b'abcdabcd'
c=cipher.new("DES-CBC",key)
c.padding(False)
enc=c.update(data)+c.finish()
dec=d.update(enc)+d.finish()
self.assertEqual(data,dec)
def test_ofb_cipher(self):
- data="sdfdsddfxx"
- key='abcdabcd'
- iv='abcdabcd'
+ data=b"sdfdsddfxx"
+ key=b'abcdabcd'
+ iv=b'abcdabcd'
c=cipher.new("DES-OFB",key,iv=iv)
enc=c.update(data)+c.finish()
# See if padding is added by default
self.assertEqual(data,dec)
def test_ofb_noiv(self):
- data="sdfdsddfxx"
- encryptkey='abcdabcd'*4
+ data=b"sdfdsddfxx"
+ encryptkey=b'abcdabcd'*4
decryptkey=encryptkey[0:5]+encryptkey[5:]
dec=d.update(enc)+d.finish()
self.assertEqual(data,dec)
def test_wrong_keylength(self):
- data="sdfsdfxxx"
- key="abcdabcd"
+ data=b"sdfsdfxxx"
+ key=b"abcdabcd"
with self.assertRaises(ValueError):
c=cipher.new("AES-128-OFB",key)
def test_wrong_ivlength(self):
- key="abcdabcdabcdabcd"
- iv="xxxxx"
+ key=b"abcdabcdabcdabcd"
+ iv=b"xxxxx"
with self.assertRaises(ValueError):
c=cipher.new("AES-128-OFB",key,iv=iv)
def test_variable_keylength(self):
- encryptkey="abcdefabcdefghtlgvasdgdgsdgdsg"
- data="asdfsdfsdfsdfsdfsdfsdfsdf"
- iv="abcdefgh"
+ encryptkey=b"abcdefabcdefghtlgvasdgdgsdgdsg"
+ data=b"asdfsdfsdfsdfsdfsdfsdfsdf"
+ iv=b"abcdefgh"
c=cipher.new("bf-ofb",encryptkey,iv=iv)
ciphertext=c.update(data)+c.finish()
decryptkey=encryptkey[0:5]+encryptkey[5:]
class TestIface(unittest.TestCase):
""" Test all methods with one algorithms """
- msg="A quick brown fox jumps over the lazy dog."
+ msg=b"A quick brown fox jumps over the lazy dog."
dgst="00CFFE7312BF9CA73584F24BDF7DF1D028340397"
def test_cons(self):
md=digest.DigestType("sha1")
self.assertEqual(dgst.digest(self.msg),b16decode(self.dgst))
def test_length(self):
l=len(self.msg)
- msg=self.msg+" Dog barks furiously."
+ msg=self.msg+b" Dog barks furiously."
dgst=digest.new("sha1")
dgst.update(msg,length=l)
self.assertEqual(dgst.hexdigest(),self.dgst)
dgst.update(u'\u0430\u0431')
def test_copy(self):
dgst=digest.new("sha1")
- dgst.update("A quick brown fox jumps over ")
+ dgst.update(b"A quick brown fox jumps over ")
d2=dgst.copy()
- dgst.update("the lazy dog.")
+ dgst.update(b"the lazy dog.")
value1=dgst.hexdigest()
- d2.update("the fat pig.")
+ d2.update(b"the fat pig.")
value2=d2.hexdigest()
self.assertEqual(value1,"00CFFE7312BF9CA73584F24BDF7DF1D028340397")
self.assertEqual(value2,"5328F33739BEC2A15B6A30F17D3BC13CC11A7C78")
def test_md5(self):
d=digest.new("md5")
self.assertEqual(d.digest_size,16)
- d.update("A quick brown fox jumps over the lazy dog.")
+ d.update(b"A quick brown fox jumps over the lazy dog.")
self.assertEqual(d.hexdigest(),"DF756A3769FCAB0A261880957590C768")
def test_md4(self):
d=digest.new("md4")
- d.update("A quick brown fox jumps over the lazy dog.")
+ d.update(b"A quick brown fox jumps over the lazy dog.")
self.assertEqual(d.digest_size,16)
self.assertEqual(d.hexdigest(),"FAAED595A3E38BBF0D9B4B98021D200F")
def test_sha256(self):
d=digest.new("sha256")
- d.update("A quick brown fox jumps over the lazy dog.")
+ d.update(b"A quick brown fox jumps over the lazy dog.")
self.assertEqual(d.digest_size,32)
self.assertEqual(d.hexdigest(),"FFCA2587CFD4846E4CB975B503C9EB940F94566AA394E8BD571458B9DA5097D5")
def test_sha384(self):
d=digest.new("sha384")
- d.update("A quick brown fox jumps over the lazy dog.")
+ d.update(b"A quick brown fox jumps over the lazy dog.")
self.assertEqual(d.digest_size,48)
self.assertEqual(d.hexdigest(),"C7D71B1BA81D0DD028E79C7E75CF2F83169C14BA732CA5A2AD731151584E9DE843C1A314077D62B96B03367F72E126D8")
def test_sha512(self):
d=digest.new("sha512")
self.assertEqual(d.digest_size,64)
- d.update("A quick brown fox jumps over the lazy dog.")
+ d.update(b"A quick brown fox jumps over the lazy dog.")
self.assertEqual(d.hexdigest(),"3045575CF3B873DD656F5F3426E04A4ACD11950BB2538772EE14867002B408E21FF18EF7F7B2CAB484A3C1C0BE3F8ACC4AED536A427353C7748DC365FC1A8646")
def test_wrongdigest(self):
with self.assertRaises(digest.DigestError):
class TestMac(unittest.TestCase):
def test_hmac_default(self):
- d=MAC('hmac',key='1234'*4)
- d.update('The Quick brown fox jumps over the lazy dog\n')
+ d=MAC('hmac',key=b'1234'*4)
+ d.update(b'The Quick brown fox jumps over the lazy dog\n')
self.assertEqual(d.name,'hmac-md5')
self.assertEqual(d.hexdigest(),'A9C16D91CDF2A99273B72336D0D16B56')
def test_hmac_digestdataa(self):
- d=MAC('hmac',key='1234'*4)
- h=d.hexdigest('The Quick brown fox jumps over the lazy dog\n')
+ d=MAC('hmac',key=b'1234'*4)
+ h=d.hexdigest(b'The Quick brown fox jumps over the lazy dog\n')
self.assertEqual(d.name,'hmac-md5')
self.assertEqual(h,'A9C16D91CDF2A99273B72336D0D16B56')
def test_hmac_byoid(self):
- d=MAC(Oid('hmac'),key='1234'*4)
- d.update('The Quick brown fox jumps over the lazy dog\n')
+ d=MAC(Oid('hmac'),key=b'1234'*4)
+ d.update(b'The Quick brown fox jumps over the lazy dog\n')
self.assertEqual(d.name,'hmac-md5')
self.assertEqual(d.hexdigest(),'A9C16D91CDF2A99273B72336D0D16B56')
def test_mac_wrongtype(self):
with self.assertRaises(TypeError):
- d=MAC(Oid('hmac').nid,key='1234'*4)
+ d=MAC(Oid('hmac').nid,key=b'1234'*4)
def test_hmac_sha256(self):
- d=MAC('hmac',key='1234'*16,digest='sha256')
- d.update('The Quick brown fox jumps over the lazy dog\n')
+ d=MAC('hmac',key=b'1234'*16,digest='sha256')
+ d.update(b'The Quick brown fox jumps over the lazy dog\n')
self.assertEqual(d.name,'hmac-sha256')
self.assertEqual(d.hexdigest(),'BEBA086E1C67200664DCDEEC697D99DB1A8DAA72933A36B708FC5FD568173095')
def test_gostmac(self):
set_default('gost')
- d=MAC('gost-mac',key='1234'*8)
- d.update('The Quick brown fox jumps over the lazy dog\n')
+ d=MAC('gost-mac',key=b'1234'*8)
+ d.update(b'The Quick brown fox jumps over the lazy dog\n')
self.assertEqual(d.name,'gost-mac')
self.assertEqual(d.digest_size,4)
self.assertEqual(d.hexdigest(),'76F25AE3')
with self.assertRaisesRegexp(DigestError,"invalid mac key length"):
- d=MAC('gost-mac',key='1234'*4)
+ d=MAC('gost-mac',key=b'1234'*4)
if __name__ == "__main__":
unittest.main()
b2=pseudo_bytes(100)
self.assertNotEqual(b,b2)
def test_seed(self):
- b="aaqwrwfsagdsgdsfgdsfgdfsgdsfgdsgfdsfgdsfg"
+ b=b"aaqwrwfsagdsgdsfgdsfgdfsgdsfgdsgfdsfgdsfg"
seed(b)
# Check if no segfault here
def test_entropy(self):
- b="aaqwrwfsagdsgdsfgdsfgdfsgdsfgdsgfdsfgdsfg"
+ b=b"aaqwrwfsagdsgdsfgdsfgdfsgdsfgdsgfdsfgdsfg"
seed(b,2.25)
# Check if no segfault here
def test_Status(self):