-
Notifications
You must be signed in to change notification settings - Fork 419
/
Copy pathkinesis_stream_event.py
117 lines (89 loc) · 3.84 KB
/
kinesis_stream_event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import base64
import json
import zlib
from typing import Iterator, List
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import (
CloudWatchLogsDecodedData,
)
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper, EventWrapper
class KinesisStreamRecordPayload(DictWrapper):
    """Accessors for the ``kinesis`` section of a Kinesis stream record.

    The wrapped dictionary is the whole stream record; every property below
    reads from its ``"kinesis"`` key.
    """

    @property
    def approximate_arrival_timestamp(self) -> float:
        """The approximate time that the record was inserted into the stream"""
        return float(self["kinesis"]["approximateArrivalTimestamp"])

    @property
    def data(self) -> str:
        """The data blob, still base64-encoded as delivered in the event"""
        return self["kinesis"]["data"]

    @property
    def kinesis_schema_version(self) -> str:
        """Schema version for the record"""
        return self["kinesis"]["kinesisSchemaVersion"]

    @property
    def partition_key(self) -> str:
        """Identifies which shard in the stream the data record is assigned to"""
        return self["kinesis"]["partitionKey"]

    @property
    def sequence_number(self) -> str:
        """The unique identifier of the record within its shard"""
        return self["kinesis"]["sequenceNumber"]

    def data_as_bytes(self) -> bytes:
        """Decode the base64-encoded data blob into raw bytes.

        Raises:
            binascii.Error: if ``data`` is not valid base64 (e.g. bad padding).
        """
        return base64.b64decode(self.data)

    def data_as_text(self) -> str:
        """Decode the data blob and interpret the bytes as UTF-8 text.

        Raises:
            UnicodeDecodeError: if the decoded bytes are not valid UTF-8.
        """
        return self.data_as_bytes().decode("utf-8")

    def data_as_json(self) -> dict:
        """Decode the data blob as UTF-8 text and parse it as JSON.

        The return annotation is ``dict``, but the value is whatever
        ``json.loads`` produces, so a non-object payload yields a list, str,
        number, bool or None instead.

        Raises:
            json.JSONDecodeError: if the text is not valid JSON.
        """
        return json.loads(self.data_as_text())

    def data_zlib_compressed_as_json(self) -> dict:
        """Decode the data blob, decompress it, and parse the result as JSON.

        Used by this module's CloudWatch Logs helpers.

        Raises:
            zlib.error: if the decoded bytes are not zlib/gzip compressed.
            json.JSONDecodeError: if the decompressed payload is not valid JSON.
        """
        # MAX_WBITS | 32 asks zlib to auto-detect the header, so both
        # zlib-wrapped and gzip-wrapped payloads are accepted.
        decompressed = zlib.decompress(self.data_as_bytes(), zlib.MAX_WBITS | 32)
        # json.loads accepts bytes directly and detects the UTF encoding itself.
        return json.loads(decompressed)
class KinesisStreamRecord(DictWrapper):
    """A single entry of the ``Records`` list in a Kinesis stream event.

    Exposes the record's envelope fields and, through ``kinesis``, its payload.
    """

    @property
    def aws_region(self) -> str:
        """Region the event was produced in, for example ``us-east-1``."""
        region = self["awsRegion"]
        return region

    @property
    def event_id(self) -> str:
        """Globally unique identifier of the event captured by this stream record."""
        identifier = self["eventID"]
        return identifier

    @property
    def event_name(self) -> str:
        """Type of the event, for example ``aws:kinesis:record``."""
        name = self["eventName"]
        return name

    @property
    def event_source(self) -> str:
        """AWS service that emitted the event; ``aws:kinesis`` for Kinesis."""
        source = self["eventSource"]
        return source

    @property
    def event_source_arn(self) -> str:
        """Amazon Resource Name (ARN) of the originating event source."""
        source_arn = self["eventSourceARN"]
        return source_arn

    @property
    def event_version(self) -> str:
        """Event format version, written as ``<major>.<minor>``."""
        version = self["eventVersion"]
        return version

    @property
    def invoke_identity_arn(self) -> str:
        """ARN of the identity that was used to invoke the Lambda function."""
        identity_arn = self["invokeIdentityArn"]
        return identity_arn

    @property
    def kinesis(self) -> KinesisStreamRecordPayload:
        """Payload view of this record, wrapping the same underlying dictionary."""
        payload = KinesisStreamRecordPayload(self._data)
        return payload
class KinesisStreamEvent(EventWrapper):
    """Kinesis stream event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html
    """

    @property
    def records(self) -> Iterator[KinesisStreamRecord]:
        """Lazily wrap each entry of the event's ``Records`` list.

        This is a generator: ``self["Records"]`` is only read once iteration
        starts, and each raw entry is wrapped as it is consumed.
        """
        yield from map(KinesisStreamRecord, self["Records"])
def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> List[CloudWatchLogsDecodedData]:
    """Decode every record of a Kinesis event as CloudWatch Logs data.

    Each record's payload is base64-decoded, decompressed and parsed as JSON,
    then wrapped in ``CloudWatchLogsDecodedData``. Records are returned in
    event order.
    """
    decoded_logs: List[CloudWatchLogsDecodedData] = []
    for kinesis_record in event.records:
        parsed_payload = kinesis_record.kinesis.data_zlib_compressed_as_json()
        decoded_logs.append(CloudWatchLogsDecodedData(parsed_payload))
    return decoded_logs
def extract_cloudwatch_logs_from_record(record: KinesisStreamRecord) -> CloudWatchLogsDecodedData:
    """Decode a single Kinesis record's payload as CloudWatch Logs data.

    The payload is base64-decoded, decompressed and parsed as JSON before
    being wrapped in ``CloudWatchLogsDecodedData``.
    """
    parsed_payload = record.kinesis.data_zlib_compressed_as_json()
    return CloudWatchLogsDecodedData(data=parsed_payload)