-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmaker_s7.py
70 lines (57 loc) · 2.94 KB
/
maker_s7.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from maker import *
from protocol.s7_protocol import *
from setting import *
def make_basic_s7():
tpkt = TPKT()
cotp = COTP()
s7 = S7()
return tpkt, cotp, s7
def make_s7_read_packets(dmac, smac, dip, sip, dport, sport, address, data):
s_ethernet, s_ipv4, s_tcp, s_udp = make_basic_protocol(dmac, smac, dip, sip, dport, sport, 0)
s_tpkt, s_cotp, s_s7 = make_basic_s7()
s_cotp.to_function()
s_s7.to_ack_read(b'\x00\x00', b'\x02', address)
s_tpkt.set_length(s_cotp.bit_lens, s_s7.bit_lens)
s_ipv4.set_total_length(s_tcp.bit_lens, s_tpkt.bit_lens, s_cotp.bit_lens, s_s7.bit_lens)
s_tcp.set_connection_status(0, s_tpkt.bit_lens, s_cotp.bit_lens, s_s7.bit_lens)
d_ethernet, d_ipv4, d_tcp, d_udp = make_basic_protocol(smac, dmac, sip, dip, sport, dport, 1)
d_tpkt, d_cotp, d_s7 = make_basic_s7()
d_cotp.to_function()
d_s7.to_res_read(b'\x04', data, b'\x00\x20', 4)
d_tpkt.set_length(d_cotp.bit_lens, d_s7.bit_lens)
d_ipv4.set_total_length(d_tcp.bit_lens, d_tpkt.bit_lens, d_cotp.bit_lens, d_s7.bit_lens)
d_tcp.set_connection_status(1, d_tpkt.bit_lens, d_cotp.bit_lens, d_s7.bit_lens)
s_pkt = pile_up(s_ethernet, s_ipv4, s_tcp, s_tpkt, s_cotp, s_s7)
d_pkt = pile_up(d_ethernet, d_ipv4, d_tcp, d_tpkt, d_cotp, d_s7)
return s_pkt, d_pkt
def make_s7_write_packets(dmac, smac, dip, sip, dport, sport, address, data):
s_ethernet, s_ipv4, s_tcp, s_udp = make_basic_protocol(dmac, smac, dip, sip, dport, sport, 0)
s_tpkt, s_cotp, s_s7 = make_basic_s7()
s_cotp.to_function()
s_s7.to_ack_write(b'\x04', data, 4, address)
s_tpkt.set_length(s_cotp.bit_lens, s_s7.bit_lens)
s_ipv4.set_total_length(s_tcp.bit_lens, s_tpkt.bit_lens, s_cotp.bit_lens, s_s7.bit_lens)
s_tcp.set_connection_status(0, s_tpkt.bit_lens, s_cotp.bit_lens, s_s7.bit_lens)
d_ethernet, d_ipv4, d_tcp, d_udp = make_basic_protocol(smac, dmac, sip, dip, sport, dport, 1)
d_tpkt, d_cotp, d_s7 = make_basic_s7()
d_cotp.to_function()
d_s7.to_res_write(1)
d_tpkt.set_length(d_cotp.bit_lens, d_s7.bit_lens)
d_ipv4.set_total_length(d_tcp.bit_lens, d_tpkt.bit_lens, d_cotp.bit_lens, d_s7.bit_lens)
d_tcp.set_connection_status(1, d_tpkt.bit_lens, d_cotp.bit_lens, d_s7.bit_lens)
s_pkt = pile_up(s_ethernet, s_ipv4, s_tcp, s_tpkt, s_cotp, s_s7)
d_pkt = pile_up(d_ethernet, d_ipv4, d_tcp, d_tpkt, d_cotp, d_s7)
return s_pkt, d_pkt
if __name__ == '__main__':
# spr, dpr = make_s7_read_packets(SMAC, DMAC, SIP, DIP, 102, 4185, b'\x05', b'\x00\x01\x00\x02')
spw, dpw = make_s7_write_packets(SMAC, DMAC, SIP, DIP, 102, 4185, b'\x05', b'\x00\x01\x00\x04')
pkts = []
for i in range(1, 50):
if i == 35:
data = int_to_bytes(100, 4)
else:
data = int_to_bytes(i, 4)
spr, dpr = make_s7_read_packets(SMAC, DMAC, SIP, DIP, 102, 4185, b'\x05', data)
pkts.append(spr)
pkts.append(dpr)
pcap_wrapper([spw, dpw] + pkts, 'out/s7_test.pcap')