Skip to content
This repository was archived by the owner on May 9, 2020. It is now read-only.

Commit 96f7ea5

Browse files
committed
encrypted_data_bag_item module refactored. Removed nested classes
1 parent bc62035 commit 96f7ea5

File tree

2 files changed

+118
-120
lines changed

2 files changed

+118
-120
lines changed

chef/encrypted_data_bag_item.py

Lines changed: 107 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -18,122 +18,117 @@ def __getitem__(self, key):
1818
if key == 'id':
1919
return self.raw_data[key]
2020
else:
21-
return EncryptedDataBagItem.Decryptors.create_decryptor(self.api.encryption_key, self.raw_data[key]).decrypt()
21+
return create_decryptor(self.api.encryption_key, self.raw_data[key]).decrypt()
2222

2323
def __setitem__(self, key, value):
2424
if key == 'id':
2525
self.raw_data[key] = value
2626
else:
27-
self.raw_data[key] = EncryptedDataBagItem.Encryptors.create_encryptor(self.api.encryption_key, value, self.api.encryption_version).to_dict()
28-
29-
@staticmethod
30-
def get_version(data):
31-
if data.has_key('version'):
32-
if str(data['version']) in map(str, EncryptedDataBagItem.SUPPORTED_ENCRYPTION_VERSIONS):
33-
return data['version']
34-
else:
35-
raise ChefUnsupportedEncryptionVersionError(data['version'])
27+
self.raw_data[key] = create_encryptor(self.api.encryption_key, value, self.api.encryption_version).to_dict()
28+
29+
def create_encryptor(key, data, version):
30+
try:
31+
return {
32+
1: EncryptorVersion1(key, data),
33+
2: EncryptorVersion2(key, data)
34+
}[version]
35+
except KeyError:
36+
raise ChefUnsupportedEncryptionVersionError(version)
37+
38+
class EncryptorVersion1(object):
39+
VERSION = 1
40+
41+
def __init__(self, key, data):
42+
self.plain_key = key
43+
self.key = hashlib.sha256(key).digest()
44+
self.data = data
45+
self.iv = os.urandom(8).encode('hex')
46+
self.encryptor = AES256Cipher(key=self.key, iv=self.iv)
47+
self.encrypted_data = None
48+
49+
def encrypt(self):
50+
if self.encrypted_data is None:
51+
data = json.dumps({'json_wrapper': self.data})
52+
self.encrypted_data = self.encryptor.encrypt(data)
53+
return self.encrypted_data
54+
55+
def to_dict(self):
56+
return {
57+
"encrypted_data": base64.standard_b64encode(self.encrypt()),
58+
"iv": base64.standard_b64encode(self.iv),
59+
"version": self.VERSION,
60+
"cipher": "aes-256-cbc"
61+
}
62+
63+
class EncryptorVersion2(EncryptorVersion1):
64+
VERSION = 2
65+
66+
def __init__(self, key, data):
67+
super(EncryptorVersion2, self).__init__(key, data)
68+
self.hmac = None
69+
70+
def encrypt(self):
71+
self.encrypted_data = super(EncryptorVersion2, self).encrypt()
72+
self.hmac = (self.hmac if self.hmac is not None else self._generate_hmac())
73+
return self.encrypted_data
74+
75+
def _generate_hmac(self):
76+
raw_hmac = hmac.new(self.plain_key, base64.standard_b64encode(self.encrypted_data), hashlib.sha256).digest()
77+
return raw_hmac
78+
79+
def to_dict(self):
80+
result = super(EncryptorVersion2, self).to_dict()
81+
result['hmac'] = base64.standard_b64encode(self.hmac)
82+
return result
83+
84+
def get_decryption_version(data):
85+
if data.has_key('version'):
86+
if str(data['version']) in map(str, EncryptedDataBagItem.SUPPORTED_ENCRYPTION_VERSIONS):
87+
return data['version']
3688
else:
37-
return 1
38-
39-
class Encryptors(object):
40-
@staticmethod
41-
def create_encryptor(key, data, version):
42-
try:
43-
return {
44-
1: EncryptedDataBagItem.Encryptors.EncryptorVersion1(key, data),
45-
2: EncryptedDataBagItem.Encryptors.EncryptorVersion2(key, data)
46-
}[version]
47-
except KeyError:
48-
raise ChefUnsupportedEncryptionVersionError(version)
49-
50-
class EncryptorVersion1(object):
51-
VERSION = 1
52-
def __init__(self, key, data):
53-
self.plain_key = key
54-
self.key = hashlib.sha256(key).digest()
55-
self.data = data
56-
self.iv = os.urandom(8).encode('hex')
57-
self.encryptor = AES256Cipher(key=self.key, iv=self.iv)
58-
self.encrypted_data = None
59-
60-
def encrypt(self):
61-
if self.encrypted_data is None:
62-
data = json.dumps({'json_wrapper': self.data})
63-
self.encrypted_data = self.encryptor.encrypt(data)
64-
return self.encrypted_data
65-
66-
def to_dict(self):
67-
return {
68-
"encrypted_data": base64.standard_b64encode(self.encrypt()),
69-
"iv": base64.standard_b64encode(self.iv),
70-
"version": self.VERSION,
71-
"cipher": "aes-256-cbc"
72-
}
73-
74-
class EncryptorVersion2(EncryptorVersion1):
75-
VERSION = 2
76-
77-
def __init__(self, key, data):
78-
super(EncryptedDataBagItem.Encryptors.EncryptorVersion2, self).__init__(key, data)
79-
self.hmac = None
80-
81-
def encrypt(self):
82-
self.encrypted_data = super(EncryptedDataBagItem.Encryptors.EncryptorVersion2, self).encrypt()
83-
self.hmac = (self.hmac if self.hmac is not None else self._generate_hmac())
84-
return self.encrypted_data
85-
86-
def _generate_hmac(self):
87-
raw_hmac = hmac.new(self.plain_key, base64.standard_b64encode(self.encrypted_data), hashlib.sha256).digest()
88-
return raw_hmac
89-
90-
def to_dict(self):
91-
result = super(EncryptedDataBagItem.Encryptors.EncryptorVersion2, self).to_dict()
92-
result['hmac'] = base64.standard_b64encode(self.hmac)
93-
return result
94-
95-
class Decryptors(object):
96-
@staticmethod
97-
def create_decryptor(key, data):
98-
version = EncryptedDataBagItem.get_version(data)
99-
if version == 1:
100-
return EncryptedDataBagItem.Decryptors.DecryptorVersion1(key, data['encrypted_data'], data['iv'])
101-
elif version == 2:
102-
return EncryptedDataBagItem.Decryptors.DecryptorVersion2(key, data['encrypted_data'], data['iv'], data['hmac'])
103-
104-
class DecryptorVersion1(object):
105-
def __init__(self, key, data, iv):
106-
self.key = hashlib.sha256(key).digest()
107-
self.data = base64.standard_b64decode(data)
108-
self.iv = base64.standard_b64decode(iv)
109-
self.decryptor = AES256Cipher(key=self.key, iv=self.iv)
110-
111-
def decrypt(self):
112-
value = self.decryptor.decrypt(self.data)
113-
# After decryption we should get a string with JSON
114-
try:
115-
value = json.loads(value)
116-
except ValueError:
117-
raise ChefDecryptionError("Error decrypting data bag value. Most likely the provided key is incorrect")
118-
return value['json_wrapper']
119-
120-
class DecryptorVersion2(DecryptorVersion1):
121-
122-
def __init__(self, key, data, iv, hmac):
123-
super(EncryptedDataBagItem.Decryptors.DecryptorVersion2, self).__init__(key, data, iv)
124-
self.hmac = base64.standard_b64decode(hmac)
125-
self.encoded_data = data
126-
127-
def _validate_hmac(self):
128-
expected_hmac = hmac.new(self.key, self.encoded_data, hashlib.sha256).digest()
129-
valid = len(expected_hmac) ^ len(self.hmac)
130-
for expected_char, candidate_char in itertools.izip_longest(expected_hmac, self.hmac):
131-
valid |= ord(expected_char) ^ ord(candidate_char)
132-
return valid == 0
133-
134-
def decrypt(self):
135-
if self._validate_hmac():
136-
return super(EncryptedDataBagItem.Decryptors.DecryptorVersion2, self).decrypt()
137-
else:
138-
raise ChefDecryptionError("Error decrypting data bag value. HMAC validation failed.")
89+
raise ChefUnsupportedEncryptionVersionError(data['version'])
90+
else:
91+
return 1
92+
93+
def create_decryptor(key, data):
94+
version = get_decryption_version(data)
95+
if version == 1:
96+
return DecryptorVersion1(key, data['encrypted_data'], data['iv'])
97+
elif version == 2:
98+
return DecryptorVersion2(key, data['encrypted_data'], data['iv'], data['hmac'])
99+
100+
class DecryptorVersion1(object):
101+
def __init__(self, key, data, iv):
102+
self.key = hashlib.sha256(key).digest()
103+
self.data = base64.standard_b64decode(data)
104+
self.iv = base64.standard_b64decode(iv)
105+
self.decryptor = AES256Cipher(key=self.key, iv=self.iv)
106+
107+
def decrypt(self):
108+
value = self.decryptor.decrypt(self.data)
109+
# After decryption we should get a string with JSON
110+
try:
111+
value = json.loads(value)
112+
except ValueError:
113+
raise ChefDecryptionError("Error decrypting data bag value. Most likely the provided key is incorrect")
114+
return value['json_wrapper']
115+
116+
class DecryptorVersion2(DecryptorVersion1):
117+
def __init__(self, key, data, iv, hmac):
118+
super(DecryptorVersion2, self).__init__(key, data, iv)
119+
self.hmac = base64.standard_b64decode(hmac)
120+
self.encoded_data = data
121+
122+
def _validate_hmac(self):
123+
expected_hmac = hmac.new(self.key, self.encoded_data, hashlib.sha256).digest()
124+
valid = len(expected_hmac) ^ len(self.hmac)
125+
for expected_char, candidate_char in itertools.izip_longest(expected_hmac, self.hmac):
126+
valid |= ord(expected_char) ^ ord(candidate_char)
127+
return valid == 0
128+
129+
def decrypt(self):
130+
if self._validate_hmac():
131+
return super(DecryptorVersion2, self).decrypt()
132+
else:
133+
raise ChefDecryptionError("Error decrypting data bag value. HMAC validation failed.")
139134

chef/tests/test_encrypted_data_bag_item.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from chef.exceptions import ChefError, ChefUnsupportedEncryptionVersionError, ChefDecryptionError
33
from chef.tests import ChefTestCase, TEST_ROOT
44
from chef.api import ChefAPI
5+
from chef.encrypted_data_bag_item import get_decryption_version
56

67
import copy
78
import os
@@ -31,14 +32,6 @@ def setUp(self):
3132
}
3233
}
3334

34-
def test_get_version(self):
35-
self.assertEqual(EncryptedDataBagItem.get_version({"version": "1"}), '1')
36-
self.assertEqual(EncryptedDataBagItem.get_version({"version": 1}), 1)
37-
self.assertEqual(EncryptedDataBagItem.get_version({"version": "2"}), '2')
38-
self.assertEqual(EncryptedDataBagItem.get_version({"version": 2}), 2)
39-
self.assertRaises(ChefUnsupportedEncryptionVersionError, EncryptedDataBagItem.get_version, {"version": 0})
40-
self.assertRaises(ChefUnsupportedEncryptionVersionError, EncryptedDataBagItem.get_version, {"version": "not a number"})
41-
4235
def test__getitem__(self):
4336
api = ChefAPI('https://chef_test:3000', os.path.join(TEST_ROOT, 'client.pem'), 'admin', secret_file=os.path.join(TEST_ROOT, 'encryption_key'))
4437
bag = DataBag('test_1')
@@ -81,3 +74,13 @@ def test__set_item__(self):
8174
self.assertIsNotNone(item.raw_data['pychef_test_ver2']['iv'])
8275
self.assertIsNotNone(item.raw_data['pychef_test_ver2']['hmac'])
8376
self.assertIsNotNone(item.raw_data['pychef_test_ver2']['encrypted_data'])
77+
78+
class EncryptedDataBagItemHelpersTestCase(ChefTestCase):
79+
def test_get_version(self):
80+
self.assertEqual(get_decryption_version({"version": "1"}), '1')
81+
self.assertEqual(get_decryption_version({"version": 1}), 1)
82+
self.assertEqual(get_decryption_version({"version": "2"}), '2')
83+
self.assertEqual(get_decryption_version({"version": 2}), 2)
84+
self.assertRaises(ChefUnsupportedEncryptionVersionError, get_decryption_version, {"version": 0})
85+
self.assertRaises(ChefUnsupportedEncryptionVersionError, get_decryption_version, {"version": "not a number"})
86+

0 commit comments

Comments
 (0)