diff --git a/deploy/logstash_ipstats_tpl.json b/deploy/logstash_ipstats_tpl.json
index a9acd89..f8be3cb 100644
--- a/deploy/logstash_ipstats_tpl.json
+++ b/deploy/logstash_ipstats_tpl.json
@@ -5,9 +5,11 @@
   },
   "mappings": {
     "_doc": {
-      "dynamic": true,
       "properties": {
-        "ip": {
+        "src_ip": {
+          "type": "keyword"
+        },
+        "dest_ip": {
           "type": "keyword"
         },
         "flows": {
diff --git a/deploy/spark-client.py b/deploy/spark-client.py
index 01eab78..a1fb1c9 100644
--- a/deploy/spark-client.py
+++ b/deploy/spark-client.py
@@ -1,6 +1,7 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import from_json, window
 from pyspark.sql.types import StructType, IntegerType, StringType, TimestampType
+import pyspark.sql.functions as funcs
 """
 Constants
 """
@@ -59,18 +60,16 @@ new_df.printSchema()
 
 
 # Aggregation
-net_df = new_df.withWatermark(
+res_df = new_df.withWatermark(
     'event_time', window_time
 ).groupBy(
     new_df.src_ip,
     new_df.dest_ip,
     window(new_df.event_time, window_time, window_time),
-).sum('in_bytes', 'in_pkts')
-
-res_df = net_df.withColumnRenamed(
-    "sum(in_bytes)","in_bytes"
-).withColumnRenamed(
-    "sum(in_pkts)","in_pkts"
+).agg(
+    funcs.count("*").alias("flows"),
+    funcs.sum("in_bytes").alias("bytes"),
+    funcs.sum("in_pkts").alias("packets"),
 )
 
 res_df.printSchema()
diff --git a/deploy/test/sp_test2_2.py b/deploy/test/sp_test2_2.py
new file mode 100644
index 0000000..a21e054
--- /dev/null
+++ b/deploy/test/sp_test2_2.py
@@ -0,0 +1,31 @@
+from pyspark import SparkConf
+from pyspark.sql import SparkSession, Row
+import pyspark.sql.functions as funcs
+
+conf = SparkConf()
+conf.set("spark.master", "spark://192.168.33.50:7077")
+conf.set('spark.driver.host', '192.168.33.50')
+
+spark = SparkSession.builder.config(conf=conf).getOrCreate()
+
+# Simulated input data
+lines = spark.createDataFrame([
+    Row(ipv4_src_addr='122.204.161.240', dest="1", in_bytes=120, in_pkts=2),
+    Row(ipv4_src_addr='122.204.161.240', dest="2", in_bytes=100, in_pkts=3),
+    Row(ipv4_src_addr='122.204.161.240', dest="2", in_bytes=200, in_pkts=10),
+    Row(ipv4_src_addr='122.204.161.241', dest="1", in_bytes=150, in_pkts=7),
+    Row(ipv4_src_addr='122.204.161.241', dest="3", in_bytes=170, in_pkts=3),
+    Row(ipv4_src_addr='122.204.161.242', dest="3", in_bytes=220, in_pkts=5),
+])
+lines.printSchema()
+
+group_df = lines.groupBy(lines.ipv4_src_addr, lines.dest).agg(
+    funcs.count("*").alias("count"),
+    funcs.sum("in_bytes").alias("bytes"),
+)
+group_df.show()
+# d1 = group_df.count()
+# d1.show()
+
+# d2 = group_df.sum('in_bytes', 'in_pkts').withColumnRenamed("sum(in_bytes)","in_bytes").withColumnRenamed("sum(in_pkts)","in_pkts")
+# d2.show()
diff --git a/deploy/test/sp_test6_7.py b/deploy/test/sp_test6_7.py
index 4a167a5..0a973e7 100644
--- a/deploy/test/sp_test6_7.py
+++ b/deploy/test/sp_test6_7.py
@@ -8,6 +8,7 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import from_json, window
 from pyspark.sql.types import StructType, IntegerType, StringType, TimestampType
+import pyspark.sql.functions as funcs
 """
 Constants
 """
@@ -66,18 +67,16 @@ new_df.printSchema()
 
 
 # Aggregation
-net_df = new_df.withWatermark(
+res_df = new_df.withWatermark(
     'create_time', window_time
 ).groupBy(
     new_df.src_ip,
     new_df.dest_ip,
     window(new_df.create_time, window_time, window_time),
-).sum('in_bytes', 'in_pkts')
-
-res_df = net_df.withColumnRenamed(
-    "sum(in_bytes)","in_bytes"
-).withColumnRenamed(
-    "sum(in_pkts)","in_pkts"
+).agg(
+    funcs.count("*").alias("flows"),
+    funcs.sum("in_bytes").alias("bytes"),
+    funcs.sum("in_pkts").alias("packets"),
 )
 
 res_df.printSchema()
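
Quick sketch of the pattern the Spark changes above switch to: a single .agg() call with explicit aliases instead of .sum('in_bytes', 'in_pkts') followed by two withColumnRenamed calls, so the output columns (flows, bytes, packets) line up with the src_ip/dest_ip keyword fields in the updated logstash_ipstats_tpl.json template. The snippet below is a minimal batch-mode illustration, not the streaming job itself: the local master, the sample rows, and the "1 minute" window are assumptions standing in for the real streaming source and the window_time constant defined elsewhere in spark-client.py.

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import window
import pyspark.sql.functions as funcs

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Small static frame standing in for the streaming flow records.
flows = spark.createDataFrame([
    Row(src_ip="10.0.0.1", dest_ip="10.0.0.2",
        event_time="2020-01-01 00:00:10", in_bytes=120, in_pkts=2),
    Row(src_ip="10.0.0.1", dest_ip="10.0.0.2",
        event_time="2020-01-01 00:00:40", in_bytes=200, in_pkts=10),
    Row(src_ip="10.0.0.3", dest_ip="10.0.0.2",
        event_time="2020-01-01 00:00:50", in_bytes=80, in_pkts=1),
]).withColumn("event_time", funcs.to_timestamp("event_time"))

# One .agg() call produces the flows/bytes/packets columns the new
# Elasticsearch template expects; "1 minute" is an assumed stand-in
# for window_time.
res_df = flows.groupBy(
    flows.src_ip,
    flows.dest_ip,
    window(flows.event_time, "1 minute"),
).agg(
    funcs.count("*").alias("flows"),
    funcs.sum("in_bytes").alias("bytes"),
    funcs.sum("in_pkts").alias("packets"),
)
res_df.show(truncate=False)

In spark-client.py and sp_test6_7.py the same groupBy/agg runs on a streaming DataFrame behind withWatermark('event_time', window_time) / withWatermark('create_time', window_time), which bounds how long per-window state is retained before late records are dropped.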