Merge pull request #541 from rtdip/develop
v0.8.5
GBBBAS authored Oct 13, 2023
2 parents 77ce115 + ce66449 commit 60f2fc1
Showing 13 changed files with 414 additions and 5 deletions.
8 changes: 7 additions & 1 deletion .devcontainer/devcontainer.json
@@ -12,7 +12,8 @@
"GitHub.copilot",
"GitHub.codespaces",
"ryanluker.vscode-coverage-gutters",
"ms-python.black-formatter"
"ms-python.black-formatter",
"SonarSource.sonarlint-vscode"
],
"settings": {
"azureFunctions.scmDoBuildDuringDeployment": true,
@@ -32,12 +33,17 @@
"PYTHONPATH": "${workspaceFolder}:${env:PYTHONPATH}"
},
"git.alwaysSignOff": true,
"git.pruneOnFetch": true,
"githubPullRequests.ignoredPullRequestBranches": [
"develop"
],
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter",
"editor.formatOnSave": true
},
"sonarlint.connectedMode.project": {
"connectionId": "rtdip",
"projectKey": "rtdip_core"
}
}
}
7 changes: 6 additions & 1 deletion .vscode/settings.json
@@ -24,6 +24,7 @@
"PYTHONPATH": "${workspaceFolder};${env:PYTHONPATH}"
},
"git.alwaysSignOff": true,
"git.pruneOnFetch": true,
"githubPullRequests.ignoredPullRequestBranches": [
"develop"
],
@@ -63,5 +64,9 @@
"TURBODBCSQL",
"ZORDER"
],
"licenser.projectName": "RTDIP"
"licenser.projectName": "RTDIP",
"sonarlint.connectedMode.project": {
"connectionId": "rtdip",
"projectKey": "rtdip_core"
}
}
1 change: 1 addition & 0 deletions docs/sdk/code-reference/pipelines/utilities/azure/autoloader_resources.md
@@ -0,0 +1 @@
::: src.sdk.python.rtdip_sdk.pipelines.utilities.azure.autoloader_resources
1 change: 1 addition & 0 deletions docs/sdk/pipelines/components.md
@@ -102,6 +102,7 @@ Utilities are components that perform utility functions such as logging, error h
|[Delta Table Vacuum](../code-reference/pipelines/utilities/spark/delta_table_vacuum.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|[AWS S3 Bucket Policy](../code-reference/pipelines/utilities/aws/s3_bucket_policy.md)|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|[ADLS Gen 2 ACLs](../code-reference/pipelines/utilities/azure/adls_gen2_acl.md)|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
+|[Azure Autoloader Resources](../code-reference/pipelines/utilities/azure/autoloader_resources.md)|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|[Spark ADLS Gen 2 Service Principal Connect](../code-reference/pipelines/utilities/spark/adls_gen2_spn_connect.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|

!!! note "Note"
3 changes: 2 additions & 1 deletion environment.yml
@@ -19,7 +19,6 @@ channels:
- defaults
dependencies:
- python>=3.8,<3.12
-# - mkdocs-material==9.1.21
- mkdocs-material-extensions==1.1.1
- jinja2==3.1.2
- pytest==7.4.0
@@ -35,6 +34,7 @@
- azure-identity==1.12.0
- azure-storage-file-datalake==12.12.0
- azure-keyvault-secrets==4.7.0
+- azure-mgmt-storage==21.0.0
- boto3==1.28.2
- pyodbc==4.0.39
- fastapi==0.103.2
@@ -70,6 +70,7 @@
- pip:
- dependency-injector==4.41.0
- azure-functions==1.15.0
+- azure-mgmt-eventgrid==10.2.0
- nest_asyncio==1.5.6
- hvac==1.1.1
- langchain==0.0.291
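The two new Azure management pins (mirrored in setup.py below) can be sanity-checked after the environment is rebuilt — a minimal sketch using only the Python standard library:

```python
from importlib import metadata

# Expect the versions pinned in environment.yml / setup.py.
print(metadata.version("azure-mgmt-storage"))    # 21.0.0
print(metadata.version("azure-mgmt-eventgrid"))  # 10.2.0
```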
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -228,6 +228,7 @@ nav:
- S3 Bucket Policy: sdk/code-reference/pipelines/utilities/aws/s3_bucket_policy.md
- Azure:
- ADLS Gen 2 ACLs: sdk/code-reference/pipelines/utilities/azure/adls_gen2_acl.md
+- Autoloader Resources: sdk/code-reference/pipelines/utilities/azure/autoloader_resources.md
- Converters:
- Json: sdk/code-reference/pipelines/converters/pipeline_job_json.md
- Secrets:
2 changes: 2 additions & 0 deletions setup.py
@@ -53,6 +53,8 @@
"databricks-sdk==0.9.0",
"pydantic==2.4.2",
"azure-storage-file-datalake==12.12.0",
"azure-mgmt-storage==21.0.0",
"azure-mgmt-eventgrid==10.2.0",
"boto3==1.28.2",
"hvac==1.1.1",
"azure-keyvault-secrets==4.7.0",
4 changes: 4 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/constants.py
@@ -58,6 +58,10 @@ def get_default_package(package_name):
"azure_key_vault_secret": PyPiLibrary(
name="azure-keyvault-secrets", version="4.7.0"
),
"azure_storage_mgmt": PyPiLibrary(name="azure-mgmt-storage", version="21.0.0"),
"azure_eventgrid_mgmt": PyPiLibrary(
name="azure-mgmt-eventgrid", version="10.2.0"
),
"aws_boto3": PyPiLibrary(name="boto3", version="1.28.2"),
"hashicorp_vault": PyPiLibrary(name="hvac", version="1.1.0"),
"api_requests": PyPiLibrary(name="requests", version="2.30.0"),
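For context, a small sketch of how these registry entries are consumed — the import path and the PyPiLibrary attributes follow the test changes later in this diff:

```python
from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.constants import (
    get_default_package,
)

# Each key resolves to a pinned PyPiLibrary; the two keys added above map to
# the new Azure management dependencies.
storage_lib = get_default_package("azure_storage_mgmt")
eventgrid_lib = get_default_package("azure_eventgrid_mgmt")
print(storage_lib.name, storage_lib.version)      # azure-mgmt-storage 21.0.0
print(eventgrid_lib.name, eventgrid_lib.version)  # azure-mgmt-eventgrid 10.2.0
```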
1 change: 1 addition & 0 deletions src/sdk/python/rtdip_sdk/pipelines/utilities/__init__.py
@@ -18,4 +18,5 @@
from .spark.configuration import *
from .spark.adls_gen2_spn_connect import *
from .azure.adls_gen2_acl import *
+from .azure.autoloader_resources import *
from .spark.session import *
2 changes: 1 addition & 1 deletion src/sdk/python/rtdip_sdk/pipelines/utilities/azure/adls_gen2_acl.py
@@ -35,7 +35,7 @@ class ADLSGen2DirectoryACLUtility(UtilitiesInterface):
storage_account (str): ADLS Gen 2 Storage Account Name
container (str): ADLS Gen 2 Container Name
credential (TokenCredential): Credentials to authenticate with ADLS Gen 2 Storage Account
-directory (str): Directory to be assign ACLS to in an ADLSS Gen 2
+directory (str): Directory to be assign ACLS to in an ADLS Gen 2
group_object_id (str): Azure AD Group Object ID to be assigned to Directory
folder_permissions (optional, str): Folder Permissions to Assign to directory
parent_folder_permissions (optional, str): Folder Permissions to Assign to parent directories. Parent Folder ACLs not set if None
219 changes: 219 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/utilities/azure/autoloader_resources.py
@@ -0,0 +1,219 @@
# Copyright 2023 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..interfaces import UtilitiesInterface
from ..._pipeline_utils.models import Libraries, SystemType
from ..._pipeline_utils.constants import get_default_package

from azure.core.credentials import TokenCredential
from azure.mgmt.eventgrid import EventGridManagementClient
from azure.mgmt.eventgrid.models import (
    EventSubscription,
    EventSubscriptionFilter,
    StorageQueueEventSubscriptionDestination,
    EventDeliverySchema,
    StringContainsAdvancedFilter,
    RetryPolicy,
    SystemTopic,
)
from azure.mgmt.storage import StorageManagementClient
from azure.mgmt.storage.models import StorageQueue


class AzureAutoloaderResourcesUtility(UtilitiesInterface):
    """
    Creates the required Azure Resources for the Databricks Autoloader Notification Mode

    Args:
        subscription_id (str): Azure Subscription ID
        resource_group_name (str): Resource Group Name of Subscription
        storage_account (str): Storage Account Name
        container (str): Container Name
        directory (str): Directory to be used for filtering messages in the Event Subscription. This will be equivalent to the Databricks Autoloader Path
        credential (TokenCredential): Credentials to authenticate with Storage Account
        event_subscription_name (str): Name of the Event Subscription
        queue_name (str): Name of the queue that will be used for the Endpoint of the Messages
        system_topic_name (optional, str): The system topic name. Defaults to the storage account name if not provided.
    """

    subscription_id: str
    resource_group_name: str
    storage_account: str
    container: str
    directory: str
    credential: TokenCredential
    event_subscription_name: str
    queue_name: str
    system_topic_name: str

    def __init__(
        self,
        subscription_id: str,
        resource_group_name: str,
        storage_account: str,
        container: str,
        directory: str,
        credential: TokenCredential,
        event_subscription_name: str,
        queue_name: str,
        system_topic_name: str = None,
    ) -> None:
        self.subscription_id = subscription_id
        self.resource_group_name = resource_group_name
        self.storage_account = storage_account
        self.container = container
        self.directory = directory
        self.credential = credential
        self.event_subscription_name = event_subscription_name
        self.queue_name = queue_name
        self.system_topic_name = (
            storage_account if system_topic_name is None else system_topic_name
        )

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYTHON
        """
        return SystemType.PYTHON

    @staticmethod
    def libraries():
        libraries = Libraries()
        libraries.add_pypi_library(get_default_package("azure_eventgrid_mgmt"))
        libraries.add_pypi_library(get_default_package("azure_storage_mgmt"))
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def execute(self) -> bool:
        storage_mgmt_client = StorageManagementClient(
            credential=self.credential, subscription_id=self.subscription_id
        )

        account_properties = storage_mgmt_client.storage_accounts.get_properties(
            resource_group_name=self.resource_group_name,
            account_name=self.storage_account,
        )

        queue_response = storage_mgmt_client.queue.list(
            resource_group_name=self.resource_group_name,
            account_name=self.storage_account,
        )

        queue_list = [
            queue for queue in queue_response if queue.name == self.queue_name
        ]

        if queue_list == []:
            storage_mgmt_client.queue.create(
                resource_group_name=self.resource_group_name,
                account_name=self.storage_account,
                queue_name=self.queue_name,
                queue=StorageQueue(),
            )

        eventgrid_client = EventGridManagementClient(
            credential=self.credential, subscription_id=self.subscription_id
        )

        system_topic_response = eventgrid_client.system_topics.list_by_resource_group(
            resource_group_name=self.resource_group_name,
            filter="name eq '{}'".format(self.system_topic_name),
        )

        source = "/subscriptions/{}/resourceGroups/{}/providers/Microsoft.Storage/StorageAccounts/{}".format(
            self.subscription_id, self.resource_group_name, self.storage_account
        )

        system_topic_list = [
            system_topic
            for system_topic in system_topic_response
            if system_topic.source == source
        ]

        if system_topic_list == []:
            eventgrid_client.system_topics.begin_create_or_update(
                resource_group_name=self.resource_group_name,
                system_topic_name=self.system_topic_name,
                system_topic_info=SystemTopic(
                    location=account_properties.location,
                    source=source,
                    topic_type="Microsoft.Storage.StorageAccounts",
                ),
            ).result()

        system_topic_event_subscription_response = (
            eventgrid_client.system_topic_event_subscriptions.list_by_system_topic(
                resource_group_name=self.resource_group_name,
                system_topic_name=self.system_topic_name,
                filter="name eq '{}'".format(self.event_subscription_name),
            )
        )

        system_topic_event_subscription_list = [
            system_topic_event_subscription
            for system_topic_event_subscription in system_topic_event_subscription_response
            if system_topic_event_subscription.source == source
        ]

        if system_topic_event_subscription_list == []:
            event_subscription_destination = StorageQueueEventSubscriptionDestination(
                resource_id=source,
                queue_name=self.queue_name,
                queue_message_time_to_live_in_seconds=None,
            )

            event_subscription_filter = EventSubscriptionFilter(
                subject_begins_with="/blobServices/default/containers/{}/blobs/{}".format(
                    self.container, self.directory
                ),
                included_event_types=[
                    "Microsoft.Storage.BlobCreated",
                    "Microsoft.Storage.BlobRenamed",
                    "Microsoft.Storage.DirectoryRenamed",
                ],
                advanced_filters=[
                    StringContainsAdvancedFilter(
                        key="data.api",
                        values=[
                            "CopyBlob",
                            "PutBlob",
                            "PutBlockList",
                            "FlushWithClose",
                            "RenameFile",
                            "RenameDirectory",
                        ],
                    )
                ],
            )

            retry_policy = RetryPolicy()

            event_subscription_info = EventSubscription(
                destination=event_subscription_destination,
                filter=event_subscription_filter,
                event_delivery_schema=EventDeliverySchema.EVENT_GRID_SCHEMA,
                retry_policy=retry_policy,
            )

            eventgrid_client.system_topic_event_subscriptions.begin_create_or_update(
                resource_group_name=self.resource_group_name,
                system_topic_name=self.system_topic_name,
                event_subscription_name=self.event_subscription_name,
                event_subscription_info=event_subscription_info,
            ).result()
        return True
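A minimal usage sketch of the new utility (all Azure identifiers below are placeholders; DefaultAzureCredential comes from the already-pinned azure-identity package, and the import path is the in-repo one used by the tests):

```python
from azure.identity import DefaultAzureCredential

from src.sdk.python.rtdip_sdk.pipelines.utilities import (
    AzureAutoloaderResourcesUtility,
)

# Placeholder identifiers -- substitute real values for your subscription.
resources_utility = AzureAutoloaderResourcesUtility(
    subscription_id="00000000-0000-0000-0000-000000000000",
    resource_group_name="example-resource-group",
    storage_account="examplestorageaccount",
    container="example-container",
    directory="raw/events",
    credential=DefaultAzureCredential(),
    event_subscription_name="example-event-subscription",
    queue_name="example-queue",
    # system_topic_name is omitted; it defaults to the storage account name.
)

# Creates the storage queue, system topic and event subscription only if they
# do not already exist; returns True on completion.
resources_utility.execute()
```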
29 changes: 28 additions & 1 deletion tests/sdk/python/rtdip_sdk/pipelines/utilities/azure/test_adls_gen2_acl.py
@@ -17,14 +17,41 @@
sys.path.insert(0, ".")
from pytest_mock import MockerFixture

-from src.sdk.python.rtdip_sdk.pipelines.utilities.azure.adls_gen2_acl import (
+from src.sdk.python.rtdip_sdk.pipelines.utilities import (
ADLSGen2DirectoryACLUtility,
)
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+)
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.constants import (
+    get_default_package,
+)
from tests.sdk.python.rtdip_sdk.pipelines._pipeline_utils.azure import (
MockDataLakeServiceClient,
)


+def test_adls_gen2_acl_utility_setup():
+    adls_gen2_acl_utility = ADLSGen2DirectoryACLUtility(
+        storage_account="test_storage_account",
+        container="test_container",
+        credential="test_credential",
+        directory="/test/directory",
+        group_object_id="test_group_object_id",
+        folder_permissions="rwx",
+    )
+
+    assert adls_gen2_acl_utility.system_type().value == 1
+    assert adls_gen2_acl_utility.libraries() == Libraries(
+        maven_libraries=[],
+        pypi_libraries=[
+            get_default_package("azure_adls_gen_2"),
+        ],
+        pythonwheel_libraries=[],
+    )
+    assert isinstance(adls_gen2_acl_utility.settings(), dict)


def test_adls_gen2_acl_multi_folder(mocker: MockerFixture):
adls_gen2_acl_utility = ADLSGen2DirectoryACLUtility(
storage_account="test_storage_account",
