Commit

Architecture optimization: group by srcip and destip
zhexiao committed Sep 17, 2019
1 parent 3036264 commit c08cf0d
Showing 3 changed files with 23 additions and 11 deletions.
4 changes: 3 additions & 1 deletion deploy/spark-client.py
@@ -48,9 +48,10 @@
 new_stream_data.printSchema()
 
 new_df = new_stream_data.filter(
-    new_stream_data.json_data.netflow.protocol == 6
+    (new_stream_data.json_data.netflow.protocol == 6) | (new_stream_data.json_data.netflow.protocol == 17)
 ).select(
     (new_stream_data.json_data.netflow.ipv4_src_addr).alias('src_ip'),
+    (new_stream_data.json_data.netflow.ipv4_dst_addr).alias('dest_ip'),
     (new_stream_data.json_data.netflow.in_bytes).alias('in_bytes'),
     (new_stream_data.json_data.netflow.in_pkts).alias('in_pkts'),
     (new_stream_data.json_data.event_time).alias('event_time'),
@@ -62,6 +63,7 @@
     'event_time', window_time
 ).groupBy(
     new_df.src_ip,
+    new_df.dest_ip,
     window(new_df.event_time, window_time, window_time),
 ).sum('in_bytes', 'in_pkts')

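For illustration only (not part of this commit): the same keying pattern on a static DataFrame. The column names follow the diff above; the local Spark session and the sample rows are invented for demonstration.

# Sketch only: windowed sums per (src_ip, dest_ip) pair, mirroring the
# groupBy change above. The sample rows are made up.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.master("local[*]").getOrCreate()

rows = [
    ("122.204.161.240", "192.168.71.180", 120, 3, "2019-09-17 10:00:05"),
    ("122.204.161.240", "192.168.71.181", 80, 2, "2019-09-17 10:00:12"),
    ("122.204.161.241", "192.168.71.180", 200, 5, "2019-09-17 10:00:20"),
]
df = spark.createDataFrame(
    rows, ["src_ip", "dest_ip", "in_bytes", "in_pkts", "event_time"]
).withColumn("event_time", col("event_time").cast("timestamp"))

agg = df.groupBy(
    df.src_ip,
    df.dest_ip,
    window(df.event_time, "30 seconds"),  # one output row per (src, dest, 30s window)
).sum("in_bytes", "in_pkts")

agg.show(truncate=False)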
2 changes: 1 addition & 1 deletion deploy/test/kf_producer4.py
@@ -15,7 +15,7 @@
         "ipv4_src_addr": "122.204.161.{}".format(random.randint(240, 242)),
         "in_bytes": random.randint(10, 500),
         "in_pkts": random.randint(1, 10),
-        "protocol": random.randint(6, 7),
+        "protocol": random.randint(6, 8),
         "ipv4_dst_addr": "192.168.71.{}".format(random.randint(180, 182)),
     },
     "create_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
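Only part of the producer is shown in this diff. For context, a minimal sketch of a complete test producer pushing such records to the test topic; it assumes kafka-python, and the real imports and send loop of kf_producer4.py are outside this diff.

# Hypothetical producer loop; kafka-python and the one-second interval are
# assumptions, only the record layout comes from the diff above.
import json
import random
import time

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="192.168.33.50:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

while True:
    record = {
        "netflow": {
            "ipv4_src_addr": "122.204.161.{}".format(random.randint(240, 242)),
            "in_bytes": random.randint(10, 500),
            "in_pkts": random.randint(1, 10),
            # 6 and 8 pass the stream filter in sp_test6_7.py, 7 is dropped
            "protocol": random.randint(6, 8),
            "ipv4_dst_addr": "192.168.71.{}".format(random.randint(180, 182)),
        },
        "create_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
    }
    producer.send("test", record)
    time.sleep(1)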
28 changes: 19 additions & 9 deletions deploy/test/sp_test6_7.py
@@ -9,16 +9,24 @@
 from pyspark.sql.functions import from_json, window
 from pyspark.sql.types import StructType, IntegerType, StringType, TimestampType
 
+"""
+Constants
+"""
+spark_master = "spark://192.168.33.50:7077"
+kafka_master = "192.168.33.50:9092"
+mnet_topic = "test"
+mnet_agg_topic = "testres"
+window_time = "30 seconds"
 
 spark = SparkSession.builder.master(
-    "spark://192.168.33.50:7077"
+    spark_master
 ).getOrCreate()
 
 stream_data = spark \
     .readStream \
     .format("kafka") \
-    .option("kafka.bootstrap.servers", "192.168.33.50:9092") \
-    .option("subscribe", "test") \
+    .option("kafka.bootstrap.servers", kafka_master) \
+    .option("subscribe", mnet_topic) \
     .load()
 stream_data.printSchema()

@@ -47,9 +55,10 @@
 new_stream_data.printSchema()
 
 new_df = new_stream_data.filter(
-    new_stream_data.json_data.netflow.protocol == 6
+    (new_stream_data.json_data.netflow.protocol == 6) | (new_stream_data.json_data.netflow.protocol == 8)
 ).select(
     (new_stream_data.json_data.netflow.ipv4_src_addr).alias('src_ip'),
+    (new_stream_data.json_data.netflow.ipv4_dst_addr).alias('dest_ip'),
     (new_stream_data.json_data.netflow.in_bytes).alias('in_bytes'),
     (new_stream_data.json_data.netflow.in_pkts).alias('in_pkts'),
     (new_stream_data.json_data.create_time).alias('create_time'),
@@ -58,10 +67,11 @@
 
 # Aggregation
 net_df = new_df.withWatermark(
-    'create_time', '30 seconds'
+    'create_time', window_time
 ).groupBy(
     new_df.src_ip,
-    window(new_df.create_time, '30 seconds', '30 seconds'),
+    new_df.dest_ip,
+    window(new_df.create_time, window_time, window_time),
 ).sum('in_bytes', 'in_pkts')
 
 res_df = net_df.withColumnRenamed(
@@ -75,11 +85,11 @@
 query = res_df \
     .selectExpr("CAST(window AS STRING) AS key", "to_json(struct(*)) AS value") \
     .writeStream \
-    .trigger(processingTime='30 seconds') \
+    .trigger(processingTime=window_time) \
     .outputMode("update") \
     .format("kafka") \
     .option("kafka.bootstrap.servers", "192.168.33.50:9092") \
-    .option("topic", "testres") \
-    .option("checkpointLocation", "/tmp/testres") \
+    .option("topic", mnet_agg_topic) \
+    .option("checkpointLocation", "/tmp/{}".format(mnet_agg_topic)) \
     .start() \
     .awaitTermination()
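To eyeball the aggregated rows that the query writes to testres, a plain consumer is enough. A minimal sketch assuming kafka-python, which is not used anywhere in this commit; the exact field names in each row depend on the withColumnRenamed calls truncated in the diff above.

# Hypothetical checker; kafka-python is an assumption. Each message value is
# one JSON row per (src_ip, dest_ip, window) with the summed byte/packet counts.
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "testres",
    bootstrap_servers="192.168.33.50:9092",
    auto_offset_reset="latest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    print(message.value)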
