skewed_synthetic_workload.py
from __future__ import print_function
import os
# Point Spark at the local installation; expanduser() resolves '~', which is
# otherwise left unexpanded inside an environment variable.
os.environ['SPARK_HOME'] = os.path.expanduser('~/spark')
import sys
from pyspark.sql import SparkSession
import time
import numpy as np
from datetime import datetime
import random
import string
if __name__ == '__main__':
    # Create the Spark session / context
    spark = SparkSession.builder.appName('synthetic_workload').getOrCreate()
    sc = spark.sparkContext

    # Command-line arguments: number of partitions, dataset size (roughly in GB),
    # and the fraction of records routed to the single hot key.
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 100
    size = float(sys.argv[2]) if len(sys.argv) > 2 else 1
    skewness = float(sys.argv[3]) if len(sys.argv) > 3 else 0.1

    # n = 10000000   # ~1 GB dataset
    # n = 100000000  # ~10 GB dataset
    n = int(10000000 * size)  # number of records
    N = 100                   # length of the random payload string per record

    # Generate (key, payload) pairs. Records with id below skewness*n all
    # collapse onto key 1 (the hot key); the rest are spread over the
    # partitions round-robin via modulo.
    data = sc.parallelize(range(1 + partitions, n + 1 + partitions), partitions).\
        map(lambda x: (1 if x < skewness * n else x % partitions,
                       ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(N))))
    # Alternative workloads kept from earlier experiments:
    # res = data.join(data).count()   # self-join: result grows as rows_per_key^2
    # res = data.count()

    # Count the number of records per key; the hot key's oversized group is
    # what makes this workload skewed.
    res = data.groupByKey().\
        mapValues(lambda x: len(x)).\
        collect()

    print("Result:", res)
    # for rec in res:
    #     print(rec[0], "\t", rec[1])
    print("\n")

    # Stop the session
    spark.stop()
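
# Usage sketch (an assumed invocation, not part of the original file): submit
# with spark-submit and pass <partitions> <size> <skewness> as positional
# arguments. For example, size=1 generates roughly a 1 GB dataset and
# skewness=0.1 routes about 10% of all records to the single hot key 1:
#
#   spark-submit skewed_synthetic_workload.py 100 1 0.1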