from frmdp import FRMDP
from util import timed
from matplotlib import pyplot as plt
from scipy.stats import halfnorm
import numpy as np
import pickle
from itertools import product
from os.path import isfile
from collections import Counter
from sys import stdout
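
# Exploit selection for a CTF service modeled as an MDP on top of the FRMDP base
# class, with UsageUtils helpers to run value iteration and to export/plot the
# resulting policies.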


class ExploitSelectionWithUsage(FRMDP):
    """
    MDP to calculate the optimal policy for which exploit to use in a given state.
    """

    def __init__(self, opponent_count=2, exploit_count=2, horizon=10, dtype=np.int8, usage=1, nop=False):
        """
        Initialize the values for the exploit selection MDP for a given service.
        :param int opponent_count: number of opposing teams. This is equal to the maximum \
            reward for a given tick.
        :param int exploit_count: number of exploit options for the service.
        :param int horizon: how long the CTF goes on.
        :param np.type dtype: datatype used to represent the states. The default is int8, \
            which limits the number of opponents and the horizon to 127. This is a fairly \
            reasonable restriction and saves memory, which is why it is implemented this way.
        """
        # Get the max value for this datatype; it is used to validate the parameters.
        max_val = np.iinfo(dtype).max
        if opponent_count > max_val or horizon > max_val:
            raise ValueError('Number of opponents and horizon must be less than or equal to %s.' % max_val)
        # Create string representations of the actions. This is just for user interfacing.
        actions = ['exploit' + str(i) for i in range(exploit_count)]
        if nop:
            actions += ['nop']
        print(actions)
        # STATES equal the reward reaped from each exploit. This is an effective way to
        # avoid having to model a stochastic game over multiple entities, since we don't
        # really care what their individual state is, only our total reward.
        # Get all possible (opponent, horizon) tuples.
        all_tups = product(range(opponent_count), range(horizon))
        # print(f'Number of tuples is {len(list(all_tups))}')
        # Get all combinations of those tuples for a tuple of length exploit_count.
        all_states = product(all_tups, repeat=exploit_count)
        # Make into numpy arrays for efficiency.
        states = [np.array(item) for item in all_states]
        # Only consider those states with possible usage amounts, i.e. states where the
        # usage doesn't exceed the length of the game.
        # states = [state for state in states if np.sum(state[:,1]) <= horizon]
        print(f'Number of states is {len(states)}.')
        # Generate the terminal and initial states.
        self.terminal = np.zeros(shape=exploit_count, dtype=dtype)
        self.initial = np.full(shape=(exploit_count, 2), fill_value=(opponent_count, 0), dtype=dtype)
        # No termination reward - it doesn't matter at what point you end.
        termination = np.zeros(shape=len(states), dtype=dtype)
        # Get the transition probability P(s' | s, a) for each s, a, s'.
        super().__init__(states, actions, termination, None, horizon)
        # self.GET_SUBSTATES(len(states), exploit_count)
        self.t = self.generate_transition(states, actions, opponent_count, dtype,
                                          horizon=horizon, usage_weight=usage)
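
    # State encoding: each state is an (exploit_count, 2) array whose rows hold
    # (per-exploit reward, usage count). The initial state holds (opponent_count, 0)
    # for every exploit, and a state is terminal once its reward column is all zeros.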

    def get_reward(self, state, action, state_reward):
        """
        Gets the reward for the transition state -> action -> next state.
        :param int state: the current state id.
        :param int action: the current action id.
        :param ndarray state_reward: the vector of rewards from the previous iteration of this state, action pair.
        :return: vector of rewards for each next_state.
        """
        try:
            reward = self.get_rewards_from_state(state)[action] + state_reward
        except IndexError:
            # The 'nop' action has no per-exploit reward of its own.
            reward = 0 + state_reward
        return reward

    def is_terminal(self, state):
        """
        Checks if this state is a terminal state.
        :param int state: the index of the state to check.
        :return: True if terminal, False otherwise.
        """
        return np.array_equal(self.terminal, self.get_rewards_from_state(state))

    # Utilities ###########################################################
    def get_rewards_from_state(self, s):
        """
        Retrieves the rewards encoded in the state.
        :param int s: state index.
        :return: np.array with the reward for each exploit.
        """
        return self.s[s][:, 0].copy()

    def get_usages_from_state(self, s):
        """
        Retrieves the ordered array of the number of times each exploit has been used.
        :param int s: state index.
        :return: np.array with the usage count of each exploit.
        """
        return self.s[s][:, 1].copy()

    def is_successor(self, s1, s2, action):
        """
        Checks if state2 is a POSSIBLE successor to state1. Possible here meaning that
        no exploit's reward in s2 is lower than its reward in s1.
        :param int s1: index of source state.
        :param int s2: index of dest state.
        :param int action: action index.
        :return: boolean indicating whether s2 can follow s1.
        """
        rewards1 = self.get_rewards_from_state(s1)
        rewards2 = self.get_rewards_from_state(s2)
        # Iterate over exploits rather than actions, since 'nop' has no reward entry.
        for item in range(len(rewards1)):
            if rewards1[item] > rewards2[item]:
                return False
        return True
        """
        rewards1 = self.get_usages_from_state(s1)
        rewards2 = self.get_usages_from_state(s2)
        if not self.a[action] == 'nop':
            rewards1[action] += 1  # add one usage to this
        return np.array_equal(rewards1, rewards2)
        """

    def state_is_less(self, s1, s2):
        """
        Checks if all the rewards of state 2 are less than or equal to those of
        state 1.
        :param int s1: index of source state.
        :param int s2: index of dest state.
        :return: boolean indicating if s2 <= s1.
        """
        return np.all(
            np.less_equal(self.get_rewards_from_state(s2), self.get_rewards_from_state(s1))
        )

    @timed
    def generate_transition(self, states, actions, opponent_count, dist_type=np.uint16, transition_type=np.float64,
                            shrink_bias=4, max_z_score=5, usage_weight=1, horizon=5):
        """
        Create a numpy tensor representing the probability of transitioning from one state to the next.
        The probabilities are generated by a distribution with the following properties:
        0. Transitions follow a Gaussian distribution proportional to the Manhattan distance
           between states.
        1. Logical successors to this state, that is ones where the score decreases, are
           more likely.
        2. The exploit being used is even more likely to decrease.
        :param iterable states: the valid states for this MDP.
        :param iterable actions: the valid actions for this MDP.
        :param np.type dist_type: the datatype to use for representing distances. Uses \
            unsigned int16 if not specified.
        :param np.type transition_type: the datatype to use for the transition tensor. \
            Values are between 0 and 1, so this only determines the precision.
        :param float shrink_bias: how much to prioritize shrinking states over growing \
            states. For example, with value 4, transitioning from (1,1,1) to (0,1,1) is \
            four times as likely as transitioning to (2,1,1).
        :param float max_z_score: the z-score to assign to the largest distance. \
            By default this is 5, which is essentially zero probability.
        :param float usage_weight: the exponential constant to use for usage weighting. \
            1.08 means that at 10 usages, the reward dropping is 2.2 times as likely as \
            normal. If this value is 1, repeated use has no effect.
        :param int horizon: how long the CTF goes on; used here to name the cache files.
        """
        def get_dist(state1, state2):
            n = np.linalg.norm((self.get_rewards_from_state(state1) -
                                self.get_rewards_from_state(state2)), ord=1)
            return n
        # If this has been generated before, just load it from binary.
        fname = 'transitions_%s_%s_%s_%s_nousage.pickle' % (opponent_count, len(actions), horizon, usage_weight)
        # fname = 'transitions_%s_%s_%s.pickle' % (opponent_count, len(actions), horizon)
        print(f'Checking for file {fname}')
        if isfile(fname):
            return pickle.load(open(fname, 'rb'))
        print('Creating transition tensor.')
        # Create a 2d array of state-to-state distances. Prevents recalculation.
        # There should be a much more efficient way of doing this.
        # NOTE: using uint8 here means a distance of 255 is the max. For this to occur,
        # we would have to have (for instance) 50 players, six exploits, and the
        # terminal vs. initial state. This seems a reasonable assumption for now, but
        # we will handle it by setting all such items to the max distance.
        dist_name = 'dists_%s_%s_%s.pickle' % (opponent_count, len(actions), horizon)
        if not isfile(dist_name):
            dist_vector = np.vectorize(get_dist)
            dists = np.fromfunction(dist_vector, shape=(len(states), len(states)), dtype=dist_type)
            pickle.dump(dists, open(dist_name, 'wb+'))
            print('Dists written!')
        else:
            dists = pickle.load(open(dist_name, 'rb'))
        print(f'Dists array of shape {dists.shape} computed')
        # Calculate the size of a z-score bin. We want the furthest possible distance
        # to have a z-score of max_z_score, so we compute the z-score as
        # (val / max_dist) * max_z_score. This linearly maps distances to z-scores
        # along the continuum.
        max_dist = np.max(dists)
        print(f'Max distance is {max_dist}')
        transition = np.zeros((len(states), len(actions), len(states)), dtype=transition_type)
        for s in range(len(states)):
            dist = dists[s]  # distance to other states by index, grabbed once for efficiency.
            for a, action in enumerate(actions):
                # NOTE - this doesn't guarantee a sum of 1. That requires post-processing later.
                for o in range(len(states)):
                    # If this state can't follow the previous one due to the exploit use
                    # count, then set 0 probability and continue.
                    if not self.is_successor(s, o, a):
                        transition[s][a][o] = 0
                        continue
                    # Make the probability of the reward shrinking far more likely than it increasing.
                    shrink_scale = shrink_bias if self.state_is_less(s, o) else 1
                    # Increase the chances of shrinking with usage.
                    uses = self.get_usages_from_state(s)
                    uses = uses[a] if a < len(uses) else 0
                    uses_scale = usage_weight ** uses
                    # Calculate the z-score of this state on the Gaussian curve.
                    z_score = (dist[o] / max_dist) * max_z_score
                    temp = halfnorm.pdf(z_score)
                    val = temp * shrink_scale * uses_scale
                    transition[s][a][o] = val
                # Normalize over (s, a). The probability of ending up in SOME state after
                # taking action a in state s should always be one.
                # Correction - in some terminal states the probability of transitioning out
                # will be 0. This is fine.
                s_a_sum = np.sum(transition[s][a])
                if s_a_sum != 0:
                    transition[s][a] /= s_a_sum
                print("Completed vector (%s, %s)" % (s, 'exploit' + str(a)))
        # Save to pickle so we don't have to do this process again.
        assert not np.any(np.isnan(transition))
        pickle.dump(transition, open(fname, 'wb+'))
        return transition
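
    # Debugging helper: for every (state, action) pair, list the states that
    # is_successor() accepts and dump the result to transition.txt.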
    def GET_SUBSTATES(self, num_states, num_actions):
        successors = {}
        for s in range(num_states):
            for action in range(num_actions):
                successors[s, action] = list()
                for other in range(num_states):
                    if self.is_successor(s, other, action):
                        successors[s, action].append(other)
        # Stringify.
        out = ''
        keys = sorted(list(successors.keys()))
        for state, action in keys:
            out += f'{self.s[state].tolist()} - {action}\n'
            for item in successors[state, action]:
                out += f'\t{self.s[item].tolist()}'
        # Write directly instead of shelling out to echo, which mangles multi-line output.
        with open('transition.txt', 'w') as f:
            f.write(out)


class UsageUtils():
    """
    Simply grouping these so they are easier to read.
    """

    @staticmethod
    def evaluate(opponent_count, exploit_count, horizon, usage=1.00, nop=False):
        test = ExploitSelectionWithUsage(opponent_count=opponent_count, exploit_count=exploit_count,
                                         horizon=horizon, usage=usage, nop=nop)
        values, policy = test.value_iteration()
        actions = test.a
        states = test.s
        filename = f'policy_{opponent_count}_{exploit_count}_{horizon}_{usage}.csv'
        UsageUtils.to_csv(policy, values, actions, states, filename)
        UsageUtils.plot_by_rewards(policy, states, horizon, exploit_count, opponent_count, usage)
        UsageUtils.plot_policy_by_horizon(policy, states, horizon, exploit_count, opponent_count, usage)

    @staticmethod
    def test_usages(opponent_count, exploit_count, horizon, start=1.00, end=1.20, count=11):
        for i in np.linspace(start, end, count):
            print(f'Starting test on {i}.')
            UsageUtils.evaluate(opponent_count, exploit_count, horizon, i)

    @staticmethod
    def to_csv(policy, values, actions, states, file=stdout):
        if isinstance(file, str):
            file = open(file, 'w+')
        header = 'action,value'
        for a, action in enumerate(actions):
            header += f',reward_{a},uses_{a}'
        file.write(header + '\n')
        for state, value, action in zip(states, values, policy):
            line = f'{action},{value}'
            for a in range(len(actions)):
                try:
                    reward, uses = state[a]
                except IndexError:
                    reward, uses = 0, '?'
                line += f',{reward},{uses}'
            file.write(line + '\n')
        print('File written.')

    @staticmethod
    def plot_by_rewards(policy, states, horizon, exploit_count, opponent_count, usage, filename=None):
        def get_rewards(state):
            return state[:, 0].copy()
        shape = (opponent_count,) * exploit_count
        to_display = np.zeros(shape=shape)
        for index, _ in np.ndenumerate(to_display):
            subset = [s for s, state in enumerate(states) if np.array_equal(get_rewards(state), np.array(index))]
            c = Counter()
            for s in subset:
                c.update([policy[s]])
            # Get the top assignments.
            class_values = [100, -100, 0]
            val = c.most_common(2)
            first = val[0]
            if len(val) < 2:
                second = (0, 0)
            else:
                second = val[1]
            if first[1] == second[1]:
                val = 0
            else:
                val = (class_values[first[0]] * first[1]) / (first[1] + second[1])
            to_display[index] = val
        plt.imshow(to_display, cmap='bwr', interpolation='nearest')
        if filename is None:
            filename = f'actions_opp{opponent_count}_exploits{exploit_count}_horizon{horizon}_usage{usage}.png'
        plt.savefig(filename)

    @staticmethod
    def plot_policy_by_horizon(policy, states, horizon, exploit_count, opponent_count, usage, filename=None):
        def get_uses(state):
            return state[:, 1].copy()
        shape = (horizon,) * exploit_count
        to_display = np.zeros(shape=shape)
        class_values = [100, -100, 0]  # simple weighting for the different classes, for display purposes.
        # to_display = np.fromfunction(shape=shape, function=actions_with_horizon)
        for index, _ in np.ndenumerate(to_display):
            subset = [s for s, state in enumerate(states) if np.array_equal(get_uses(state), np.array(index))]
            c = Counter()
            for s in subset:
                c.update([policy[s]])
            # Get the top class assignments and create a value based on those weighted assignments.
            val = c.most_common(2)
            if len(val) == 0:
                val = 0
            else:
                first = val[0]
                if len(val) < 2:
                    second = (0, 0)
                else:
                    second = val[1]
                if first[1] == second[1]:
                    val = 0
                else:
                    val = (class_values[first[0]] * first[1]) / (first[1] + second[1])
            to_display[index] = val
        plt.imshow(to_display, cmap='bwr', interpolation='nearest')
        if filename is None:
            filename = f'display_opp{opponent_count}_exploits{exploit_count}_horizon{horizon}_usage{usage}.png'
        plt.savefig(filename)


if __name__ == '__main__':
    UsageUtils.evaluate(5, 2, 20, usage=1.07)
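    # To sweep a range of usage weights rather than a single value, the helper above
    # can be used instead, e.g.:
    # UsageUtils.test_usages(5, 2, 20, start=1.00, end=1.20, count=11)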