Feature streaming 040824 #272

Open · wants to merge 10 commits into base: master
10 changes: 9 additions & 1 deletion CHANGELOG.md
@@ -3,10 +3,19 @@
## Change History
All notable changes to the Databricks Labs Data Generator will be documented in this file.

### Unreleased - next release is intended to be v0.4.0

#### Changed
* Minimum Spark version is PySpark 3.2.1, which is the minimum version for Databricks Runtime 10.4 LTS
* Changed parsing of build options for the data generator to support use of custom streaming sources
* Documentation updates in support of new features such as streaming and complex structures
* Updated documentation for generating text data


### Version 0.3.6 Post 1
@@ -18,7 +27,6 @@ All notable changes to the Databricks Labs Data Generator will be documented in
#### Fixed
* Fixed scenario where `DataAnalyzer` is used on dataframe containing a column named `summary`


### Version 0.3.6

#### Changed
3 changes: 2 additions & 1 deletion CONTRIBUTING.md
@@ -22,7 +22,8 @@ Dependent packages are not installed automatically by the `dbldatagen` package.
The code has been tested with Python 3.8.12 and later.

Older releases were tested with Python 3.7.5 but as of this release, it requires the Databricks
runtime 9.1 LTS or later.
runtime 10.4 LTS or later, and Python 3.8.10 or later. Due to limitations in the runtimes available in
GitHub Actions, testing is performed with Python 3.8.12.

## Checking your code for common issues

2 changes: 1 addition & 1 deletion README.md
@@ -82,7 +82,7 @@ The documentation [installation notes](https://databrickslabs.github.io/dbldatag
contains details of installation using alternative mechanisms.

## Compatibility
The Databricks Labs Data Generator framework can be used with Pyspark 3.1.2 and Python 3.8 or later. These are
The Databricks Labs Data Generator framework can be used with Pyspark 3.2.1 and Python 3.8 or later. These are
compatible with the Databricks runtime 10.4 LTS and later releases. For full Unity Catalog support,
we recommend using Databricks runtime 13.2 or later (Databricks 13.3 LTS or above preferred)

105 changes: 104 additions & 1 deletion dbldatagen/data_generator.py
@@ -7,6 +7,8 @@
"""
import copy
import logging
import math
import time
import re

from pyspark.sql.types import LongType, IntegerType, StringType, StructType, StructField, DataType
@@ -25,8 +27,33 @@
_OLD_MIN_OPTION = 'min'
_OLD_MAX_OPTION = 'max'

_STREAMING_SOURCE_OPTION = "dbldatagen.streaming.source"
_STREAMING_SCHEMA_OPTION = "dbldatagen.streaming.sourceSchema"
_STREAMING_PATH_OPTION = "dbldatagen.streaming.sourcePath"
_STREAMING_FORMAT_OPTION = "dbldatagen.streaming.sourceFormat"
_STREAMING_TABLE_OPTION = "dbldatagen.streaming.sourceTable"
_STREAMING_ID_FIELD_OPTION = "dbldatagen.streaming.sourceIdField"
_STREAMING_TIMESTAMP_FIELD_OPTION = "dbldatagen.streaming.sourceTimestampField"
_STREAMING_GEN_TIMESTAMP_OPTION = "dbldatagen.streaming.generateTimestamp"
_STREAMING_USE_SOURCE_FIELDS = "dbldatagen.streaming.sourceFields"
_BUILD_OPTION_PREFIX = "dbldatagen."

_STREAMING_TIMESTAMP_COLUMN = "_source_timestamp"

_STREAMING_SOURCE_RATE = "rate"
_STREAMING_SOURCE_RATE_MICRO_BATCH = "rate-micro-batch"
_STREAMING_SOURCE_NUM_PARTITIONS = "numPartitions"
_STREAMING_SOURCE_ROWS_PER_BATCH = "rowsPerBatch"
_STREAMING_SOURCE_ROWS_PER_SECOND = "rowsPerSecond"
_STREAM_SOURCE_START_TIMESTAMP = "startTimestamp"

_STREAMING_SOURCE_TEXT = "text"
_STREAMING_SOURCE_PARQUET = "parquet"
_STREAMING_SOURCE_CSV = "csv"
_STREAMING_SOURCE_JSON = "json"
_STREAMING_SOURCE_ORC = "orc"
_STREAMING_SOURCE_DELTA = "delta"


class DataGenerator:
""" Main Class for test data set generation
@@ -53,6 +80,11 @@ class DataGenerator:

Note: in a shared spark session, the sparkContext is not available, so the default parallelism is set to 200.
We recommend passing an explicit value for `partitions` in this case.

Note that the requested number of partitions is not guaranteed when generating a streaming data set
from a streaming source that produces a different number of partitions. However, for most batch use
cases, the requested number of partitions will be produced.

"""

# class vars
@@ -955,7 +987,7 @@ def withStructColumn(self, colName, fields=None, asJson=False, **kwargs):

if asJson:
output_expr = f"to_json({struct_expr})"
newDf = self.withColumn(colName, StringType(), expr=output_expr, **kwargs)
newDf = self.withColumn(colName, StringType(), expr=output_expr, **kwargs)
else:
newDf = self.withColumn(colName, INFER_DATATYPE, expr=struct_expr, **kwargs)

@@ -1044,6 +1076,7 @@ def _getBaseDataFrame(self, startId=0, streaming=False, options=None):

end_id = self._rowCount + startId
id_partitions = self.partitions if self.partitions is not None else 4
build_options, passthrough_options, unsupported_options = self._parseBuildOptions(options)

if not streaming:
status = f"Generating data frame with ids from {startId} to {end_id} with {id_partitions} partitions"
@@ -1233,6 +1266,76 @@ def computeBuildPlan(self):
self.buildPlanComputed = True
return self

def _parseBuildOptions(self, options):
""" Parse build options
Parse build options into a tuple of dictionaries - (datagen options, passthrough options, unsupported options)
where
- `datagen options` is a dictionary of options to be interpreted by the data generator
- `passthrough options` is a dictionary of options to be passed through to the underlying base dataframe
- `unsupported options` is a dictionary of options that are not supported
:param options: Dict of options to control generation of data
:returns: tuple of option dictionaries - (datagen_options, passthrough_options, unsupported_options)
"""
passthrough_options = {}
unsupported_options = {}
datagen_options = {}

supported_options = [_STREAMING_SOURCE_OPTION,
_STREAMING_SCHEMA_OPTION,
_STREAMING_PATH_OPTION,
_STREAMING_TABLE_OPTION,
_STREAMING_ID_FIELD_OPTION,
_STREAMING_TIMESTAMP_FIELD_OPTION,
_STREAMING_GEN_TIMESTAMP_OPTION,
_STREAMING_USE_SOURCE_FIELDS
]

if options is not None:
for k, v in options.items():
if isinstance(k, str):
if k.startswith(_BUILD_OPTION_PREFIX):
if k in supported_options:
datagen_options[k] = v
else:
unsupported_options[k] = v
else:
passthrough_options[k] = v
else:
unsupported_options[k] = v

# add defaults

return datagen_options, passthrough_options, unsupported_options
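
# Illustrative example (not part of this diff): given the option keys below
# (`dbldatagen.streaming.badOption` is a made-up key used only for illustration), the
# expected split is that keys carrying the "dbldatagen." prefix and listed in
# `supported_options` become datagen options, other string keys pass through to the
# base dataframe, and prefixed but unrecognised keys are reported as unsupported.
#   options = {"dbldatagen.streaming.source": "rate-micro-batch",
#              "dbldatagen.streaming.badOption": True,
#              "rowsPerBatch": 100}
#   datagen_options     == {"dbldatagen.streaming.source": "rate-micro-batch"}
#   passthrough_options == {"rowsPerBatch": 100}
#   unsupported_options == {"dbldatagen.streaming.badOption": True}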

def _applyStreamingDefaults(self, build_options, passthrough_options):
""" Apply default options for streaming data generation"""
assert build_options is not None
assert passthrough_options is not None

# default to `rate` streaming source
if _STREAMING_SOURCE_OPTION not in build_options:
build_options[_STREAMING_SOURCE_OPTION] = _STREAMING_SOURCE_RATE

# setup `numPartitions` if not specified
if build_options[_STREAMING_SOURCE_OPTION] in [_STREAMING_SOURCE_RATE, _STREAMING_SOURCE_RATE_MICRO_BATCH]:
if _STREAMING_SOURCE_NUM_PARTITIONS not in passthrough_options:
passthrough_options[_STREAMING_SOURCE_NUM_PARTITIONS] = self.partitions

# set up rows per batch if not specified
if build_options[_STREAMING_SOURCE_OPTION] == _STREAMING_SOURCE_RATE:
if _STREAMING_SOURCE_ROWS_PER_SECOND not in passthrough_options:
passthrough_options[_STREAMING_SOURCE_ROWS_PER_SECOND] = 1

if build_options[_STREAMING_SOURCE_OPTION] == _STREAMING_SOURCE_RATE_MICRO_BATCH:
if _STREAMING_SOURCE_ROWS_PER_BATCH not in passthrough_options:
passthrough_options[_STREAMING_SOURCE_ROWS_PER_BATCH] = 1
if _STREAM_SOURCE_START_TIMESTAMP not in passthrough_options:
currentTs = math.floor(time.mktime(time.localtime())) * 1000
passthrough_options[_STREAM_SOURCE_START_TIMESTAMP] = currentTs

if build_options[_STREAMING_SOURCE_OPTION] == _STREAMING_SOURCE_TEXT:
self.logger.warning("Use of the `text` format may not work due to lack of type support")

def build(self, withTempView=False, withView=False, withStreaming=False, options=None):
""" build the test data set from the column definitions and return a dataframe for it

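As a sketch of how the new streaming build options might fit together (illustrative only, not part of this diff; it assumes an active Spark session `spark` and arbitrary example column names), keys carrying the `dbldatagen.` prefix are consumed by the generator itself, while remaining keys such as `rowsPerBatch` are passed through to the underlying `rate-micro-batch` reader:

import dbldatagen as dg

# simple spec with illustrative columns
ds = (dg.DataGenerator(spark, name="device_events", rows=100000, partitions=4)
      .withColumn("device_id", "long", minValue=1, maxValue=1000, random=True)
      .withColumn("temperature", "double", minValue=-20.0, maxValue=50.0, random=True))

# "dbldatagen.streaming.source" selects the streaming source; "rowsPerBatch" is a
# passthrough option for the rate-micro-batch source (see _applyStreamingDefaults above)
df = ds.build(withStreaming=True,
              options={"dbldatagen.streaming.source": "rate-micro-batch",
                       "rowsPerBatch": 100})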
119 changes: 119 additions & 0 deletions dbldatagen/enhanced_event_time.py
@@ -0,0 +1,119 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This file defines the `EnhancedEventTimeHelper` class

This defines helper methods for implementing enhanced event time
"""

import dbldatagen.distributions as dist

HASH_COMPUTE_METHOD = "hash"
VALUES_COMPUTE_METHOD = "values"
RAW_VALUES_COMPUTE_METHOD = "raw_values"
AUTO_COMPUTE_METHOD = "auto"
COMPUTE_METHOD_VALID_VALUES = [HASH_COMPUTE_METHOD,
AUTO_COMPUTE_METHOD,
VALUES_COMPUTE_METHOD,
RAW_VALUES_COMPUTE_METHOD]


class EnhancedEventTimeHelper(object):

def __init__(self):
pass

def withEnhancedEventTime(self,
dataspec,
startEventTime=None,
acceleratedEventTimeInterval="10 minutes",
fractionLateArriving=0.1,
lateTimeInterval="6 hours",
jitter=(-0.25, 0.9999),
eventTimeName=None,
baseColumn="timestamp",
keepIntermediateColumns=False):
"""
Implement enhanced event time

:param dataspec: dataspec to apply enhanced event time to
:param startEventTime: start timestamp of output event time
:param acceleratedEventTimeInterval: interval for accelerated event time (e.g. "10 minutes")
:param fractionLateArriving: fraction of late arriving data. range [0.0, 1.0]
:param lateTimeInterval: interval for late arriving events (e.g. "6 hours")
:param jitter: jitter factor to avoid strictly increasing order in events
:param eventTimeName: Column name for generated event time column
:param baseColumn: Base column name used for computations of adjusted event time
:param keepIntermediateColumns: Flag to retain intermediate columns in the output, [ debug / test only]

This adjusts the dataframe to produce IoT-style event time that normally increases over time but has a
configurable fraction of late arriving data. It uses a base timestamp column (called `timestamp` by default)
to compute the event time. There must be a column of this name in the source data frame, and it should have
the value of `now()`.

By default `rate` and `rate-micro-batch` streaming sources have this column but
it can be added to other dataframes - either batch or streaming data frames.

While the overall intent is to support synthetic IoT-style simulated device data in a stream, it can be used
with a batch data frame (as long as an appropriate timestamp column is added and designated as the base
timestamp column).
"""

assert startEventTime is not None, "value for `startEventTime` must be specified"
assert dataspec is not None, "dataspec must be specified"
assert 0.0 <= fractionLateArriving <= 1.0, "fractionLateArriving must be in range [0.0, 1.0]"
assert eventTimeName is not None, "eventTimeName argument must be supplied"

# determine timestamp for start of generation
start_of_generation = \
dataspec.sparkSession.sql("select cast(now() as string) as start_timestamp").collect()[0][
'start_timestamp']

omitInterimColumns = not keepIntermediateColumns

retval = (dataspec
.withColumn("_late_arrival_factor1", "double", minValue=0.0, maxValue=1.0, continuous=True,
random=True,
distribution=dist.Beta(2.0, 5.3), omit=omitInterimColumns)
.withColumn("_late_arrival_factor", "double", expr="least(_late_arrival_factor1 * 1.17, 1.0)",
baseColumn="_late_arrival_factor1",
omit=omitInterimColumns)
.withColumn("_stream_time", "timestamp", expr=f"{baseColumn}",
omit=omitInterimColumns, baseColumn=baseColumn)
.withColumn("_data_gen_time", "timestamp", expr="now()", omit=omitInterimColumns)
.withColumn("_difference_factor", "double",
expr=f"cast(_stream_time as double) - cast(TIMESTAMP '{start_of_generation}' as double)",
baseColumns=["_stream_time"],
omit=omitInterimColumns)
.withColumn("_jitter_within_event_interval", "double", minValue=jitter[0], maxValue=jitter[1],
continuous=True, random=True,
omit=omitInterimColumns)
.withColumn("_late_arrival_prob", "double", minValue=0.0, maxValue=1.0, continuous=True,
random=True,
omit=omitInterimColumns)
.withColumn("_late_arrival", "boolean",
expr=f"case when _late_arrival_prob <= {fractionLateArriving} then true else false end",
baseColumns=["_late_arrival_prob"],
omit=omitInterimColumns)
.withColumn("_ontime_event_ts", "timestamp",
expr=f"""greatest(TIMESTAMP '{startEventTime}' +
((_difference_factor + _jitter_within_event_interval)
* INTERVAL {acceleratedEventTimeInterval}),
TIMESTAMP '{startEventTime}') """,
baseColumns=["_difference_factor", "_jitter_within_event_interval"],
omit=omitInterimColumns)
.withColumn("_late_arrival_ts", "timestamp",
expr=f"""greatest(_ontime_event_ts - (_late_arrival_factor * INTERVAL {lateTimeInterval}),
TIMESTAMP '{startEventTime}')
""",
baseColumns=["_ontime_event_ts", "_late_arrival_factor"],
omit=omitInterimColumns)

# generate event time column
.withColumn(eventTimeName, "timestamp",
expr="case when _late_arrival then _late_arrival_ts else _ontime_event_ts end",
baseColumns=["_late_arrival", "_late_arrival_ts", "_ontime_event_ts"])
)
return retval
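
A minimal usage sketch for the helper above (illustrative only; it assumes a Spark session `spark`, that the class is importable from `dbldatagen.enhanced_event_time`, and that the streaming wiring in this PR supplies the default `timestamp` base column from the `rate-micro-batch` source):

import dbldatagen as dg
from dbldatagen.enhanced_event_time import EnhancedEventTimeHelper

# simple spec with an illustrative device id column
ds = (dg.DataGenerator(spark, name="iot_events", rows=100000, partitions=4)
      .withColumn("device_id", "long", minValue=1, maxValue=1000, random=True))

# add an "event_ts" column whose values mostly increase over time with ~10% late arrivals
helper = EnhancedEventTimeHelper()
ds = helper.withEnhancedEventTime(ds,
                                  startEventTime="2024-08-04 00:00:00",
                                  acceleratedEventTimeInterval="10 minutes",
                                  fractionLateArriving=0.1,
                                  lateTimeInterval="6 hours",
                                  eventTimeName="event_ts")

# the `timestamp` base column is provided by the rate / rate-micro-batch streaming source
df = ds.build(withStreaming=True,
              options={"dbldatagen.streaming.source": "rate-micro-batch",
                       "rowsPerBatch": 100})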
2 changes: 1 addition & 1 deletion docs/source/index.rst
@@ -28,7 +28,7 @@ As it is installable via `%pip install`, it can also be incorporated in environm
Using data distributions <DISTRIBUTIONS>
Options for column specification <options_and_features>
Generating repeatable data <repeatable_data_generation>
Using streaming data <using_streaming_data>
Producing synthetic streaming data <using_streaming_data>
Generating JSON and structured column data <generating_json_data>
Generating synthetic data from existing data <generating_from_existing_data>
Generating Change Data Capture (CDC) data <generating_cdc_data>