Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/contributing.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tips.py --data-dir=$(pwd)/../testdata
- In the `tpch` directory, use `make_data.py` to create a TPCH dataset at a provided scale factor, then

```bash
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --qnum 2
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --qnum 2

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend standardizing the data file directory to testdata/tpch and adding the correct make_data.py command just above, for example:

Suggested change
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --qnum 2
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=../testdata/tpch --concurrency=2 --batch-size=8182 --worker-pool-min=10 --qnum 2

Before this, add more documentation on make_data.py:

  • In the tpch directory, use make_data.py to create a TPCH dataset at a provided scale factor and an output directory, such as the testdata directory
python make_data.py 1 "../testdata/tpch"

could also specify an env variable for this in the setup
TPCH_DATA=../testdata/tpch

and replace the examples with $TPCH_DATA

```

To execute the TPCH query #2. To execute an arbitrary query against the TPCH dataset, provide it with `--query` instead of `--qnum`. This is useful for validating plans that DataFusion Ray will create.

For example, to execute the following query:

```bash
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --query 'select c.c_name, sum(o.o_totalprice) as total from orders o inner join customer c on o.o_custkey = c.c_custkey group by c_name limit 1'
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --query 'select c.c_name, sum(o.o_totalprice) as total from orders o inner join customer c on o.o_custkey = c.c_custkey group by c_name limit 1'
```

To further parallelize execution, you can choose how many partitions will be served by each Stage with `--partitions-per-processor`. If this number is less than `--concurrency`, then multiple Actors will host portions of the stage. For example, if there are 10 stages calculated for a query, `concurrency=16` and `partitions-per-processor=4`, then `40` `RayStage` Actors will be created. If `partitions-per-processor=16` or is absent, then `10` `RayStage` Actors will be created.
Expand Down
40 changes: 30 additions & 10 deletions tpch/tpcbench.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def tpch_query(qnum: int) -> str:


def main(
qnum: int,
queries: list[(str, str)],
data_path: str,
concurrency: int,
batch_size: int,
Expand Down Expand Up @@ -99,10 +99,7 @@ def main(
if validate:
results["validated"] = {}

queries = range(1, 23) if qnum == -1 else [qnum]
for qnum in queries:
sql = tpch_query(qnum)

for (qid, sql) in queries:
statements = list(
filter(lambda x: len(x) > 0, map(lambda x: x.strip(), sql.split(";")))
)
Expand All @@ -115,7 +112,7 @@ def main(
df = ctx.sql(sql)
all_batches.append(df.collect())
end_time = time.time()
results["queries"][qnum] = end_time - start_time
results["queries"][qid] = end_time - start_time

calculated = "\n".join([prettify(b) for b in all_batches])
print(calculated)
Expand All @@ -125,8 +122,8 @@ def main(
all_batches.append(local.collect_sql(sql))
expected = "\n".join([prettify(b) for b in all_batches])

results["validated"][qnum] = calculated == expected
print(f"done with query {qnum}")
results["validated"][qid] = calculated == expected
print(f"done with query {qid}")

# write the results as we go, so you can peek at them
results_dump = json.dumps(results, indent=4)
Expand Down Expand Up @@ -154,7 +151,10 @@ def main(
parser.add_argument(
"--concurrency", required=True, help="Number of concurrent tasks"
)
parser.add_argument("--qnum", type=int, default=-1, help="TPCH query number, 1-22")
parser.add_argument("--qnum", type=int, default=-1,
help="TPCH query number, 1-22")
parser.add_argument("--query", required=False, type=str,
help="Custom query to run with tpch tables")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
help="Custom query to run with tpch tables")
help="Custom SQL query statement to run with tpch tables")

parser.add_argument("--listing-tables", action="store_true")
parser.add_argument("--validate", action="store_true")
parser.add_argument(
Expand Down Expand Up @@ -186,8 +186,28 @@ def main(

args = parser.parse_args()

if (args.qnum != -1 and args.query is not None):
print("Please specify either --qnum or --query, but not both")
exit(1)

queries = []
if (args.qnum != -1):
if args.qnum < 1 or args.qnum > 22:
print("Invalid query number. Please specify a number between 1 and 22.")
exit(1)
else:
queries.append((str(args.qnum), tpch_query(args.qnum)))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explicitly mention TPCH in the id

Suggested change
queries.append((str(args.qnum), tpch_query(args.qnum)))
queries.append((f"TPCH-{args.qnum}", tpch_query(args.qnum)))

print("Executing tpch query ", args.qnum)

elif (args.query is not None):
queries.append(("custom query", args.query))
print("Executing custom query: ", args.query)
else:
print("Executing all tpch queries")
queries = [(str(i), tpch_query(i)) for i in range(1, 23)]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
queries = [(str(i), tpch_query(i)) for i in range(1, 23)]
queries = [(f"TPCH-{i}", tpch_query(i)) for i in range(1, 23)]


Comment on lines +189 to +208

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor suggestion: extract this into its own function, for example

from typing import List
def get_sql_queries(tpch_qnum: str = None, sql_statement: str= None) -> List[(str, str)]:
    """
    Get the list of SQL statements from either the TPCH or user provided SQL statements.
    At most one of these parameters can be provided.

    :param tpch_qnum: the TPCH Query number. If none, return all TPCH queries supported
    :param sql_statement: SQL string statement on available data tables (e.g ingested through make_data.py)
    :return: a list of tuples with name of the Query and the string SQL statement
    """

main(
args.qnum,
queries,
args.data,
int(args.concurrency),
int(args.batch_size),
Expand Down