add dlt example pipelines for audit logs etl #26

Open · wants to merge 7 commits into main
91 changes: 91 additions & 0 deletions audit_logs/DLT/01_raw_bronze_silver.py
@@ -0,0 +1,91 @@
# Databricks notebook source
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json


# retrieve the location where logs are delivered
log_bucket = spark.conf.get("mypipeline.log_bucket")

@dlt.table(
    comment="The raw audit logs, ingested from the S3 location configured for Databricks audit log delivery",
    table_properties={"quality": "bronze"},
    partition_cols=["date"]
)
def bronze():
    return (spark
            .readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", True)
            .option("cloudFiles.schemaHints", "workspaceId long")
            .load(log_bucket)
            )

# COMMAND ----------

@udf(StringType())
def strip_null_udf(raw):
    # drop keys whose value is null so each record keeps only the parameters it actually set
    record = raw.asDict()
    return json.dumps({key: value for key, value in record.items() if value is not None})
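
# illustrative example (assumed values, not from the source): a sparse requestParams
# struct such as Row(clusterId="0123-456789-abc123", sparkVersion=None) becomes the
# JSON string '{"clusterId": "0123-456789-abc123"}'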

@dlt.table(
    comment="Audit logs cleaned and prepared for analysis. Strip out all empty keys for every record, parse the email address from a nested field and parse the UNIX epoch to a UTC timestamp.",
    table_properties={"quality": "silver"},
    partition_cols=["date"]
)
def silver():
    return (
        dlt.read_stream("bronze")
        .withColumn("flattened", strip_null_udf("requestParams"))
        .withColumn("email", col("userIdentity.email"))
        .withColumn("date_time", from_utc_timestamp(from_unixtime(col("timestamp") / 1000), "UTC"))
        .drop("requestParams")
        .drop("userIdentity")
    )

# COMMAND ----------

@dlt.table(
    comment="Verify bronze & silver tables match"
)
# the pipeline update fails if the expectation below is violated, i.e. if any rows were dropped
@dlt.expect_or_fail("no_rows_dropped", "silver_count == bronze_count")
def bronze_silver_verification():
    return spark.sql("""
        SELECT * FROM
            (SELECT COUNT(*) AS bronze_count FROM LIVE.bronze),
            (SELECT COUNT(*) AS silver_count FROM LIVE.silver)
    """)

# COMMAND ----------

# these UDFs are used to calculate the super-schema based on all events of a given service

# declared as StringType, so the returned list of keys is stored in its string form
# (e.g. "[key1, key2]"), which extract_schema_udf parses back out below
@udf(StringType())
def just_keys_udf(string):
    return [i for i in json.loads(string).keys()]

@udf(StringType())
def extract_schema_udf(keys):
    # build the union ("super-schema") of all keys seen across a service's events
    schema = StructType()

    # each entry looks like "[key1, key2]": strip the brackets and split on ", "
    keysList = [i[1:-1].split(", ") for i in keys]

    keysDistinct = {key for keys in keysList for key in keys if key != ""}

    if len(keysDistinct) == 0:
        schema.add(StructField("placeholder", StringType()))
    else:
        for key in keysDistinct:
            schema.add(StructField(key, StringType()))

    return schema.json()
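
# illustrative example (assumed values): if collect_set("keys") yields
# ["[actionName, clusterId]", "[actionName]"], the resulting super-schema contains the
# distinct keys actionName and clusterId, both typed as StringType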

@dlt.table(
    comment="List of services and their corresponding super-schema"
)
def silver_services_schema():
    return (dlt.read("silver")
            .select("serviceName", just_keys_udf(col("flattened")).alias("keys"))
            .groupBy("serviceName")
            .agg(extract_schema_udf(collect_set("keys")).alias("schema"))
            )
127 changes: 127 additions & 0 deletions audit_logs/DLT/01_raw_bronze_silver_event_hub.py
@@ -0,0 +1,127 @@
# Databricks notebook source
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json

# retrieve Shared Access Key from secret scope
connSharedAccessKey = dbutils.secrets.get(
    spark.conf.get("mypipeline.secret_scope_name"),
    spark.conf.get("mypipeline.secret_name")
)

# names of the Event Hubs namespace and topic
eh_ns_name = spark.conf.get("mypipeline.eh_ns_name")
eh_topic_name = spark.conf.get("mypipeline.eh_topic_name")

# event hub configuration
BOOTSTRAP_SERVERS = f"{eh_ns_name}.servicebus.windows.net:9093"
EH_SASL = f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{connSharedAccessKey}";'
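# Event Hubs exposes a Kafka-compatible endpoint on port 9093; authentication uses SASL PLAIN
# with the literal username "$ConnectionString" and the Event Hubs connection string (the value
# the secret above is expected to hold) as the password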

@dlt.table(
    comment="The raw audit logs, ingested from the Event Hub configured for Databricks audit log delivery",
    table_properties={"quality": "bronze"},
    partition_cols=["date"]
)
def bronze():
    return (spark.readStream
            .format("kafka")
            .option("subscribe", eh_topic_name)
            .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
            .option("kafka.sasl.mechanism", "PLAIN")
            .option("kafka.security.protocol", "SASL_SSL")
            .option("kafka.sasl.jaas.config", EH_SASL)
            .option("kafka.request.timeout.ms", "60000")
            .option("kafka.session.timeout.ms", "60000")
            .option("failOnDataLoss", "false")
            .option("startingOffsets", "earliest")
            .option("maxOffsetsPerTrigger", 100000)
            .load()
            .withColumn("deserializedBody", col("value").cast("string"))
            .withColumn("date", col("timestamp").cast("date"))
            .drop("value")
            )

# COMMAND ----------

@udf(StringType())
def strip_null_udf(raw):
    # drop keys whose value is null so each record keeps only the parameters it actually set
    record = raw.asDict()
    return json.dumps({key: value for key, value in record.items() if value is not None})

# schema of the Azure diagnostic-log envelope delivered to the event hub:
# each message wraps the audit events in a "records" array
raw_schema = StructType([
    StructField("records", ArrayType(StructType([
        StructField("Host", StringType()),
        StructField("category", StringType()),
        StructField("identity", StringType()),
        StructField("operationName", StringType()),
        StructField("operationVersion", StringType()),
        StructField("properties", StructType([
            StructField("actionName", StringType()),
            StructField("logId", StringType()),
            StructField("requestId", StringType()),
            StructField("requestParams", StringType()),
            StructField("response", StringType()),
            StructField("serviceName", StringType()),
            StructField("sessionId", StringType()),
            StructField("sourceIPAddress", StringType()),
            StructField("userAgent", StringType()),
        ])),
        StructField("resourceId", StringType()),
        StructField("time", StringType()),
    ])))
])

@dlt.table(
    comment="Audit logs cleaned and prepared for analysis. Strip out all empty keys for every record, parse the email address from a nested field and parse the UNIX epoch to a UTC timestamp.",
    table_properties={"quality": "silver"},
    partition_cols=["date"]
)
def silver():
    return (
        dlt.read_stream("bronze")
        .select("deserializedBody")
        .withColumn("parsedBody", from_json("deserializedBody", raw_schema))
        .select(explode("parsedBody.records").alias("streamRecord"))
        .selectExpr("streamRecord.*")
        .withColumn("version", col("operationVersion"))
        .withColumn("date_time", col("time").cast("timestamp"))
        .withColumn("timestamp", unix_timestamp(col("date_time")) * 1000)
        .withColumn("date", col("time").cast("date"))
        .select("category", "version", "timestamp", "date_time", "date", "properties", col("identity").alias("userIdentity"))
        .selectExpr("*", "properties.*")
        .withColumnRenamed("requestParams", "flattened")
        .withColumn("identity", from_json("userIdentity", "email STRING, subjectName STRING"))
        .withColumn("response", from_json("response", "errorMessage STRING, result STRING, statusCode BIGINT"))
        .drop("properties", "userIdentity")
    )

# COMMAND ----------

# these UDFs are used to calculate the super-schema based on all events of a given service

# declared as StringType, so the returned list of keys is stored in its string form
# (e.g. "[key1, key2]"), which extract_schema_udf parses back out below
@udf(StringType())
def just_keys_udf(string):
    return [i for i in json.loads(string).keys()]

@udf(StringType())
def extract_schema_udf(keys):
    # build the union ("super-schema") of all keys seen across a service's events
    schema = StructType()

    # each entry looks like "[key1, key2]": strip the brackets and split on ", "
    keysList = [i[1:-1].split(", ") for i in keys]

    keysDistinct = {key for keys in keysList for key in keys if key != ""}

    if len(keysDistinct) == 0:
        schema.add(StructField("placeholder", StringType()))
    else:
        for key in keysDistinct:
            schema.add(StructField(key, StringType()))

    return schema.json()

@dlt.table(
    comment="List of services and their corresponding super-schema"
)
def silver_services_schema():
    return (dlt.read("silver")
            .select("serviceName", just_keys_udf(col("flattened")).alias("keys"))
            .groupBy("serviceName")
            .agg(extract_schema_udf(collect_set("keys")).alias("schema"))
            )
47 changes: 47 additions & 0 deletions audit_logs/DLT/02_silver_fan_gold.py
@@ -0,0 +1,47 @@
# Databricks notebook source
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json

# retrieve the fully qualified names of the tables the first pipeline writes out to
services_table = spark.conf.get("mypipeline.services_table")
silver_table = spark.conf.get("mypipeline.silver_table")

# COMMAND ----------

# retrieve the list of services
def get_services():
    return [s[0] for s in spark.table(services_table).collect()]

# retrieve the schema for the service
def get_schema(service_name):
    return (spark.table(services_table)
            .filter(col("serviceName") == service_name)
            .select("schema")
            .collect()[0][0]
            )

# this function generates a gold table for the specified service by filtering the silver table
def generate_gold_tables(service):
    # sanitise the table name
    service_name = service.replace("-", "_")
    schema = StructType.fromJson(json.loads(get_schema(service)))

    @dlt.table(
        name=service_name,
        comment=f"Gold table for {service_name}",
        table_properties={"quality": "gold"},
        partition_cols=["date"]
    )
    def create_gold_table():
        # the silver table lives in a different pipeline, so it is read by its full name
        # rather than via dlt.read_stream
        return (spark.readStream
                .table(silver_table)
                .filter(col("serviceName") == service)
                .withColumn("requestParams", from_json(col("flattened"), schema))
                .drop("flattened")
                )

# programmatically generate a gold table for each service found in the silver table
for service in get_services():
    generate_gold_tables(service)
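
# illustrative example (assumed services): if the silver table contains events for the
# "clusters" and "jobs" services, this loop defines gold tables named clusters and jobs,
# each exposing a typed requestParams struct for that service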
15 changes: 15 additions & 0 deletions audit_logs/DLT/README.md
@@ -0,0 +1,15 @@
## DLT pipelines for audit log processing

Currently, Databricks delivers audit logs for all enabled workspaces, as per the delivery SLA, in JSON format to a customer-owned S3 bucket on AWS or to a specified Event Hub on Azure. These audit logs contain events for specific actions related to primary resources like clusters, jobs, and the workspace. To simplify delivery and further analysis by customers, Databricks logs each event for every action as a separate record and stores all the relevant parameters in a sparse StructType called `requestParams`.

To make this information more accessible, we recommend an ETL process based on [Delta Live Tables](https://databricks.com/product/delta-live-tables).

Our ETL process requires two DLT pipelines:

- The first pipeline (`01-raw-bronze-silver` on AWS and `01-raw-bronze-silver-event-hub` on Azure) does the following:
- On AWS, stream from the raw JSON files that Databricks delivers using Autoloader to a bronze Delta Lake table. This creates a durable copy of the raw data that allows us to replay our ETL, should we find any issues in downstream tables.
- On Azure, stream from the Event Hub that Databricks delivers audit logs to. This creates a durable copy of the raw data that allows us to replay our ETL, should we find any issues in downstream tables.
- Stream from a bronze Delta Lake table to a silver Delta Lake table such that it takes the sparse requestParams StructType and strips out all empty keys for every record, along with performing some other basic transformations like parsing email address from a nested field and parsing UNIX epoch to UTC timestamp.
- The second pipeline (`02-silver-fan-gold`) then streams to individual gold Delta Lake tables for each Databricks service tracked in the audit logs. As the list of services is dynamically inferred from the data in the silver table, it needs to run as a separate pipeline due to a current limitation.

The two pipelines are then linked together in a multi-task job, to be executed on a regular schedule. A rough sketch of how the pipelines can be configured is shown below.
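
The notebooks read their settings (`mypipeline.*`) from the pipeline configuration via `spark.conf.get`. As a minimal sketch of how those values can be supplied, the snippet below creates the AWS pipeline through the Delta Live Tables REST API; the workspace URL, token, notebook path, target database, and bucket are placeholders, not values from this repo.

```python
# Sketch only: creates the first (AWS) DLT pipeline with the configuration keys the
# notebook reads via spark.conf.get. All names below are placeholders/assumptions.
import requests

DATABRICKS_HOST = "https://<your-workspace>.cloud.databricks.com"  # placeholder
TOKEN = "<personal-access-token>"                                   # placeholder

pipeline_settings = {
    "name": "audit-logs-raw-bronze-silver",
    "target": "audit_logs",   # database the bronze/silver tables are published to
    "continuous": False,      # triggered mode, so it can run on a schedule in a job
    "libraries": [
        {"notebook": {"path": "/Repos/<user>/<repo>/audit_logs/DLT/01_raw_bronze_silver"}}
    ],
    # keys read in the notebook via spark.conf.get("mypipeline.*")
    "configuration": {
        "mypipeline.log_bucket": "s3://<audit-log-delivery-bucket>/audit-logs/"
    },
}

resp = requests.post(
    f"{DATABRICKS_HOST}/api/2.0/pipelines",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json=pipeline_settings,
)
resp.raise_for_status()
print(resp.json())  # the returned pipeline_id can be referenced from a multi-task job
```

The second pipeline would be created the same way, with `mypipeline.services_table` and `mypipeline.silver_table` in its `configuration`; both pipelines can then be chained as tasks in a scheduled multi-task job.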
1 change: 0 additions & 1 deletion audit_logs/azure_eh_audit_logs_etl.py
@@ -190,7 +190,6 @@ def flatten_table(service):
    else:
        for key in keysDistinct:
            schema.add(StructField(key, StringType()))

    # write the df with the correct schema to table
    (flattenedStream
        .filter(col("serviceName") == service_name)