Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,22 @@ Output:
}
```

###### AES-CBC HMAC Authentication (A128CBC-HS256)

When decrypting JWE payloads that use AES-CBC with HMAC (enc `A128CBC-HS256`), you can enable authentication tag verification by adding the following optional flag to your JWE config:

```json
{
"enableCbcHmacVerification": true
}
```

- **Default**: disabled (for backward compatibility). When disabled, AES-CBC payloads decrypt without verifying the HMAC tag.
- **When enabled**: the library validates the HMAC tag using the first half of the CEK as the MAC key and rejects payloads with missing or incorrect tags.
- **Scope**: applies only to `A128CBC-HS256`; GCM modes already provide authentication.

Enable this when both producer and consumer support HMAC verification and you require authenticity protection for AES-CBC encrypted payloads.

#### Mastercard Encryption and Decryption <a name="mastercard-encryption-and-decryption"></a>

+ [Introduction](#mastercard-introduction)
Expand Down
41 changes: 39 additions & 2 deletions client_encryption/jwe_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
from Crypto.Cipher import AES
from Crypto.Cipher.AES import block_size
from Crypto.Hash import HMAC, SHA256
from Crypto.Util.Padding import unpad

from client_encryption.encoding_utils import url_encode_bytes, decode_jwe
Expand Down Expand Up @@ -83,12 +84,19 @@ def decrypt_payload(payload, config, _params=None):
params = SessionKeyParams(config, encrypted_key, iv, 'SHA256')
key = params.key

header = json.loads(decode_jwe(encrypted_value[0]))
protected_header = encrypted_value[0]
header = json.loads(decode_jwe(protected_header))
cipher_text = decode_jwe(encrypted_value[3])
auth_tag = decode_jwe(encrypted_value[4]) if len(encrypted_value) > 4 else None
decryption_method = header['enc']

if decryption_method == 'A128CBC-HS256':
aes = AES.new(key[16:], AES.MODE_CBC, iv) # NOSONAR
mac_key, enc_key = _split_cbc_keys(key)
if config.enable_cbc_hmac_verification:
aad = protected_header.encode("ascii")
_verify_cbc_hmac_tag(mac_key, aad, iv, cipher_text, auth_tag)

aes = AES.new(enc_key, AES.MODE_CBC, iv) # NOSONAR
elif decryption_method == 'A128GCM' or decryption_method == 'A192GCM' or decryption_method == 'A256GCM':
aad = json.dumps(header).encode("ascii")
aes = AES.new(key, AES.MODE_GCM, iv)
Expand Down Expand Up @@ -143,3 +151,32 @@ def _build_header(alg, enc, cty, kid):
sort_keys=False
)
return json_header


def _split_cbc_keys(cek):
if len(cek) != 32:
raise EncryptionError("Invalid content encryption key length for AES-CBC HMAC.")

return cek[:16], cek[16:]


def _verify_cbc_hmac_tag(mac_key, aad, iv, cipher_text, auth_tag):
if not auth_tag:
raise EncryptionError("Authentication tag missing for AES-CBC encrypted payload.")

expected_tag = _compute_cbc_auth_tag(mac_key, aad, iv, cipher_text)

if expected_tag != auth_tag:
raise EncryptionError("Authentication tag verification failed for AES-CBC encrypted payload.")


def _compute_cbc_auth_tag(mac_key, aad, iv, cipher_text):
al = (len(aad) * 8).to_bytes(8, byteorder="big")

hmac = HMAC.new(mac_key, digestmod=SHA256)
hmac.update(aad)
hmac.update(iv)
hmac.update(cipher_text)
hmac.update(al)

return hmac.digest()[:16]
7 changes: 7 additions & 0 deletions client_encryption/jwe_encryption_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ def __init__(self, conf):

self._encrypted_value_field_name = json_config["encryptedValueFieldName"]

# Optional support for verifying HMAC auth tags on AES-CBC encrypted payloads
self._enable_cbc_hmac_verification = json_config.get("enableCbcHmacVerification", False)

# Fixed properties
self._data_encoding = ClientEncoding.BASE64
self._oaep_padding_digest_algorithm = "SHA256"
Expand Down Expand Up @@ -81,6 +84,10 @@ def decryption_key(self):
def encrypted_value_field_name(self):
return self._encrypted_value_field_name

@property
def enable_cbc_hmac_verification(self):
return self._enable_cbc_hmac_verification

@staticmethod
def __compute_fingerprint(asn1):
return SHA256.new(asn1).hexdigest()
Expand Down
131 changes: 131 additions & 0 deletions tests/test_jwe_encryption.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import json
import unittest
import hmac
import hashlib
import client_encryption.jwe_encryption as to_test
from client_encryption.encryption_exception import EncryptionError
from client_encryption.jwe_encryption_config import JweEncryptionConfig
from client_encryption.encoding_utils import encode_bytes, url_encode_bytes
from client_encryption.session_key_params import SessionKeyParams
from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from tests import get_mastercard_config_for_test


Expand Down Expand Up @@ -107,3 +116,125 @@ def test_decrypt_payload_should_decrypt_cbc_payload(self):
payload = to_test.decrypt_payload(encrypted_payload, self._config)
self.assertNotIn("encryptedValue", payload)
self.assertDictEqual(decrypted_payload, payload)

def test_decrypt_payload_should_verify_hmac_when_enabled(self):
config = self.__build_config_with_hmac(True)

encrypted_payload = {
"encryptedValue": "eyJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiUlNBLU9BRVAtMjU2In0.2GzZlB3scifhqlzIV2Rxk1TwiWL35e0AtcI9MFusG9jv9zGrJ8BapJx73PlFu69S0IAR7hXpqwzD7-UzmHUdrxB7izbMm9TNDpznHIuTaJWSRngD5Zui_rUXETL0GJG8dERx7IngqTltfzZanhDnjDNfKaowD6pFSEVN-Ff-pTeJqLMPs5504DtnYGD_uhQjvFmREIBgQTGEINzT88PXwLTAVBbWbAad_I-4Q12YwW_Y4yqmARCMTRWP-ixMrlSWCJlh6hz-biEotWNwGvp2pdhdiEP2VSvvUKHd7IngMWcMozOcoZQ1n18kWiFvt90fzNXSmzTjyGYSWUsa_mVouA.aX5mOSiXtilwYPFeTUFN_A.ZyAY79BAjG-QMQIhesj9bQ.TPZ2VYWdTLopCNkvMqUyuQ"
}

decrypted_payload = {"foo": "bar"}

payload = to_test.decrypt_payload(encrypted_payload, config)
self.assertDictEqual(decrypted_payload, payload)

def test_decrypt_payload_should_fail_when_hmac_invalid_and_enabled(self):
config = self.__build_config_with_hmac(True)

encrypted_value = "eyJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiUlNBLU9BRVAtMjU2In0.2GzZlB3scifhqlzIV2Rxk1TwiWL35e0AtcI9MFusG9jv9zGrJ8BapJx73PlFu69S0IAR7hXpqwzD7-UzmHUdrxB7izbMm9TNDpznHIuTaJWSRngD5Zui_rUXETL0GJG8dERx7IngqTltfzZanhDnjDNfKaowD6pFSEVN-Ff-pTeJqLMPs5504DtnYGD_uhQjvFmREIBgQTGEINzT88PXwLTAVBbWbAad_I-4Q12YwW_Y4yqmARCMTRWP-ixMrlSWCJlh6hz-biEotWNwGvp2pdhdiEP2VSvvUKHd7IngMWcMozOcoZQ1n18kWiFvt90fzNXSmzTjyGYSWUsa_mVouA.aX5mOSiXtilwYPFeTUFN_A.ZyAY79BAjG-QMQIhesj9bQ.TPZ2VYWdTLopCNkvMqUyuQ"
tampered_parts = encrypted_value.split(".")
tampered_parts[-1] = tampered_parts[-1][:-1] + ("A" if tampered_parts[-1][-1] != "A" else "B")
tampered_payload = {"encryptedValue": ".".join(tampered_parts)}

with self.assertRaises(EncryptionError):
to_test.decrypt_payload(tampered_payload, config)

def test_decrypt_payload_should_skip_hmac_when_disabled(self):
config = self.__build_config_with_hmac(False)

encrypted_value = "eyJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiUlNBLU9BRVAtMjU2In0.2GzZlB3scifhqlzIV2Rxk1TwiWL35e0AtcI9MFusG9jv9zGrJ8BapJx73PlFu69S0IAR7hXpqwzD7-UzmHUdrxB7izbMm9TNDpznHIuTaJWSRngD5Zui_rUXETL0GJG8dERx7IngqTltfzZanhDnjDNfKaowD6pFSEVN-Ff-pTeJqLMPs5504DtnYGD_uhQjvFmREIBgQTGEINzT88PXwLTAVBbWbAad_I-4Q12YwW_Y4yqmARCMTRWP-ixMrlSWCJlh6hz-biEotWNwGvp2pdhdiEP2VSvvUKHd7IngMWcMozOcoZQ1n18kWiFvt90fzNXSmzTjyGYSWUsa_mVouA.aX5mOSiXtilwYPFeTUFN_A.ZyAY79BAjG-QMQIhesj9bQ.TPZ2VYWdTLopCNkvMqUyuQ"
tampered_parts = encrypted_value.split(".")
tampered_parts[-1] = tampered_parts[-1][:-1] + ("A" if tampered_parts[-1][-1] != "A" else "B")
tampered_payload = {"encryptedValue": ".".join(tampered_parts)}

payload = to_test.decrypt_payload(tampered_payload, config)
self.assertEqual({"foo": "bar"}, payload)

def test_decrypt_payload_should_fail_when_hmac_missing_and_enabled(self):
config = self.__build_config_with_hmac(True)

encrypted_value = "eyJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiUlNBLU9BRVAtMjU2In0.2GzZlB3scifhqlzIV2Rxk1TwiWL35e0AtcI9MFusG9jv9zGrJ8BapJx73PlFu69S0IAR7hXpqwzD7-UzmHUdrxB7izbMm9TNDpznHIuTaJWSRngD5Zui_rUXETL0GJG8dERx7IngqTltfzZanhDnjDNfKaowD6pFSEVN-Ff-pTeJqLMPs5504DtnYGD_uhQjvFmREIBgQTGEINzT88PXwLTAVBbWbAad_I-4Q12YwW_Y4yqmARCMTRWP-ixMrlSWCJlh6hz-biEotWNwGvp2pdhdiEP2VSvvUKHd7IngMWcMozOcoZQ1n18kWiFvt90fzNXSmzTjyGYSWUsa_mVouA.aX5mOSiXtilwYPFeTUFN_A.ZyAY79BAjG-QMQIhesj9bQ.TPZ2VYWdTLopCNkvMqUyuQ"
missing_tag_payload = {"encryptedValue": ".".join(encrypted_value.split(".")[:-1])}

with self.assertRaises(EncryptionError):
to_test.decrypt_payload(missing_tag_payload, config)

def test_split_cbc_keys_should_reject_invalid_length(self):
with self.assertRaises(EncryptionError):
to_test._split_cbc_keys(b"short-key")

def test_split_cbc_keys_should_split_mac_and_enc_halves(self):
cek = bytes(range(32))
mac_key, enc_key = to_test._split_cbc_keys(cek)

self.assertEqual(cek[:16], mac_key)
self.assertEqual(cek[16:], enc_key)

def test_compute_cbc_auth_tag_matches_reference(self):
mac_key = b"\x01" * 16
aad = b"header"
iv = b"\x02" * 16
cipher_text = b"\x03\x04"

expected_tag = hmac.new(mac_key, aad + iv + cipher_text + (len(aad) * 8).to_bytes(8, "big"), hashlib.sha256).digest()[:16]
computed_tag = to_test._compute_cbc_auth_tag(mac_key, aad, iv, cipher_text)

self.assertEqual(expected_tag, computed_tag)

def test_verify_cbc_hmac_tag_should_pass_on_valid_tag(self):
mac_key = b"\x0a" * 16
aad = b"protected"
iv = b"\x0b" * 16
cipher_text = b"\x0c\x0d\x0e"
valid_tag = to_test._compute_cbc_auth_tag(mac_key, aad, iv, cipher_text)

# Should not raise
to_test._verify_cbc_hmac_tag(mac_key, aad, iv, cipher_text, valid_tag)

def test_verify_cbc_hmac_tag_should_raise_on_bad_tag(self):
mac_key = b"\x0a" * 16
aad = b"protected"
iv = b"\x0b" * 16
cipher_text = b"\x0c\x0d\x0e"
bad_tag = b"\x00" * 16

with self.assertRaises(EncryptionError):
to_test._verify_cbc_hmac_tag(mac_key, aad, iv, cipher_text, bad_tag)

def test_encrypt_and_decrypt_cbc_with_hmac_roundtrip(self):
config = self.__build_config_with_hmac(True)

# Deterministic CEK and IV for repeatable test
secret_key = bytes(range(32))
iv = b"\x11" * 16

public_bytes = config.encryption_certificate.public_key().public_bytes(Encoding.DER,
PublicFormat.SubjectPublicKeyInfo)
rsa_pub = RSA.import_key(public_bytes)
cipher = PKCS1_OAEP.new(rsa_pub, hashAlgo=to_test.SHA256)
encrypted_secret_key = cipher.encrypt(secret_key)

iv_encoded = encode_bytes(iv, config.data_encoding)
encrypted_key_value = url_encode_bytes(encrypted_secret_key)

params = SessionKeyParams(config, encrypted_key_value, iv_encoded, 'SHA256')
params._key = secret_key
params._iv = iv

payload = {"message": "Hello World"}

encrypted_payload = to_test.encrypt_payload(payload, config, params)
decrypted_payload = to_test.decrypt_payload(encrypted_payload, config)

self.assertDictEqual(payload, decrypted_payload)

def __build_config_with_hmac(self, enabled):
json_conf = json.loads(get_mastercard_config_for_test())
json_conf["enableCbcHmacVerification"] = enabled

conf = JweEncryptionConfig(json_conf)
conf._paths["$"]._to_encrypt = {"$": "$"}
conf._paths["$"]._to_decrypt = {"encryptedValue": "$"}

return conf
12 changes: 12 additions & 0 deletions tests/test_jwe_encryption_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ def test_load_config_missing_decryption_key(self):
conf = to_test.JweEncryptionConfig(json_conf)
self.assertIsNone(conf.decryption_key)

def test_load_config_hmac_verification_default_disabled(self):
conf = to_test.JweEncryptionConfig(self._test_config_file)
self.assertFalse(conf.enable_cbc_hmac_verification)

def test_load_config_hmac_verification_enabled(self):
json_conf = json.loads(self._test_config_file)
json_conf["enableCbcHmacVerification"] = True

conf = to_test.JweEncryptionConfig(json_conf)
self.assertTrue(conf.enable_cbc_hmac_verification)

def test_load_config_decryption_key_file_not_found(self):
wrong_json = json.loads(self._test_config_file)
wrong_json["decryptionKey"] = resource_path("keys/wrong_private_key_name.pem")
Expand Down Expand Up @@ -127,3 +138,4 @@ def __check_configuration(self, conf, encoding=ClientEncoding.BASE64, oaep_algo=
conf.encryption_key_fingerprint, "Wrong public key fingerprint")

self.assertEqual(oaep_algo, conf.oaep_padding_digest_algorithm, "Oaep padding algorithm not set")
self.assertFalse(conf.enable_cbc_hmac_verification)