Add examples from TPC-H #666
     Merged
      
      
    
  
      Commits
            30 commits
      27c60d9
              
                Update location of docker image
              
              
                timsaucer ebf3ae9
              
                Initial commit for queries 1-3
              
              
                timsaucer 83138ea
              
                Commit queries 4-7 of TPC-H in examples
              
              
                timsaucer 1a10c17
              
                Add required license text
              
              
                timsaucer 15aa893
              
                Add additional text around why to use a case statement in the example
              
              
                timsaucer 0060ae7
              
                add market share example
              
              
                timsaucer 8fb4aa7
              
                Add example for product type profit measure
              
              
                timsaucer 9ca5136
              
                Inital commit returned item report
              
              
                timsaucer 99008bc
              
                Linting
              
              
                timsaucer 5a04aee
              
                Initial commit of q11 example
              
              
                timsaucer c26c524
              
                Initial commit of q12 from tpc-h
              
              
                timsaucer 11ffced
              
                Initial commit for customer distribution example
              
              
                timsaucer edb848b
              
                Initial commit of promotion effect example
              
              
                timsaucer 0762336
              
                Initial commit of q15 in tph-c, top supplier
              
              
                timsaucer 883a61b
              
                Initial commit of q16 in tph-c, part supplier relationship
              
              
                timsaucer 396029e
              
                Initial commit of q17 in tph-c, small quatity order
              
              
                timsaucer 6835f56
              
                Initial commit of q18 in tph-c, large volume customer
              
              
                timsaucer c9ca2ba
              
                Initial commit of q19 in tph-c, discounted revenue
              
              
                timsaucer 7a20e39
              
                Initial commit of q20 in tph-c, potential part promotion
              
              
                timsaucer 7497829
              
                Initial commit of q21 in tph-c, supplier who kept order waiting
              
              
                timsaucer 33ecff3
              
                Initial commit of q22 in tph-c, global sales opportunity
              
              
                timsaucer 99aac68
              
                Adding readme information and marking text as copyrighted
              
              
                timsaucer ebbe69f
              
                Minimum part cost must be identified per part not across all parts th…
              
              
                timsaucer 487b62e
              
                Change ordering of output rows to match spec
              
              
                timsaucer 61e027f
              
                Set parameter to match spec
              
              
                timsaucer acffc87
              
                Set parameter to match spec
              
              
                timsaucer fc9f845
              
                setting values to match spec
              
              
                timsaucer 0b90a66
              
                Linting
              
              
                timsaucer 36cc531
              
                Expand on readme to link to examples within tpch folder
              
              
                timsaucer 59327df
              
                Minor typo
              
              
                timsaucer
    
              | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| data | ||
|  | 
  
    
    
  
  
    
              | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| <!--- | ||
| Licensed to the Apache Software Foundation (ASF) under one | ||
| or more contributor license agreements. See the NOTICE file | ||
| distributed with this work for additional information | ||
| regarding copyright ownership. The ASF licenses this file | ||
| to you under the Apache License, Version 2.0 (the | ||
| "License"); you may not use this file except in compliance | ||
| with the License. You may obtain a copy of the License at | ||
|  | ||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|  | ||
| Unless required by applicable law or agreed to in writing, | ||
| software distributed under the License is distributed on an | ||
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| KIND, either express or implied. See the License for the | ||
| specific language governing permissions and limitations | ||
| under the License. | ||
| --> | ||
|  | ||
| # DataFusion Python Examples for TPC-H | ||
|  | ||
| These examples reproduce the problems listed in the Transaction Processing Performance | ||
| Council's TPC-H benchmark. The purpose of these examples is to demonstrate how to use | ||
| different aspects of DataFusion; they are not necessarily geared towards creating the | ||
| most performant queries possible. Within each example is a description of the | ||
| problem. Users who are familiar with SQL-style commands can compare the | ||
| approaches in these examples with those listed in the specification. | ||
|  | ||
| - https://www.tpc.org/tpch/ | ||
|  | ||
| The examples provided are based on version 2.18.0 of the TPC-H specification. | ||
|  | ||
| ## Data Setup | ||
|  | ||
| To run these examples, you must first generate a dataset. The `dbgen` tool | ||
| provided by TPC can create datasets of arbitrary scale. For testing it is | ||
| typically sufficient to create a 1 gigabyte dataset. For convenience, this | ||
| repository has a script which uses docker to create this dataset. From the | ||
| `benchmarks/tpch` directory execute the following script. | ||
|  | ||
| ```bash | ||
| ./tpch-gen.sh 1 | ||
| ``` | ||
|  | ||
| The examples provided use Parquet files for the tables generated by `dbgen`. | ||
| A Python script is provided to convert the text files from `dbgen` into the Parquet | ||
| files expected by the examples. From the `examples/tpch` directory you can | ||
| execute the following command to create the necessary Parquet files. | ||
|  | ||
| ```bash | ||
| python convert_data_to_parquet.py | ||
| ``` | ||
|  | ||
| ## Description of Examples | ||
|  | ||
| For easier access, a description of the techniques demonstrated in each file | ||
| is in the README.md file in the `examples` directory. | 
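
The setup steps above should leave one Parquet file per TPC-H table in the `data` directory that the examples read from. As a minimal sketch of a pre-flight check, using only the standard library: the helper name `missing_tables` and the `data` default are hypothetical, and the table names are taken from the schemas in `convert_data_to_parquet.py`.

```python
from pathlib import Path

# Table names taken from the schemas in convert_data_to_parquet.py.
TPCH_TABLES = [
    "customer", "lineitem", "nation", "orders",
    "part", "partsupp", "region", "supplier",
]


def missing_tables(data_dir):
    """Return the TPC-H tables that have no <table>.parquet entry in data_dir.

    Hypothetical helper for illustration; it only checks that the path exists.
    """
    data_path = Path(data_dir)
    return [t for t in TPCH_TABLES if not (data_path / f"{t}.parquet").exists()]


if __name__ == "__main__":
    missing = missing_tables("data")
    if missing:
        print("Missing parquet files for:", ", ".join(missing))
    else:
        print("All TPC-H parquet files are present.")
```

Running this from `examples/tpch` before any query example gives a quick hint as to whether the conversion step still needs to be run.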
  
    
    
  
  
    
              | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,142 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|  | ||
| """ | ||
| This is a utility script that consumes the data generated by dbgen from TPC-H and converts | ||
| it into parquet files with the column names expected by the TPC-H specification. It assumes | ||
| the generated data resides in ../../benchmarks/tpch/data relative to the current file, | ||
| as produced by the script provided in this repository. | ||
| """ | ||
|  | ||
| import os | ||
| import pyarrow | ||
| import datafusion | ||
|  | ||
| ctx = datafusion.SessionContext() | ||
|  | ||
| all_schemas = {} | ||
|  | ||
| all_schemas["customer"] = [ | ||
| ("C_CUSTKEY", pyarrow.int32()), | ||
| ("C_NAME", pyarrow.string()), | ||
| ("C_ADDRESS", pyarrow.string()), | ||
| ("C_NATIONKEY", pyarrow.int32()), | ||
| ("C_PHONE", pyarrow.string()), | ||
| ("C_ACCTBAL", pyarrow.float32()), | ||
| ("C_MKTSEGMENT", pyarrow.string()), | ||
| ("C_COMMENT", pyarrow.string()), | ||
| ] | ||
|  | ||
| all_schemas["lineitem"] = [ | ||
| ("L_ORDERKEY", pyarrow.int32()), | ||
| ("L_PARTKEY", pyarrow.int32()), | ||
| ("L_SUPPKEY", pyarrow.int32()), | ||
| ("L_LINENUMBER", pyarrow.int32()), | ||
| ("L_QUANTITY", pyarrow.float32()), | ||
| ("L_EXTENDEDPRICE", pyarrow.float32()), | ||
| ("L_DISCOUNT", pyarrow.float32()), | ||
| ("L_TAX", pyarrow.float32()), | ||
| ("L_RETURNFLAG", pyarrow.string()), | ||
| ("L_LINESTATUS", pyarrow.string()), | ||
| ("L_SHIPDATE", pyarrow.date32()), | ||
| ("L_COMMITDATE", pyarrow.date32()), | ||
| ("L_RECEIPTDATE", pyarrow.date32()), | ||
| ("L_SHIPINSTRUCT", pyarrow.string()), | ||
| ("L_SHIPMODE", pyarrow.string()), | ||
| ("L_COMMENT", pyarrow.string()), | ||
| ] | ||
|  | ||
| all_schemas["nation"] = [ | ||
| ("N_NATIONKEY", pyarrow.int32()), | ||
| ("N_NAME", pyarrow.string()), | ||
| ("N_REGIONKEY", pyarrow.int32()), | ||
| ("N_COMMENT", pyarrow.string()), | ||
| ] | ||
|  | ||
| all_schemas["orders"] = [ | ||
| ("O_ORDERKEY", pyarrow.int32()), | ||
| ("O_CUSTKEY", pyarrow.int32()), | ||
| ("O_ORDERSTATUS", pyarrow.string()), | ||
| ("O_TOTALPRICE", pyarrow.float32()), | ||
| ("O_ORDERDATE", pyarrow.date32()), | ||
| ("O_ORDERPRIORITY", pyarrow.string()), | ||
| ("O_CLERK", pyarrow.string()), | ||
| ("O_SHIPPRIORITY", pyarrow.int32()), | ||
| ("O_COMMENT", pyarrow.string()), | ||
| ] | ||
|  | ||
| all_schemas["part"] = [ | ||
| ("P_PARTKEY", pyarrow.int32()), | ||
| ("P_NAME", pyarrow.string()), | ||
| ("P_MFGR", pyarrow.string()), | ||
| ("P_BRAND", pyarrow.string()), | ||
| ("P_TYPE", pyarrow.string()), | ||
| ("P_SIZE", pyarrow.int32()), | ||
| ("P_CONTAINER", pyarrow.string()), | ||
| ("P_RETAILPRICE", pyarrow.float32()), | ||
| ("P_COMMENT", pyarrow.string()), | ||
| ] | ||
|  | ||
| all_schemas["partsupp"] = [ | ||
| ("PS_PARTKEY", pyarrow.int32()), | ||
| ("PS_SUPPKEY", pyarrow.int32()), | ||
| ("PS_AVAILQTY", pyarrow.int32()), | ||
| ("PS_SUPPLYCOST", pyarrow.float32()), | ||
| ("PS_COMMENT", pyarrow.string()), | ||
| ] | ||
|  | ||
| all_schemas["region"] = [ | ||
| ("R_REGIONKEY", pyarrow.int32()), | ||
| ("R_NAME", pyarrow.string()), | ||
| ("R_COMMENT", pyarrow.string()), | ||
| ] | ||
|  | ||
| all_schemas["supplier"] = [ | ||
| ("S_SUPPKEY", pyarrow.int32()), | ||
| ("S_NAME", pyarrow.string()), | ||
| ("S_ADDRESS", pyarrow.string()), | ||
| ("S_NATIONKEY", pyarrow.int32()), | ||
| ("S_PHONE", pyarrow.string()), | ||
| ("S_ACCTBAL", pyarrow.float32()), | ||
| ("S_COMMENT", pyarrow.string()), | ||
| ] | ||
|  | ||
| curr_dir = os.path.dirname(os.path.abspath(__file__)) | ||
| for filename, curr_schema in all_schemas.items(): | ||
|  | ||
|     # For convenience, go ahead and convert the schema column names to lowercase | ||
|     curr_schema = [(s[0].lower(), s[1]) for s in curr_schema] | ||
|  | ||
|     # Pre-collect the output columns so we can ignore the null field we add | ||
|     # in to handle the trailing | in the file | ||
|     output_cols = [r[0] for r in curr_schema] | ||
|  | ||
|     # The trailing | on each row requires an extra field during processing | ||
|     curr_schema.append(("some_null", pyarrow.null())) | ||
|  | ||
|     schema = pyarrow.schema(curr_schema) | ||
|  | ||
|     source_file = os.path.abspath( | ||
|         os.path.join(curr_dir, f"../../benchmarks/tpch/data/{filename}.csv") | ||
|     ) | ||
|     dest_file = os.path.abspath(os.path.join(curr_dir, f"./data/{filename}.parquet")) | ||
|  | ||
|     df = ctx.read_csv(source_file, schema=schema, has_header=False, delimiter="|") | ||
|  | ||
|     df = df.select_columns(*output_cols) | ||
|  | ||
|     df.write_parquet(dest_file, compression="snappy") | 
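
The script above appends a `some_null` column because every `dbgen` row ends with a `|`. The following standalone sketch shows why that extra field appears, using only the standard library `csv` module rather than DataFusion. The sample rows and the `parse_rows` helper are made up for illustration.

```python
import csv
import io

# Made-up rows shaped like dbgen output: every field, including the last,
# is followed by a "|" delimiter.
SAMPLE = "0|AFRICA|first comment|\n1|AMERICA|second comment|\n"

COLUMNS = ["r_regionkey", "r_name", "r_comment"]


def parse_rows(text, columns):
    """Parse pipe-delimited text and drop the empty field after the last '|'."""
    rows = []
    for fields in csv.reader(io.StringIO(text), delimiter="|"):
        # The trailing delimiter yields one more field than there are columns.
        if len(fields) != len(columns) + 1 or fields[-1] != "":
            raise ValueError(f"unexpected row shape: {fields!r}")
        # zip() stops at the shorter input, so the empty last field is ignored.
        rows.append(dict(zip(columns, fields)))
    return rows


if __name__ == "__main__":
    for row in parse_rows(SAMPLE, COLUMNS):
        print(row)
```

The conversion script handles the same situation declaratively: it declares the extra field in the schema and then selects only the real columns.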
  
    
    
  
  
    
              | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|  | ||
| """ | ||
| TPC-H Problem Statement Query 1: | ||
|  | ||
| The Pricing Summary Report Query provides a summary pricing report for all lineitems shipped as of | ||
| a given date. The date is within 60 - 120 days of the greatest ship date contained in the database. | ||
| The query lists totals for extended price, discounted extended price, discounted extended price | ||
| plus tax, average quantity, average extended price, and average discount. These aggregates are | ||
| grouped by RETURNFLAG and LINESTATUS, and listed in ascending order of RETURNFLAG and LINESTATUS. | ||
| A count of the number of lineitems in each group is included. | ||
|  | ||
| The above problem statement text is copyrighted by the Transaction Processing Performance Council | ||
| as part of their TPC Benchmark H Specification revision 2.18.0. | ||
| """ | ||
|  | ||
| import pyarrow as pa | ||
| from datafusion import SessionContext, col, lit, functions as F | ||
|  | ||
| ctx = SessionContext() | ||
|  | ||
| df = ctx.read_parquet("data/lineitem.parquet") | ||
|  | ||
| # It may be that the date can be hard coded, based on examples shown. | ||
| # This approach will work with any date range in the provided data set. | ||
|  | ||
| greatest_ship_date = df.aggregate( | ||
| [], [F.max(col("l_shipdate")).alias("shipdate")] | ||
| ).collect()[0]["shipdate"][0] | ||
|  | ||
| # From the given problem, this is how close to the last date in the database we | ||
| # want to report results for. It should be between 60-120 days before the end. | ||
| DAYS_BEFORE_FINAL = 68 | ||
|  | ||
| # Note: this is a hack on setting the values. It should be set differently once | ||
| # https://github.com/apache/datafusion-python/issues/665 is resolved. | ||
| interval = pa.scalar((0, 0, DAYS_BEFORE_FINAL), type=pa.month_day_nano_interval()) | ||
|  | ||
| print("Final date in database:", greatest_ship_date) | ||
|  | ||
| # Filter data to the dates of interest | ||
| df = df.filter(col("l_shipdate") <= lit(greatest_ship_date) - lit(interval)) | ||
|  | ||
| # Aggregate the results | ||
|  | ||
| df = df.aggregate( | ||
| [col("l_returnflag"), col("l_linestatus")], | ||
| [ | ||
| F.sum(col("l_quantity")).alias("sum_qty"), | ||
| F.sum(col("l_extendedprice")).alias("sum_base_price"), | ||
| F.sum(col("l_extendedprice") * (lit(1.0) - col("l_discount"))).alias( | ||
| "sum_disc_price" | ||
| ), | ||
| F.sum( | ||
| col("l_extendedprice") | ||
| * (lit(1.0) - col("l_discount")) | ||
| * (lit(1.0) + col("l_tax")) | ||
| ).alias("sum_charge"), | ||
| F.avg(col("l_quantity")).alias("avg_qty"), | ||
| F.avg(col("l_extendedprice")).alias("avg_price"), | ||
| F.avg(col("l_discount")).alias("avg_disc"), | ||
| F.count(col("l_returnflag")).alias( | ||
| "count_order" | ||
| ), # Counting any column should return the same result | ||
| ], | ||
| ) | ||
|  | ||
| # Sort per the expected result | ||
|  | ||
| df = df.sort(col("l_returnflag").sort(), col("l_linestatus").sort()) | ||
|  | ||
| # Note: There appears to be a discrepancy between what is returned here and what is in the generated | ||
| # answers file for the case of return flag N and line status O, but I did not investigate further. | ||
|  | ||
| df.show() | ||
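
To make the logic of the query above easier to follow without a dataset, here is a plain-Python sketch of the same steps: find the greatest ship date, keep rows shipped on or before the cutoff, then group and aggregate. It does not use the DataFusion API. The rows are made up, and `pricing_summary` is a hypothetical helper name.

```python
from collections import defaultdict
from datetime import date, timedelta
from decimal import Decimal

# Made-up rows:
# (returnflag, linestatus, quantity, extendedprice, discount, tax, shipdate)
ROWS = [
    ("A", "F", Decimal("10"), Decimal("100.00"), Decimal("0.10"), Decimal("0.05"), date(1998, 1, 5)),
    ("A", "F", Decimal("20"), Decimal("200.00"), Decimal("0.00"), Decimal("0.10"), date(1998, 2, 1)),
    ("N", "O", Decimal("5"), Decimal("50.00"), Decimal("0.20"), Decimal("0.00"), date(1998, 9, 1)),
    ("N", "O", Decimal("7"), Decimal("70.00"), Decimal("0.00"), Decimal("0.00"), date(1998, 12, 1)),
]

DAYS_BEFORE_FINAL = 68  # same value as in the example above


def pricing_summary(rows, days_before_final=DAYS_BEFORE_FINAL):
    """Group rows shipped on or before the cutoff and compute Q1-style totals."""
    cutoff = max(r[6] for r in rows) - timedelta(days=days_before_final)
    totals = defaultdict(lambda: defaultdict(Decimal))
    for flag, status, qty, price, discount, tax, shipdate in rows:
        if shipdate > cutoff:
            continue  # shipped after the reporting date
        group = totals[(flag, status)]
        disc_price = price * (1 - discount)
        group["sum_qty"] += qty
        group["sum_base_price"] += price
        group["sum_disc_price"] += disc_price
        group["sum_charge"] += disc_price * (1 + tax)
        group["sum_discount"] += discount
        group["count_order"] += 1

    result = {}
    for key in sorted(totals):  # ascending returnflag, then linestatus
        group = totals[key]
        n = group["count_order"]
        result[key] = {
            "sum_qty": group["sum_qty"],
            "sum_base_price": group["sum_base_price"],
            "sum_disc_price": group["sum_disc_price"],
            "sum_charge": group["sum_charge"],
            "avg_qty": group["sum_qty"] / n,
            "avg_price": group["sum_base_price"] / n,
            "avg_disc": group["sum_discount"] / n,
            "count_order": int(n),
        }
    return result


if __name__ == "__main__":
    for key, values in pricing_summary(ROWS).items():
        print(key, values)
```

With these sample rows the greatest ship date is 1998-12-01, so the last row falls after the cutoff and is excluded from its group.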
      
nit: not important for example usage, but the numeric fields should be decimal not float
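
To illustrate the reviewer's point, here is a small standard-library sketch of what happens to a price when it is stored as a 32-bit float instead of a decimal. It round-trips a value through `struct` to mimic float32 storage. The helper names and the sample price are made up for illustration.

```python
import struct
from decimal import Decimal


def as_float32(value):
    """Round a Python float to the nearest 32-bit float, as float32 storage would."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


def float32_error(text):
    """Return the difference between the float32-stored value and the exact decimal."""
    stored = Decimal(as_float32(float(text)))  # exact value of the stored float
    return stored - Decimal(text)


if __name__ == "__main__":
    price = "123456.78"
    print("exact decimal :", Decimal(price))
    print("as float32    :", Decimal(as_float32(float(price))))
    print("storage error :", float32_error(price))
```

A float32 carries roughly seven significant decimal digits, so a price with six digits before the decimal point can no longer hold its cents exactly. In the conversion script, addressing this would presumably mean replacing `pyarrow.float32()` with a decimal type such as `pyarrow.decimal128(15, 2)`; the precision and scale shown here are an assumption, not something stated in the pull request.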