From 312aa2432ed617a5412402eee27d1cd7975c41dd Mon Sep 17 00:00:00 2001 From: Neha Sony Date: Fri, 15 Nov 2024 11:33:52 +0000 Subject: [PATCH 1/7] Update test.yml --- .github/workflows/test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 286c50d..f0c15ef 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -32,6 +32,7 @@ jobs: pip3 install -r requirements.txt pip3 install . pip3 install coverage + pip3 install --upgrade setuptools - name: Run Tests run: | coverage run setup.py test From 7c7d324d4b63e9d9ba90b3f4611a447eacca3d12 Mon Sep 17 00:00:00 2001 From: Neha Sony Date: Fri, 15 Nov 2024 11:36:01 +0000 Subject: [PATCH 2/7] Update test.yml --- .github/workflows/test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f0c15ef..01fff65 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -29,6 +29,7 @@ jobs: python-version: '${{ matrix.python-version }}' - name: Install dependencies run: | + pip3 install --upgrade pip3 pip3 install -r requirements.txt pip3 install . pip3 install coverage From b5f82264d9394e9adefa976bbb62e8b18539a4be Mon Sep 17 00:00:00 2001 From: Neha Sony Date: Fri, 15 Nov 2024 11:41:36 +0000 Subject: [PATCH 3/7] Update test.yml --- .github/workflows/test.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 01fff65..e4432d8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -29,11 +29,10 @@ jobs: python-version: '${{ matrix.python-version }}' - name: Install dependencies run: | - pip3 install --upgrade pip3 pip3 install -r requirements.txt pip3 install . pip3 install coverage - pip3 install --upgrade setuptools + pip3 install 'setuptools>=69.1.0' --force-reinstall - name: Run Tests run: | coverage run setup.py test From 3b610f1131fdaf09ba0daf67eb176f3922f229ca Mon Sep 17 00:00:00 2001 From: Neha Sony Date: Fri, 15 Nov 2024 11:44:57 +0000 Subject: [PATCH 4/7] Update test.yml --- .github/workflows/test.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e4432d8..fb32236 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -35,5 +35,6 @@ jobs: pip3 install 'setuptools>=69.1.0' --force-reinstall - name: Run Tests run: | - coverage run setup.py test - coverage xml + python -m unittest discover + # coverage run setup.py test + # coverage xml From 40092b174df662285ff432306f7cdfdd2d1a154a Mon Sep 17 00:00:00 2001 From: Neha Sony Date: Fri, 15 Nov 2024 11:53:02 +0000 Subject: [PATCH 5/7] Update test.yml --- .github/workflows/test.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index fb32236..c57333b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -32,9 +32,6 @@ jobs: pip3 install -r requirements.txt pip3 install . pip3 install coverage - pip3 install 'setuptools>=69.1.0' --force-reinstall - name: Run Tests run: | python -m unittest discover - # coverage run setup.py test - # coverage xml From f558a3d5099f5110be1d2332a56c114bab24482e Mon Sep 17 00:00:00 2001 From: "MASTERCARD\\e129886" Date: Tue, 16 Dec 2025 13:00:02 +0000 Subject: [PATCH 6/7] HMAC support for Mastercard JWE encryption --- README.md | 16 +++++ client_encryption/jwe_encryption.py | 41 +++++++++++- client_encryption/jwe_encryption_config.py | 7 +++ tests/test_jwe_encryption.py | 72 ++++++++++++++++++++++ tests/test_jwe_encryption_config.py | 12 ++++ 5 files changed, 146 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 1764129..524f6d1 100644 --- a/README.md +++ b/README.md @@ -237,6 +237,22 @@ Output: } ``` + ###### AES-CBC HMAC Authentication (A128CBC-HS256) + + When decrypting JWE payloads that use AES-CBC with HMAC (enc `A128CBC-HS256`), you can enable authentication tag verification by adding the following optional flag to your JWE config: + + ```json + { + "enableCbcHmacVerification": true + } + ``` + + - **Default**: disabled (for backward compatibility). When disabled, AES-CBC payloads decrypt without verifying the HMAC tag. + - **When enabled**: the library validates the HMAC tag using the first half of the CEK as the MAC key and rejects payloads with missing or incorrect tags. + - **Scope**: applies only to `A128CBC-HS256`; GCM modes already provide authentication. + + Enable this when both producer and consumer support HMAC verification and you require authenticity protection for AES-CBC encrypted payloads. + #### Mastercard Encryption and Decryption + [Introduction](#mastercard-introduction) diff --git a/client_encryption/jwe_encryption.py b/client_encryption/jwe_encryption.py index 2560212..acad665 100644 --- a/client_encryption/jwe_encryption.py +++ b/client_encryption/jwe_encryption.py @@ -2,6 +2,7 @@ import json from Crypto.Cipher import AES from Crypto.Cipher.AES import block_size +from Crypto.Hash import HMAC, SHA256 from Crypto.Util.Padding import unpad from client_encryption.encoding_utils import url_encode_bytes, decode_jwe @@ -83,12 +84,19 @@ def decrypt_payload(payload, config, _params=None): params = SessionKeyParams(config, encrypted_key, iv, 'SHA256') key = params.key - header = json.loads(decode_jwe(encrypted_value[0])) + protected_header = encrypted_value[0] + header = json.loads(decode_jwe(protected_header)) cipher_text = decode_jwe(encrypted_value[3]) + auth_tag = decode_jwe(encrypted_value[4]) if len(encrypted_value) > 4 else b"" decryption_method = header['enc'] if decryption_method == 'A128CBC-HS256': - aes = AES.new(key[16:], AES.MODE_CBC, iv) # NOSONAR + mac_key, enc_key = _split_cbc_keys(key) + if config.enable_cbc_hmac_verification: + aad = protected_header.encode("ascii") + _verify_cbc_hmac_tag(mac_key, aad, iv, cipher_text, auth_tag) + + aes = AES.new(enc_key, AES.MODE_CBC, iv) # NOSONAR elif decryption_method == 'A128GCM' or decryption_method == 'A192GCM' or decryption_method == 'A256GCM': aad = json.dumps(header).encode("ascii") aes = AES.new(key, AES.MODE_GCM, iv) @@ -143,3 +151,32 @@ def _build_header(alg, enc, cty, kid): sort_keys=False ) return json_header + + +def _split_cbc_keys(cek): + if len(cek) != 32: + raise EncryptionError("Invalid content encryption key length for AES-CBC HMAC.") + + return cek[:16], cek[16:] + + +def _verify_cbc_hmac_tag(mac_key, aad, iv, cipher_text, auth_tag): + if not auth_tag: + raise EncryptionError("Authentication tag missing for AES-CBC encrypted payload.") + + expected_tag = _compute_cbc_auth_tag(mac_key, aad, iv, cipher_text) + + if expected_tag != auth_tag: + raise EncryptionError("Authentication tag verification failed for AES-CBC encrypted payload.") + + +def _compute_cbc_auth_tag(mac_key, aad, iv, cipher_text): + al = (len(aad) * 8).to_bytes(8, byteorder="big") + + hmac = HMAC.new(mac_key, digestmod=SHA256) + hmac.update(aad) + hmac.update(iv) + hmac.update(cipher_text) + hmac.update(al) + + return hmac.digest()[:16] diff --git a/client_encryption/jwe_encryption_config.py b/client_encryption/jwe_encryption_config.py index c14a46d..7f33602 100644 --- a/client_encryption/jwe_encryption_config.py +++ b/client_encryption/jwe_encryption_config.py @@ -45,6 +45,9 @@ def __init__(self, conf): self._encrypted_value_field_name = json_config["encryptedValueFieldName"] + # Optional support for verifying HMAC auth tags on AES-CBC encrypted payloads + self._enable_cbc_hmac_verification = json_config.get("enableCbcHmacVerification", False) + # Fixed properties self._data_encoding = ClientEncoding.BASE64 self._oaep_padding_digest_algorithm = "SHA256" @@ -81,6 +84,10 @@ def decryption_key(self): def encrypted_value_field_name(self): return self._encrypted_value_field_name + @property + def enable_cbc_hmac_verification(self): + return self._enable_cbc_hmac_verification + @staticmethod def __compute_fingerprint(asn1): return SHA256.new(asn1).hexdigest() diff --git a/tests/test_jwe_encryption.py b/tests/test_jwe_encryption.py index a9a4388..187d925 100644 --- a/tests/test_jwe_encryption.py +++ b/tests/test_jwe_encryption.py @@ -1,5 +1,9 @@ +import json import unittest +import hmac +import hashlib import client_encryption.jwe_encryption as to_test +from client_encryption.encryption_exception import EncryptionError from client_encryption.jwe_encryption_config import JweEncryptionConfig from tests import get_mastercard_config_for_test @@ -107,3 +111,71 @@ def test_decrypt_payload_should_decrypt_cbc_payload(self): payload = to_test.decrypt_payload(encrypted_payload, self._config) self.assertNotIn("encryptedValue", payload) self.assertDictEqual(decrypted_payload, payload) + + def test_decrypt_payload_should_verify_hmac_when_enabled(self): + config = self.__build_config_with_hmac(True) + + encrypted_payload = { + "encryptedValue": "eyJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiUlNBLU9BRVAtMjU2In0.2GzZlB3scifhqlzIV2Rxk1TwiWL35e0AtcI9MFusG9jv9zGrJ8BapJx73PlFu69S0IAR7hXpqwzD7-UzmHUdrxB7izbMm9TNDpznHIuTaJWSRngD5Zui_rUXETL0GJG8dERx7IngqTltfzZanhDnjDNfKaowD6pFSEVN-Ff-pTeJqLMPs5504DtnYGD_uhQjvFmREIBgQTGEINzT88PXwLTAVBbWbAad_I-4Q12YwW_Y4yqmARCMTRWP-ixMrlSWCJlh6hz-biEotWNwGvp2pdhdiEP2VSvvUKHd7IngMWcMozOcoZQ1n18kWiFvt90fzNXSmzTjyGYSWUsa_mVouA.aX5mOSiXtilwYPFeTUFN_A.ZyAY79BAjG-QMQIhesj9bQ.TPZ2VYWdTLopCNkvMqUyuQ" + } + + decrypted_payload = {"foo": "bar"} + + payload = to_test.decrypt_payload(encrypted_payload, config) + self.assertDictEqual(decrypted_payload, payload) + + def test_decrypt_payload_should_fail_when_hmac_invalid_and_enabled(self): + config = self.__build_config_with_hmac(True) + + encrypted_value = "eyJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiUlNBLU9BRVAtMjU2In0.2GzZlB3scifhqlzIV2Rxk1TwiWL35e0AtcI9MFusG9jv9zGrJ8BapJx73PlFu69S0IAR7hXpqwzD7-UzmHUdrxB7izbMm9TNDpznHIuTaJWSRngD5Zui_rUXETL0GJG8dERx7IngqTltfzZanhDnjDNfKaowD6pFSEVN-Ff-pTeJqLMPs5504DtnYGD_uhQjvFmREIBgQTGEINzT88PXwLTAVBbWbAad_I-4Q12YwW_Y4yqmARCMTRWP-ixMrlSWCJlh6hz-biEotWNwGvp2pdhdiEP2VSvvUKHd7IngMWcMozOcoZQ1n18kWiFvt90fzNXSmzTjyGYSWUsa_mVouA.aX5mOSiXtilwYPFeTUFN_A.ZyAY79BAjG-QMQIhesj9bQ.TPZ2VYWdTLopCNkvMqUyuQ" + tampered_parts = encrypted_value.split(".") + tampered_parts[-1] = tampered_parts[-1][:-1] + ("A" if tampered_parts[-1][-1] != "A" else "B") + tampered_payload = {"encryptedValue": ".".join(tampered_parts)} + + with self.assertRaises(EncryptionError): + to_test.decrypt_payload(tampered_payload, config) + + def test_decrypt_payload_should_skip_hmac_when_disabled(self): + config = self.__build_config_with_hmac(False) + + encrypted_value = "eyJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiUlNBLU9BRVAtMjU2In0.2GzZlB3scifhqlzIV2Rxk1TwiWL35e0AtcI9MFusG9jv9zGrJ8BapJx73PlFu69S0IAR7hXpqwzD7-UzmHUdrxB7izbMm9TNDpznHIuTaJWSRngD5Zui_rUXETL0GJG8dERx7IngqTltfzZanhDnjDNfKaowD6pFSEVN-Ff-pTeJqLMPs5504DtnYGD_uhQjvFmREIBgQTGEINzT88PXwLTAVBbWbAad_I-4Q12YwW_Y4yqmARCMTRWP-ixMrlSWCJlh6hz-biEotWNwGvp2pdhdiEP2VSvvUKHd7IngMWcMozOcoZQ1n18kWiFvt90fzNXSmzTjyGYSWUsa_mVouA.aX5mOSiXtilwYPFeTUFN_A.ZyAY79BAjG-QMQIhesj9bQ.TPZ2VYWdTLopCNkvMqUyuQ" + tampered_parts = encrypted_value.split(".") + tampered_parts[-1] = tampered_parts[-1][:-1] + ("A" if tampered_parts[-1][-1] != "A" else "B") + tampered_payload = {"encryptedValue": ".".join(tampered_parts)} + + payload = to_test.decrypt_payload(tampered_payload, config) + self.assertEqual({"foo": "bar"}, payload) + + def test_decrypt_payload_should_fail_when_hmac_missing_and_enabled(self): + config = self.__build_config_with_hmac(True) + + encrypted_value = "eyJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiUlNBLU9BRVAtMjU2In0.2GzZlB3scifhqlzIV2Rxk1TwiWL35e0AtcI9MFusG9jv9zGrJ8BapJx73PlFu69S0IAR7hXpqwzD7-UzmHUdrxB7izbMm9TNDpznHIuTaJWSRngD5Zui_rUXETL0GJG8dERx7IngqTltfzZanhDnjDNfKaowD6pFSEVN-Ff-pTeJqLMPs5504DtnYGD_uhQjvFmREIBgQTGEINzT88PXwLTAVBbWbAad_I-4Q12YwW_Y4yqmARCMTRWP-ixMrlSWCJlh6hz-biEotWNwGvp2pdhdiEP2VSvvUKHd7IngMWcMozOcoZQ1n18kWiFvt90fzNXSmzTjyGYSWUsa_mVouA.aX5mOSiXtilwYPFeTUFN_A.ZyAY79BAjG-QMQIhesj9bQ.TPZ2VYWdTLopCNkvMqUyuQ" + missing_tag_payload = {"encryptedValue": ".".join(encrypted_value.split(".")[:-1])} + + with self.assertRaises(EncryptionError): + to_test.decrypt_payload(missing_tag_payload, config) + + def test_split_cbc_keys_should_reject_invalid_length(self): + with self.assertRaises(EncryptionError): + to_test._split_cbc_keys(b"short-key") + + def test_compute_cbc_auth_tag_matches_reference(self): + mac_key = b"\x01" * 16 + aad = b"header" + iv = b"\x02" * 16 + cipher_text = b"\x03\x04" + + expected_tag = hmac.new(mac_key, aad + iv + cipher_text + (len(aad) * 8).to_bytes(8, "big"), hashlib.sha256).digest()[:16] + computed_tag = to_test._compute_cbc_auth_tag(mac_key, aad, iv, cipher_text) + + self.assertEqual(expected_tag, computed_tag) + + def __build_config_with_hmac(self, enabled): + json_conf = json.loads(get_mastercard_config_for_test()) + json_conf["enableCbcHmacVerification"] = enabled + + conf = JweEncryptionConfig(json_conf) + conf._paths["$"]._to_encrypt = {"$": "$"} + conf._paths["$"]._to_decrypt = {"encryptedValue": "$"} + + return conf diff --git a/tests/test_jwe_encryption_config.py b/tests/test_jwe_encryption_config.py index fbe305d..ec25650 100644 --- a/tests/test_jwe_encryption_config.py +++ b/tests/test_jwe_encryption_config.py @@ -100,6 +100,17 @@ def test_load_config_missing_decryption_key(self): conf = to_test.JweEncryptionConfig(json_conf) self.assertIsNone(conf.decryption_key) + def test_load_config_hmac_verification_default_disabled(self): + conf = to_test.JweEncryptionConfig(self._test_config_file) + self.assertFalse(conf.enable_cbc_hmac_verification) + + def test_load_config_hmac_verification_enabled(self): + json_conf = json.loads(self._test_config_file) + json_conf["enableCbcHmacVerification"] = True + + conf = to_test.JweEncryptionConfig(json_conf) + self.assertTrue(conf.enable_cbc_hmac_verification) + def test_load_config_decryption_key_file_not_found(self): wrong_json = json.loads(self._test_config_file) wrong_json["decryptionKey"] = resource_path("keys/wrong_private_key_name.pem") @@ -127,3 +138,4 @@ def __check_configuration(self, conf, encoding=ClientEncoding.BASE64, oaep_algo= conf.encryption_key_fingerprint, "Wrong public key fingerprint") self.assertEqual(oaep_algo, conf.oaep_padding_digest_algorithm, "Oaep padding algorithm not set") + self.assertFalse(conf.enable_cbc_hmac_verification) From cf228d33398204884cbab162d93a40a5c23de11e Mon Sep 17 00:00:00 2001 From: "MASTERCARD\\e129886" Date: Wed, 17 Dec 2025 11:55:12 +0000 Subject: [PATCH 7/7] Adding tests --- client_encryption/jwe_encryption.py | 2 +- tests/test_jwe_encryption.py | 59 +++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/client_encryption/jwe_encryption.py b/client_encryption/jwe_encryption.py index acad665..95bb3e1 100644 --- a/client_encryption/jwe_encryption.py +++ b/client_encryption/jwe_encryption.py @@ -87,7 +87,7 @@ def decrypt_payload(payload, config, _params=None): protected_header = encrypted_value[0] header = json.loads(decode_jwe(protected_header)) cipher_text = decode_jwe(encrypted_value[3]) - auth_tag = decode_jwe(encrypted_value[4]) if len(encrypted_value) > 4 else b"" + auth_tag = decode_jwe(encrypted_value[4]) if len(encrypted_value) > 4 else None decryption_method = header['enc'] if decryption_method == 'A128CBC-HS256': diff --git a/tests/test_jwe_encryption.py b/tests/test_jwe_encryption.py index 187d925..dd6e4ab 100644 --- a/tests/test_jwe_encryption.py +++ b/tests/test_jwe_encryption.py @@ -5,6 +5,11 @@ import client_encryption.jwe_encryption as to_test from client_encryption.encryption_exception import EncryptionError from client_encryption.jwe_encryption_config import JweEncryptionConfig +from client_encryption.encoding_utils import encode_bytes, url_encode_bytes +from client_encryption.session_key_params import SessionKeyParams +from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding +from Crypto.Cipher import PKCS1_OAEP +from Crypto.PublicKey import RSA from tests import get_mastercard_config_for_test @@ -159,6 +164,13 @@ def test_split_cbc_keys_should_reject_invalid_length(self): with self.assertRaises(EncryptionError): to_test._split_cbc_keys(b"short-key") + def test_split_cbc_keys_should_split_mac_and_enc_halves(self): + cek = bytes(range(32)) + mac_key, enc_key = to_test._split_cbc_keys(cek) + + self.assertEqual(cek[:16], mac_key) + self.assertEqual(cek[16:], enc_key) + def test_compute_cbc_auth_tag_matches_reference(self): mac_key = b"\x01" * 16 aad = b"header" @@ -170,6 +182,53 @@ def test_compute_cbc_auth_tag_matches_reference(self): self.assertEqual(expected_tag, computed_tag) + def test_verify_cbc_hmac_tag_should_pass_on_valid_tag(self): + mac_key = b"\x0a" * 16 + aad = b"protected" + iv = b"\x0b" * 16 + cipher_text = b"\x0c\x0d\x0e" + valid_tag = to_test._compute_cbc_auth_tag(mac_key, aad, iv, cipher_text) + + # Should not raise + to_test._verify_cbc_hmac_tag(mac_key, aad, iv, cipher_text, valid_tag) + + def test_verify_cbc_hmac_tag_should_raise_on_bad_tag(self): + mac_key = b"\x0a" * 16 + aad = b"protected" + iv = b"\x0b" * 16 + cipher_text = b"\x0c\x0d\x0e" + bad_tag = b"\x00" * 16 + + with self.assertRaises(EncryptionError): + to_test._verify_cbc_hmac_tag(mac_key, aad, iv, cipher_text, bad_tag) + + def test_encrypt_and_decrypt_cbc_with_hmac_roundtrip(self): + config = self.__build_config_with_hmac(True) + + # Deterministic CEK and IV for repeatable test + secret_key = bytes(range(32)) + iv = b"\x11" * 16 + + public_bytes = config.encryption_certificate.public_key().public_bytes(Encoding.DER, + PublicFormat.SubjectPublicKeyInfo) + rsa_pub = RSA.import_key(public_bytes) + cipher = PKCS1_OAEP.new(rsa_pub, hashAlgo=to_test.SHA256) + encrypted_secret_key = cipher.encrypt(secret_key) + + iv_encoded = encode_bytes(iv, config.data_encoding) + encrypted_key_value = url_encode_bytes(encrypted_secret_key) + + params = SessionKeyParams(config, encrypted_key_value, iv_encoded, 'SHA256') + params._key = secret_key + params._iv = iv + + payload = {"message": "Hello World"} + + encrypted_payload = to_test.encrypt_payload(payload, config, params) + decrypted_payload = to_test.decrypt_payload(encrypted_payload, config) + + self.assertDictEqual(payload, decrypted_payload) + def __build_config_with_hmac(self, enabled): json_conf = json.loads(get_mastercard_config_for_test()) json_conf["enableCbcHmacVerification"] = enabled