Commit

Merge pull request #416 from rtdip/develop
v0.6.3
GBBBAS authored Aug 4, 2023
2 parents 0e00f5c + 9868736 commit a42f15c
Showing 7 changed files with 22 additions and 10 deletions.
1 change: 1 addition & 0 deletions .github/workflows/sonarcloud.yml
@@ -30,6 +30,7 @@ jobs:
outputs:
pr_info: ${{ steps.pr.outputs.result }}
runs-on: ubuntu-latest
if: ${{ github.event.workflow_run.conclusion == 'success' }}
steps:
- name: 'Download Artifact'
uses: actions/github-script@v6
8 changes: 4 additions & 4 deletions environment.yml
@@ -61,15 +61,15 @@ dependencies:
- strawberry-graphql[fastapi,pydantic]==0.194.4
- web3==6.5.0
- twine==4.0.2
- delta-sharing-python==0.7.4
- polars==0.18.8
- moto[s3]==4.1.14
- pip:
- dependency-injector==4.41.0
- azure-functions==1.15.0
- nest_asyncio==1.5.6
- hvac==1.1.1
- langchain==0.0.247
- deltalake==0.10.0
- moto[s3]==4.1.13
- build==0.10.0
- polars==0.18.8
- delta-sharing==0.7.3
- deltalake==0.10.1

4 changes: 3 additions & 1 deletion setup.py
@@ -57,7 +57,9 @@
"boto3==1.28.2",
"hvac==1.1.1",
"azure-keyvault-secrets==4.7.0",
"web3==6.5.0"
"web3==6.5.0",
"polars[deltalake]==0.18.8",
"delta-sharing==0.7.4"
]

EXTRAS_DEPENDENCIES: dict[str, list[str]] = {
@@ -15,7 +15,7 @@
import logging
import time
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, when, date_format
from pyspark.sql.functions import col, when, date_format, floor
from py4j.protocol import Py4JJavaError

from ..interfaces import DestinationInterface
@@ -45,6 +45,7 @@ class SparkPCDMToDeltaDestination(DestinationInterface):
query_name (str): Unique name for the query in associated SparkSession
merge (bool): Use Delta Merge to perform inserts, updates and deletes
try_broadcast_join (bool): Attempts to perform a broadcast join in the merge which can leverage data skipping using partition pruning and file pruning automatically. Can fail if dataframe being merged is large and therefore more suitable for streaming merges than batch merges
remove_nanoseconds (bool): Removes nanoseconds from the EventTime column and replaces with zeros
remove_duplicates (bool): Removes duplicates before writing the data
Attributes:
@@ -61,6 +62,7 @@ class SparkPCDMToDeltaDestination(DestinationInterface):
query_name: str
merge: bool
try_broadcast_join: bool
remove_nanoseconds: bool
remove_duplicates: bool

def __init__(self,
@@ -75,6 +77,7 @@ def __init__(self,
query_name: str ="PCDMToDeltaMergeDestination",
merge: bool = True,
try_broadcast_join = False,
remove_nanoseconds: bool = False,
remove_duplicates: bool = True) -> None:
self.spark = spark
self.data = data
@@ -87,6 +90,7 @@ def __init__(self,
self.query_name = query_name
self.merge = merge
self.try_broadcast_join = try_broadcast_join
self.remove_nanoseconds = remove_nanoseconds
self.remove_duplicates = remove_duplicates

@staticmethod
@@ -189,6 +193,9 @@ def _write_data_by_type(self, df: DataFrame):
if self.merge == True:
df = df.withColumn("ChangeType", when(df["ChangeType"].isin("insert", "update"), "upsert").otherwise(df["ChangeType"]))

if self.remove_nanoseconds == True:
df = df.withColumn("EventTime", (floor(col("EventTime").cast("double")*1000)/1000).cast("timestamp"))

if self.remove_duplicates == True:
df = df.drop_duplicates(["TagName", "EventTime"])

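A minimal sketch (not part of this commit) of what the new remove_nanoseconds branch does: EventTime is cast to epoch seconds as a double, scaled to milliseconds, floored, and cast back to a timestamp, so every digit below the millisecond is zeroed before drop_duplicates compares rows. The local SparkSession and sample row below are assumptions for illustration.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, floor

spark = SparkSession.builder.master("local[1]").getOrCreate()

# One row whose EventTime carries sub-millisecond precision (hypothetical data).
df = spark.createDataFrame(
    [("Tag1", "2023-08-04 12:00:00.123456")],
    ["TagName", "EventTime"],
).withColumn("EventTime", col("EventTime").cast("timestamp"))

# Same expression as the diff above: truncate to millisecond precision.
truncated = df.withColumn(
    "EventTime",
    (floor(col("EventTime").cast("double") * 1000) / 1000).cast("timestamp"),
)
truncated.show(truncate=False)  # EventTime -> 2023-08-04 12:00:00.123

Two readings that differ only at the microsecond level then collapse into a single row once the remove_duplicates step drops duplicates on (TagName, EventTime).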
4 changes: 2 additions & 2 deletions src/sdk/python/rtdip_sdk/pipelines/secrets/azure_key_vault.py
@@ -61,7 +61,7 @@ def settings() -> dict:

def _get_akv_client(self):
return SecretClient(
vault_url=self.vault,
vault_url="https://{}.vault.azure.net".format(self.vault),
credential=self.credential,
**self.kwargs
)
@@ -71,7 +71,7 @@ def get(self):
Retrieves the secret from the Azure Key Vault
'''
response = self.client.get_secret(name=self.key)
return response
return response.value

def set(self):
'''
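A minimal sketch (not from this commit) showing how the two fixes above combine: the vault name stored on the class is expanded into a full Key Vault URL, and get_secret() returns a KeyVaultSecret object whose .value attribute is the secret string, which is what get() now passes back. The vault name, secret name, and DefaultAzureCredential are placeholders for illustration.

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

# The vault is referenced by name; the client needs the full URL.
client = SecretClient(
    vault_url="https://{}.vault.azure.net".format("my-vault"),
    credential=DefaultAzureCredential(),
)

# get_secret returns a KeyVaultSecret wrapper; .value is the secret string.
secret = client.get_secret(name="my-secret")
print(secret.value)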
@@ -73,7 +73,6 @@ def execute(self) -> Tuple[Libraries, dict]:
(issubclass(class_check, UtilitiesInterface) and class_check != UtilitiesInterface)
):
component_list.append(cls[1])
print(cls[0])

task_libraries = Libraries()
task_libraries.get_libraries_from_components(component_list)
@@ -18,6 +18,9 @@
from src.sdk.python.rtdip_sdk.pipelines.secrets.azure_key_vault import AzureKeyVaultSecrets
from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import Libraries, PyPiLibrary

class MockSecretValue():
value = "test"

def test_azure_key_vault_secret_setup(mocker: MockerFixture):
azure_key_vault = AzureKeyVaultSecrets(vault="akv-vault-url", credential=object(), key="test")
assert azure_key_vault.system_type().value == 1
@@ -29,7 +32,7 @@ def test_azure_key_vault_secret_setup(mocker: MockerFixture):
assert isinstance(azure_key_vault.settings(), dict)

def test_azure_key_vault_secrets_get(mocker: MockerFixture):
mocker.patch("azure.keyvault.secrets.SecretClient.get_secret", return_value="test")
mocker.patch("azure.keyvault.secrets.SecretClient.get_secret", return_value=MockSecretValue())

azure_key_vault = AzureKeyVaultSecrets(vault="akv-vault-url", credential=object(), key="vault-key")

