Architecture optimization: group by srcip and destip

zhexiao committed Sep 17, 2019
1 parent c08cf0d commit 93b3028

Showing 4 changed files with 47 additions and 16 deletions.
deploy/logstash_ipstats_tpl.json: 6 changes (4 additions, 2 deletions)
@@ -5,9 +5,11 @@
   },
   "mappings": {
     "_doc": {
       "dynamic": true,
       "properties": {
-        "ip": {
+        "src_ip": {
           "type": "keyword"
         },
+        "dest_ip": {
+          "type": "keyword"
+        },
         "flows": {
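With the remapped template, each record carries both flow endpoints as separate keyword fields instead of a single ip field. As a rough sketch, not part of the commit, a document conforming to the new mapping could be indexed like this; the index name "ipstats", the cluster address, and the use of the elasticsearch-py client are all assumptions for illustration:

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])  # assumed cluster address

doc = {
    "src_ip": "122.204.161.240",   # keyword field introduced by this commit
    "dest_ip": "122.204.161.241",  # keyword field introduced by this commit
    "flows": 3,                    # counter produced by the Spark job
}
es.index(index="ipstats", body=doc)  # "ipstats" is a placeholder name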
deploy/spark-client.py: 13 changes (6 additions, 7 deletions)
@@ -1,6 +1,7 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import from_json, window
 from pyspark.sql.types import StructType, IntegerType, StringType, TimestampType
+import pyspark.sql.functions as funcs

 """
 Constants
@@ -59,18 +60,16 @@
 new_df.printSchema()

 # Aggregation
-net_df = new_df.withWatermark(
+res_df = new_df.withWatermark(
     'event_time', window_time
 ).groupBy(
+    new_df.src_ip,
+    new_df.dest_ip,
     window(new_df.event_time, window_time, window_time),
-).sum('in_bytes', 'in_pkts')
-
-res_df = net_df.withColumnRenamed(
-    "sum(in_bytes)","in_bytes"
-).withColumnRenamed(
-    "sum(in_pkts)","in_pkts"
+).agg(
+    funcs.count("*").alias("flows"),
+    funcs.sum("in_bytes").alias("bytes"),
+    funcs.sum("in_pkts").alias("packets"),
 )
 res_df.printSchema()

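The switch from sum('in_bytes', 'in_pkts') plus two withColumnRenamed calls to a single agg() both shortens the pipeline and adds a per-pair flow count, which plain sum() could not produce. A minimal local sketch of the same pattern, using batch rows instead of the job's Kafka stream; the data and the local master are made up for the demo:

from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as funcs

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.createDataFrame([
    Row(src_ip="10.0.0.1", dest_ip="10.0.0.2", in_bytes=120, in_pkts=2),
    Row(src_ip="10.0.0.1", dest_ip="10.0.0.2", in_bytes=100, in_pkts=3),
])

# One pass computes the count and both sums, with output columns named
# directly via alias() instead of renaming "sum(...)" columns afterwards.
out = df.groupBy("src_ip", "dest_ip").agg(
    funcs.count("*").alias("flows"),
    funcs.sum("in_bytes").alias("bytes"),
    funcs.sum("in_pkts").alias("packets"),
)
out.show()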
deploy/test/sp_test2_2.py: 31 changes (31 additions, 0 deletions)
@@ -0,0 +1,31 @@
+from pyspark import SparkConf
+from pyspark.sql import SparkSession, Row
+import pyspark.sql.functions as funcs
+
+conf = SparkConf()
+conf.set("spark.master", "spark://192.168.33.50:7077")
+conf.set('spark.driver.host', '192.168.33.50')
+
+spark = SparkSession.builder.config(conf=conf).getOrCreate()
+
+# Simulate generated data
+lines = spark.createDataFrame([
+    Row(ipv4_src_addr='122.204.161.240', dest="1", in_bytes=120, in_pkts=2),
+    Row(ipv4_src_addr='122.204.161.240', dest="2", in_bytes=100, in_pkts=3),
+    Row(ipv4_src_addr='122.204.161.240', dest="2", in_bytes=200, in_pkts=10),
+    Row(ipv4_src_addr='122.204.161.241', dest="1", in_bytes=150, in_pkts=7),
+    Row(ipv4_src_addr='122.204.161.241', dest="3", in_bytes=170, in_pkts=3),
+    Row(ipv4_src_addr='122.204.161.242', dest="3", in_bytes=220, in_pkts=5),
+])
+lines.printSchema()
+
+# Group by source address and destination, counting flows and summing bytes
+group_df = lines.groupBy(lines.ipv4_src_addr, lines.dest).agg(
+    funcs.count("*").alias("count"),
+    funcs.sum("in_bytes").alias("bytes"),
+)
+group_df.show()
+# d1 = group_df.count()
+# d1.show()
+
+# d2 = group_df.sum('in_bytes', 'in_pkts').withColumnRenamed("sum(in_bytes)","in_bytes").withColumnRenamed("sum(in_pkts)","in_pkts")
+# d2.show()
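Worked out by hand from the six mock rows, group_df.show() should report five (ipv4_src_addr, dest) groups; Spark may emit them in any order:

ipv4_src_addr    dest  count  bytes
122.204.161.240  1     1      120
122.204.161.240  2     2      300
122.204.161.241  1     1      150
122.204.161.241  3     1      170
122.204.161.242  3     1      220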
deploy/test/sp_test6_7.py: 13 changes (6 additions, 7 deletions)
@@ -8,6 +8,7 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import from_json, window
 from pyspark.sql.types import StructType, IntegerType, StringType, TimestampType
+import pyspark.sql.functions as funcs

 """
 Constants
@@ -66,18 +67,16 @@
 new_df.printSchema()

 # Aggregation
-net_df = new_df.withWatermark(
+res_df = new_df.withWatermark(
     'create_time', window_time
 ).groupBy(
+    new_df.src_ip,
+    new_df.dest_ip,
     window(new_df.create_time, window_time, window_time),
-).sum('in_bytes', 'in_pkts')
-
-res_df = net_df.withColumnRenamed(
-    "sum(in_bytes)","in_bytes"
-).withColumnRenamed(
-    "sum(in_pkts)","in_pkts"
+).agg(
+    funcs.count("*").alias("flows"),
+    funcs.sum("in_bytes").alias("bytes"),
+    funcs.sum("in_pkts").alias("packets"),
 )
 res_df.printSchema()

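In both streaming jobs, the watermarked, windowed aggregation only produces results once res_df is wired to a sink. A hedged sketch, not in the commit, of draining it to a console sink for debugging; "update" output mode re-emits groups as late events revise their windows, which watermarked aggregations support:

query = res_df.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .start()
query.awaitTermination()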
