Skip to content

Commit d206240

Browse files
authored
feat: hmac hashing jinja filter (#476)
1 parent 3e82436 commit d206240

File tree

2 files changed

+108
-2
lines changed

2 files changed

+108
-2
lines changed

airbyte_cdk/sources/declarative/interpolation/filters.py

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44

55
import base64
66
import hashlib
7+
import hmac as hmac_lib
78
import json
89
import re
9-
from typing import Any, Optional
10+
from typing import Any, Dict, Optional
1011

1112

1213
def hash(value: Any, hash_type: str = "md5", salt: Optional[str] = None) -> str:
@@ -135,5 +136,51 @@ def regex_search(value: str, regex: str) -> str:
135136
return ""
136137

137138

138-
_filters_list = [hash, base64encode, base64decode, base64binascii_decode, string, regex_search]
139+
def hmac(value: Any, key: str, hash_type: str = "sha256") -> str:
140+
"""
141+
Implementation of a custom Jinja2 hmac filter with SHA-256 support.
142+
143+
This filter creates a Hash-based Message Authentication Code (HMAC) using a cryptographic
144+
hash function and a secret key. Currently only supports SHA-256, and returns hexdigest of the signature.
145+
146+
Example usage in a low code connector:
147+
148+
auth_headers:
149+
$ref: "#/definitions/base_auth"
150+
$parameters:
151+
signature: "{{ 'message_to_sign' | hmac('my_secret_key') }}"
152+
153+
:param value: The message to be authenticated
154+
:param key: The secret key for the HMAC
155+
:param hash_type: Hash algorithm to use (default: sha256)
156+
:return: HMAC digest as a hexadecimal string
157+
"""
158+
# Define allowed hash functions
159+
ALLOWED_HASH_TYPES: Dict[str, Any] = {
160+
"sha256": hashlib.sha256,
161+
}
162+
163+
if hash_type not in ALLOWED_HASH_TYPES:
164+
raise ValueError(
165+
f"Hash type '{hash_type}' is not allowed. Allowed types: {', '.join(ALLOWED_HASH_TYPES.keys())}"
166+
)
167+
168+
hmac_obj = hmac_lib.new(
169+
key=str(key).encode("utf-8"),
170+
msg=str(value).encode("utf-8"),
171+
digestmod=ALLOWED_HASH_TYPES[hash_type],
172+
)
173+
174+
return hmac_obj.hexdigest()
175+
176+
177+
_filters_list = [
178+
hash,
179+
base64encode,
180+
base64decode,
181+
base64binascii_decode,
182+
string,
183+
regex_search,
184+
hmac,
185+
]
139186
filters = {f.__name__: f for f in _filters_list}

unit_tests/sources/declarative/interpolation/test_filters.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44
import base64
55
import hashlib
6+
import hmac as hmac_lib
67

78
import pytest
89

@@ -105,3 +106,61 @@ def test_regex_search_no_match() -> None:
105106
val = interpolation.eval(expression_with_regex, {})
106107

107108
assert val is None
109+
110+
111+
def test_hmac_sha256_default() -> None:
112+
message = "test_message"
113+
secret_key = "test_secret_key"
114+
115+
s = "{{ '%s' | hmac('%s') }}" % (message, secret_key)
116+
filter_hmac = interpolation.eval(s, config={})
117+
118+
# compute expected hmac using the hmac library directly
119+
hmac_obj = hmac_lib.new(
120+
key=secret_key.encode("utf-8"), msg=message.encode("utf-8"), digestmod=hashlib.sha256
121+
)
122+
expected_hmac = hmac_obj.hexdigest()
123+
124+
assert filter_hmac == expected_hmac
125+
126+
127+
def test_hmac_sha256_explicit() -> None:
128+
message = "test_message"
129+
secret_key = "test_secret_key"
130+
131+
s = "{{ '%s' | hmac('%s', 'sha256') }}" % (message, secret_key)
132+
filter_hmac = interpolation.eval(s, config={})
133+
134+
# compute expected hmac using the hmac library directly
135+
hmac_obj = hmac_lib.new(
136+
key=secret_key.encode("utf-8"), msg=message.encode("utf-8"), digestmod=hashlib.sha256
137+
)
138+
expected_hmac = hmac_obj.hexdigest()
139+
140+
assert filter_hmac == expected_hmac
141+
142+
143+
def test_hmac_with_numeric_value() -> None:
144+
message = 12345
145+
secret_key = "test_secret_key"
146+
147+
s = "{{ %d | hmac('%s') }}" % (message, secret_key)
148+
filter_hmac = interpolation.eval(s, config={})
149+
150+
# compute expected hmac using the hmac library directly
151+
hmac_obj = hmac_lib.new(
152+
key=secret_key.encode("utf-8"), msg=str(message).encode("utf-8"), digestmod=hashlib.sha256
153+
)
154+
expected_hmac = hmac_obj.hexdigest()
155+
156+
assert filter_hmac == expected_hmac
157+
158+
159+
def test_hmac_with_invalid_hash_type() -> None:
160+
message = "test_message"
161+
secret_key = "test_secret_key"
162+
163+
s = "{{ '%s' | hmac('%s', 'md5') }}" % (message, secret_key)
164+
165+
with pytest.raises(ValueError):
166+
interpolation.eval(s, config={})

0 commit comments

Comments
 (0)