Commit 4c20cc3

Merge pull request #515 from rtdip/develop
v0.7.11
2 parents b489c26 + e02d574 commit 4c20cc3

34 files changed: +895 -229 lines

.github/workflows/test.yml (+5 -3)

@@ -27,7 +27,7 @@ jobs:
       matrix:
         os: [ubuntu-latest]
         python-version: ["3.8", "3.9", "3.10", "3.11"]
-        pyspark: ["3.3.0", "3.3.1", "3.3.2", "3.4.0"]
+        pyspark: ["3.3.0", "3.3.1", "3.3.2", "3.4.0", "3.4.1"]
         exclude:
           - pyspark: "3.4.0"
             python-version: "3.8"
@@ -49,7 +49,9 @@ jobs:
           - pyspark: "3.3.2"
            delta-spark: "2.3.0"
           - pyspark: "3.4.0"
-            delta-spark: "2.4.0"
+            delta-spark: "2.4.0"
+          - pyspark: "3.4.1"
+            delta-spark: "2.4.0"
     runs-on: ${{ matrix.os }}
     steps:
       - uses: actions/checkout@v3
@@ -132,7 +134,7 @@ jobs:
       run: |
         mkdocs build --strict

-  job_python_lint_black:
+  job_lint_python_black:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3

docs/blog/.authors.yml (+5 -4)

@@ -12,7 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-GBBBAS:
-  name: Bryce Bartmann
-  description: Contributor
-  avatar: https://github.com/GBBBAS.png
+authors:
+  GBBBAS:
+    name: Bryce Bartmann
+    description: Contributor
+    avatar: https://github.com/GBBBAS.png
docs/sdk/code-reference/pipelines/destinations/spark/pcdm_latest_to_delta.md (new file, +2)

@@ -0,0 +1,2 @@
+# Write Process Control Data Model Latest Values to Delta
+::: src.sdk.python.rtdip_sdk.pipelines.destinations.spark.pcdm_latest_to_delta
docs/sdk/code-reference/pipelines/destinations/spark/pcdm_to_delta.md (+1 -1)

@@ -1,2 +1,2 @@
-# Write to Delta
+# Write Process Control Data Model to Delta
 ::: src.sdk.python.rtdip_sdk.pipelines.destinations.spark.pcdm_to_delta

docs/sdk/code-reference/pipelines/transformers/spark/mqtt_json_to_pcdm.md (-2)

This file was deleted.
docs/sdk/code-reference/pipelines/transformers/spark/sem_json_to_pcdm.md (new file, +2)

@@ -0,0 +1,2 @@
+# Convert SEM Json to Process Control Data Model
+::: src.sdk.python.rtdip_sdk.pipelines.transformers.spark.sem_json_to_pcdm

docs/sdk/pipelines/components.md (+1)

@@ -83,6 +83,7 @@ Destinations are components that connect to sink/destination systems and write d
 |[Kinesis](../code-reference/pipelines/destinations/spark/kafka.md)|||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
 |[Rest API](../code-reference/pipelines/destinations/spark/rest_api.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
 |[Process Control Data Model To Delta](../code-reference/pipelines/destinations/spark/pcdm_to_delta.md)|||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
+|[Process Control Data Model Latest Values To Delta](../code-reference/pipelines/destinations/spark/pcdm_latest_to_delta.md)|||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
 |[EVM](../code-reference/pipelines/destinations/blockchain/evm.md)|:heavy_check_mark:|||:heavy_check_mark:|:heavy_check_mark:|

 !!! note "Note"

environment.yml (+4 -4)

@@ -40,13 +40,14 @@ dependencies:
   - fastapi==0.100.1
   - httpx==0.24.1
   - trio==0.22.1
-  - pyspark>=3.3.0,<3.5.0
+  - pyspark>=3.3.0,<3.6.0
   - delta-spark>=2.2.0,<3.1.0
   - grpcio>=1.48.1
   - grpcio-status>=1.48.1
   - googleapis-common-protos>=1.56.4
   - openjdk==11.0.15
-  - openai==0.27.8
+  - openai==0.27.8
+  - mkdocs-material==9.3.1
   - mkdocstrings==0.22.0
   - mkdocstrings-python==1.4.0
   - mkdocs-macros-plugin==1.0.1
@@ -60,7 +61,7 @@ dependencies:
   - strawberry-graphql[fastapi,pydantic]==0.194.4
   - web3==6.5.0
   - twine==4.0.2
-  - delta-sharing-python==0.7.4
+  - delta-sharing-python==1.0.0
   - polars==0.18.8
   - moto[s3]==4.1.14
   - xarray>=2023.1.0,<2023.8.0
@@ -75,5 +76,4 @@ dependencies:
   - langchain==0.0.291
   - build==0.10.0
   - deltalake==0.10.1
-  - mkdocs-material==9.2.0b3

mkdocs.yml (+3 -2)

@@ -184,7 +184,7 @@ nav:
         - OPC Publisher Json To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/opc_publisher_opcua_json_to_pcdm.md
         - Fledge Json To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/fledge_opcua_json_to_pcdm.md
         - EdgeX JSON data To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/edgex_opcua_json_to_pcdm.md
-        - MQTT data To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/mqtt_json_to_pcdm.md
+        - SEM data To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/sem_json_to_pcdm.md
         - SSIP PI Binary File data To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/ssip_pi_binary_file_to_pcdm.md
         - SSIP PI Binary JSON data To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/ssip_pi_binary_json_to_pcdm.md
         - Pandas to PySpark DataFrame Conversion: sdk/code-reference/pipelines/transformers/spark/pandas_to_pyspark.md
@@ -209,7 +209,8 @@ nav:
         - Kafka: sdk/code-reference/pipelines/destinations/spark/kafka.md
         - Kinesis: sdk/code-reference/pipelines/destinations/spark/kinesis.md
         - Rest API: sdk/code-reference/pipelines/destinations/spark/rest_api.md
-        - Process Control Data Model To Delta: sdk/code-reference/pipelines/destinations/spark/pcdm_to_delta.md
+        - PCDM To Delta: sdk/code-reference/pipelines/destinations/spark/pcdm_to_delta.md
+        - PCDM Latest To Delta: sdk/code-reference/pipelines/destinations/spark/pcdm_latest_to_delta.md
       - Python:
         - Delta: sdk/code-reference/pipelines/destinations/python/delta.md
       - Blockchain:

setup.py (+2 -2)

@@ -44,7 +44,7 @@
 ]

 PYSPARK_PACKAGES = [
-    "pyspark>=3.3.0,<3.5.0",
+    "pyspark>=3.3.0,<3.6.0",
     "delta-spark>=2.2.0,<2.5.0",
 ]

@@ -58,7 +58,7 @@
     "azure-keyvault-secrets==4.7.0",
     "web3==6.5.0",
     "polars[deltalake]==0.18.8",
-    "delta-sharing==0.7.4",
+    "delta-sharing==1.0.0",
     "xarray>=2023.1.0,<2023.8.0",
     "ecmwf-api-client==1.6.3",
     "netCDF4==1.6.4",
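
Both environment.yml and setup.py widen the pyspark pin to `>=3.3.0,<3.6.0`, matching the new 3.4.1 entry in the CI matrix. A minimal sketch of checking an installed interpreter against the widened constraint; the `packaging` library used here is an assumption for illustration and is not added by this commit:

```python
# Hedged sketch: confirm the installed pyspark satisfies the widened pin.
# Assumes the "packaging" library is available; it is not part of this diff.
from importlib.metadata import version
from packaging.specifiers import SpecifierSet

pyspark_pin = SpecifierSet(">=3.3.0,<3.6.0")
installed = version("pyspark")
print(installed, installed in pyspark_pin)  # e.g. "3.4.1 True"
```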

src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/constants.py (+3 -4)

@@ -35,7 +35,7 @@ def get_default_package(package_name):
         version=_get_package_version("delta-spark"),
     ),
     "spark_delta_sharing": MavenLibrary(
-        group_id="io.delta", artifact_id="delta-sharing-spark_2.12", version="0.6.3"
+        group_id="io.delta", artifact_id="delta-sharing-spark_2.12", version="1.0.0"
     ),
     "spark_azure_eventhub": MavenLibrary(
         group_id="com.microsoft.azure",
@@ -52,14 +52,13 @@ def get_default_package(package_name):
         artifact_id="spark-connect_2.12",
         version=_get_package_version("pyspark"),
     ),
-    "rtdip_sdk": PyPiLibrary(name="rtdip_sdk", version="0.5.1"),
     "azure_adls_gen_2": PyPiLibrary(
-        name="azure-storage-file-datalake", version="12.10.1"
+        name="azure-storage-file-datalake", version="12.12.0"
     ),
     "azure_key_vault_secret": PyPiLibrary(
         name="azure-keyvault-secrets", version="4.7.0"
     ),
-    "aws_boto3": PyPiLibrary(name="boto3", version="1.26.118"),
+    "aws_boto3": PyPiLibrary(name="boto3", version="1.28.2"),
     "hashicorp_vault": PyPiLibrary(name="hvac", version="1.1.0"),
     "api_requests": PyPiLibrary(name="requests", version="2.30.0"),
     "pyarrow": PyPiLibrary(name="pyarrow", version="12.0.0"),

src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/obc_field_mappings.py (+5 -5)

@@ -13,14 +13,14 @@
 # limitations under the License.

 OBC_FIELD_MAPPINGS_V10 = {
-    "0": {"TagName": "obc_timeStamp", "ValueType": "double"},
+    "0": {"TagName": "obc_timeStamp", "ValueType": "float"},
     "1": {"TagName": "obc_errorState", "ValueType": "integer"},
     "2": {"TagName": "obc_energyThreshold", "ValueType": "float"},
     "3": {"TagName": "obc_energyLevel", "ValueType": "float"},
     "4": {"TagName": "gps_errorState", "ValueType": "integer"},
     "5": {"TagName": "gps_utcTime", "ValueType": "string"},
-    "6": {"TagName": "gps_latitude", "ValueType": "double"},
-    "7": {"TagName": "gps_longitude", "ValueType": "double"},
+    "6": {"TagName": "gps_latitude", "ValueType": "float"},
+    "7": {"TagName": "gps_longitude", "ValueType": "float"},
     "8": {"TagName": "gps_fixQuality", "ValueType": "integer"},
     "9": {"TagName": "gps_dilution", "ValueType": "float"},
     "10": {"TagName": "gps_speed", "ValueType": "float"},
@@ -52,8 +52,8 @@
     "36": {"TagName": "sup_VoltageIn", "ValueType": "integer"},
     "37": {"TagName": "all_msSinceReset", "ValueType": "integer"},
     "38": {"TagName": "iot_errorState", "ValueType": "integer"},
-    "39": {"TagName": "iot_route", "ValueType": "text"},
-    "40": {"TagName": "cell_mode", "ValueType": "text"},
+    "39": {"TagName": "iot_route", "ValueType": "integer"},
+    "40": {"TagName": "cell_mode", "ValueType": "string"},
     "41": {"TagName": "cell_rssi", "ValueType": "integer"},
     "42": {"TagName": "wifi_power", "ValueType": "integer"},
     "43": {"TagName": "message_id", "ValueType": "string"},

src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py (+52 -23)

@@ -252,36 +252,65 @@ def get_dbutils(

 APM_SCHEMA = StructType(
     [
-        StructField("Id", StringType(), True),
-        StructField("TenantId", StringType(), True),
-        StructField("IdType", StringType(), True),
         StructField(
-            "Samples",
+            "SystemTimeSeries",
+            StructType(
+                [
+                    StructField("Id", StringType(), True),
+                    StructField("TenantId", StringType(), True),
+                    StructField("IdType", StringType(), True),
+                    StructField(
+                        "Samples",
+                        ArrayType(
+                            StructType(
+                                [
+                                    StructField("ItemName", StringType(), True),
+                                    StructField("Time", StringType(), True),
+                                    StructField("Value", StringType(), True),
+                                    StructField("Unit", StringType(), True),
+                                    StructField(
+                                        "NormalizedQuality", StringType(), True
+                                    ),
+                                    StructField("HighValue", DoubleType(), True),
+                                    StructField("LowValue", DoubleType(), True),
+                                    StructField("TargetValue", DoubleType(), True),
+                                ]
+                            )
+                        ),
+                        True,
+                    ),
+                ]
+            ),
+            True,
+        )
+    ]
+)
+
+SEM_SCHEMA = StructType(
+    [
+        StructField("apiVersion", StringType(), True),
+        StructField("deviceName", StringType(), True),
+        StructField("id", StringType(), True),
+        StructField("origin", LongType(), True),
+        StructField("profileName", StringType(), True),
+        StructField(
+            "readings",
             ArrayType(
                 StructType(
                     [
-                        StructField("ItemName", StringType(), True),
-                        StructField("Time", StringType(), True),
-                        StructField("Value", StringType(), True),
-                        StructField("Unit", StringType(), True),
-                        StructField("NormalizedQuality", StringType(), True),
-                        StructField("HighValue", DoubleType(), True),
-                        StructField("LowValue", DoubleType(), True),
-                        StructField("TargetValue", DoubleType(), True),
+                        StructField("deviceName", StringType(), True),
+                        StructField("id", StringType(), True),
+                        StructField("origin", LongType(), True),
+                        StructField("profileName", StringType(), True),
+                        StructField("resourceName", StringType(), True),
+                        StructField("value", StringType(), True),
+                        StructField("valueType", StringType(), True),
                     ]
-                )
+                ),
+                True,
             ),
             True,
         ),
-    ]
-)
-
-MQTT_SCHEMA = StructType(
-    [
-        StructField("d", ArrayType(StringType(), True), True),
-        StructField("dID", StringType(), True),
-        StructField("m", StringType(), True),
-        StructField("t", LongType(), True),
-        StructField("v", StringType(), True),
+        StructField("sourceName", StringType(), True),
     ]
 )
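
SEM_SCHEMA replaces the deleted MQTT_SCHEMA and describes a device payload with a nested `readings` array. A hedged sketch of applying it to a raw JSON string column; `raw_df` and its `body` column are assumptions for illustration, and the new SEM transformer added in this release wraps this parsing step:

```python
# Hedged sketch: parse a raw SEM JSON string column with the new schema.
# `raw_df` and the "body" column name are placeholders, not part of this commit.
from pyspark.sql.functions import col, from_json
from rtdip_sdk.pipelines._pipeline_utils.spark import SEM_SCHEMA

parsed_df = raw_df.withColumn("payload", from_json(col("body"), SEM_SCHEMA))
readings_df = parsed_df.select("payload.deviceName", "payload.readings")
```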

src/sdk/python/rtdip_sdk/pipelines/destinations/__init__.py (+1)

@@ -19,6 +19,7 @@
 from .spark.kinesis import *
 from .spark.rest_api import *
 from .spark.pcdm_to_delta import *
+from .spark.pcdm_latest_to_delta import *
 from .spark.kafka_eventhub import *
 from .blockchain.evm import *
 from .python.delta import *
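
The wildcard import exposes the new PCDM latest-values destination from the destinations package alongside the existing Delta destinations. The class name below is an assumption inferred from the module name; the diff itself only confirms the module-level import:

```python
# Assumed class name (inferred from the module name pcdm_latest_to_delta);
# the diff only shows that the module is now re-exported by the package.
from rtdip_sdk.pipelines.destinations import SparkPCDMLatestToDeltaDestination
```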

src/sdk/python/rtdip_sdk/pipelines/destinations/spark/delta.py (+14 -9)

@@ -30,9 +30,10 @@ class SparkDeltaDestination(DestinationInterface):
         data (DataFrame): Dataframe to be written to Delta
         options (dict): Options that can be specified for a Delta Table write operation (See Attributes table below). Further information on the options is available for [batch](https://docs.delta.io/latest/delta-batch.html#write-to-a-table){ target="_blank" } and [streaming](https://docs.delta.io/latest/delta-streaming.html#delta-table-as-a-sink){ target="_blank" }.
         destination (str): Either the name of the Hive Metastore or Unity Catalog Delta Table **or** the path to the Delta table
-        mode (str): Method of writing to Delta Table - append/overwrite (batch), append/complete (stream)
-        trigger (str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes"
-        query_name (str): Unique name for the query in associated SparkSession
+        mode (optional str): Method of writing to Delta Table - append/overwrite (batch), append/update/complete (stream). Default is append
+        trigger (optional str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds
+        query_name (optional str): Unique name for the query in associated SparkSession. (stream) Default is DeltaDestination
+        query_wait_interval (optional int): If set, waits for the streaming query to complete before returning. (stream) Default is None

     Attributes:
         checkpointLocation (str): Path to checkpoint files. (Streaming)
@@ -50,22 +51,25 @@ class SparkDeltaDestination(DestinationInterface):
     mode: str
     trigger: str
     query_name: str
+    query_wait_interval: int

     def __init__(
         self,
         data: DataFrame,
         options: dict,
         destination: str,
         mode: str = "append",
-        trigger="10 seconds",
-        query_name="DeltaDestination",
+        trigger: str = "10 seconds",
+        query_name: str = "DeltaDestination",
+        query_wait_interval: int = None,
     ) -> None:
         self.data = data
         self.options = options
         self.destination = destination
         self.mode = mode
         self.trigger = trigger
         self.query_name = query_name
+        self.query_wait_interval = query_wait_interval

     @staticmethod
     def system_type():
@@ -150,10 +154,11 @@ def write_stream(self):
                 .toTable(self.destination)
             )

-            while query.isActive:
-                if query.lastProgress:
-                    logging.info(query.lastProgress)
-                time.sleep(10)
+            if self.query_wait_interval:
+                while query.isActive:
+                    if query.lastProgress:
+                        logging.info(query.lastProgress)
+                    time.sleep(self.query_wait_interval)

         except Py4JJavaError as e:
             logging.exception(e.errmsg)
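
With `query_wait_interval` defaulting to None, `write_stream` now returns immediately after starting the streaming query instead of blocking in the progress-logging loop. A hedged usage sketch built from the parameters shown in the diff; `stream_df` and the checkpoint path are placeholders, and the import path assumes the package-level re-export seen in __init__.py above:

```python
# Hedged usage sketch of the updated SparkDeltaDestination.
# `stream_df` and the checkpoint location are illustrative placeholders.
from rtdip_sdk.pipelines.destinations import SparkDeltaDestination

destination = SparkDeltaDestination(
    data=stream_df,
    options={"checkpointLocation": "/tmp/checkpoints/events"},
    destination="events_delta_table",
    mode="append",
    trigger="10 seconds",
    query_name="EventsToDelta",
    query_wait_interval=10,  # block and log query.lastProgress every 10 seconds
)
destination.write_stream()
```

Omitting `query_wait_interval` preserves the new non-blocking behaviour, which suits callers that manage the streaming query lifecycle themselves.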
