# exploit_nousage.py
from frmdp import FRMDP
from util import timed
from matplotlib import pyplot as plt
from scipy.stats import halfnorm
from itertools import product
from os.path import isfile
import numpy as np
import pickle
class ExploitSelection(FRMDP):
"""
MDP to calculate the optimal policy for what exploit to use in a given state.
"""
def __init__(self, opponent_count=2, exploit_count=2, horizon=10, dtype=np.int8):
"""
Initialize the values for the exploit selection MDP for a given service.
:param int opponent_count: number of opposing teams. This is equal to the maximum\
reward for a given tick.
:param int exploit_count: number of exploit options for the service.
        :param int horizon: how long the CTF goes on.
        :param np.type dtype: Type of data to use for representing the states. Default is int8,\
            which limits the number of opponents and the horizon to 127. This is a fairly reasonable\
            restriction and saves memory, which is why it is implemented this way.
"""
# get max value for this datatype. this will be used to ensure valid parameters.
max_val = np.iinfo(dtype).max
if opponent_count > max_val or horizon > max_val:
raise ValueError('Number of opponents and horizon must be less than or equal to %s.' % max_val)
# create string representation of actions. This is just for user interfacing.
actions = ['exploit' + str(i) for i in range(exploit_count)]
        # STATES encode the reward reaped from each exploit. This is an effective way to
        # circumvent having to model a stochastic game over multiple entities,
        # since we don't really care what their individual state is, only our total reward.
# get all possible opponent, horizon tuples.
all_states = product(range(opponent_count), repeat=exploit_count)
# Make into numpy arrays for efficiency
        states = [np.array(item, dtype=dtype) for item in all_states]
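        # e.g. with opponent_count=3 and exploit_count=2 this yields the reward tuples
        # (0, 0), (0, 1), ..., (2, 2): 3**2 = 9 states in total.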
# pprint(states, actions)
print(f'Number of States is {len(states)}.')
# Old, simple state system.
# temp = product(*([range(opponent_count + 1)] * exploit_count))
# states = [np.array(item, dtype=dtype) for item in temp]
# generate terminal and initial state.
self.terminal = np.zeros(shape=exploit_count, dtype=dtype)
self.initial = np.full(shape=exploit_count, fill_value=opponent_count, dtype=dtype)
# No termination reward - doesn't matter at what point you end.
termination = np.zeros(shape=len(states), dtype=dtype)
# Get the transition probability P(s' | s, a) for each s, a, s'.
super().__init__(states, actions, termination, None, horizon)
self.t = self.generate_transition(states, actions, opponent_count, dtype, horizon=horizon)
def get_reward(self, state, action, state_reward):
"""
Gets reward for transition from state->action->nextState.
:param int state: The current state id.
:param int action: The current action id.
:param ndarray state_reward: The vector of rewards from the previous iteration of this state, action pair.
:return vector of rewards for each next_state.
"""
        try:
            reward = self.get_rewards_from_state(state)[action] + state_reward
        except Exception:
            # No per-action reward is available for this state; pass the
            # accumulated reward through unchanged.
            reward = 0 + state_reward
return reward
def is_terminal(self, state):
"""
Checks if this state is a terminal state.
:param int state: The index of the state to check.
:return True if terminal, False otherwise.
"""
return np.array_equal(self.terminal, self.get_rewards_from_state(state))
# Utilities ###########################################################
def get_rewards_from_state(self, s):
"""
Retrieves the rewards encoded in the state.
:param int s: state index.
        :return np.array with the reward for each exploit.
"""
return self.s[s].copy()
def is_successor(self, s1, s2, action):
"""
        Checks if state2 is a POSSIBLE successor to state1, meaning that no
        exploit's reward increases. The action parameter is unused in this
        no-usage variant.
:param int s1: index of source state.
:param int s2: index of dest state.
:param int action: action index.
:return boolean indicating if successor or not.
"""
for item in range(len(self.a)):
if self.s[s2][item] > self.s[s1][item]:
return False
return True
# return np.array_equal(rewards1, rewards2)
def state_is_less(self, s1, s2):
"""
Checks if all the rewards of state 2 are less than or equal to those of
state 1.
:param int s1: index of source state.
:param int s2: index of dest state.
:return boolean indicating if s2 <= s1.
"""
return np.all(
np.less_equal(self.get_rewards_from_state(s2), self.get_rewards_from_state(s1))
)
def generate_transition(self, states, actions, opponent_count, dist_type=np.uint16, transition_type=np.float_,
shrink_bias=4, max_z_score=5, horizon=5):
"""
Create numpy tensor representing the probability of transitioning from one state to the next state.
These will be generated by a probability distribution with the following properties:
        0. Transitions follow a (half-)Gaussian distribution that decays with the Manhattan
           distance between states.
        1. Logical successors to this state, that is, ones where the score decreases, are
           more likely.
        2. The exploit being used would be even more likely to decrease (not applied in this
           no-usage variant, where usage is not tracked).
:param iterable states: The valid states for this MDP.
:param iterable actions: The valid actions for this MDP.
        :param np.type dist_type: The datatype to use for representing distances. Uses \
            unsigned int16 if not specified.
:param np.type transition_type: The datatype to use for the transition tensor. \
Values are between 0 and 1, so this only determines the precision.
:param float shrink_bias: How much to prioritize shrinking states over growing \
states. For example, with value 4 transitioning from (1,1,1) to (0,1,1) is \
four times as likely as transitioning to (2,1,1).
        :param float max_z_score: indicates the z-score to assign to the largest distance.\
            By default this is 5, which corresponds to essentially zero probability.
        :param int horizon: the CTF horizon; only used here to name the cached pickle files.
"""
def get_dist(state1, state2):
n = np.linalg.norm((self.get_rewards_from_state(state1) -
self.get_rewards_from_state(state2)), ord=1)
return n
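        # e.g. the L1 (Manhattan) distance between reward vectors (2, 1) and (0, 1)
        # is |2 - 0| + |1 - 1| = 2.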
# if this has been generated before, just load it from binary.
fname = 'nousage_transitions_%s_%s_%s.pickle' % (opponent_count, len(actions), horizon)
print(f'Checking for file {fname}')
if isfile(fname):
return pickle.load(open(fname, 'rb'))
        print('Creating transition tensor.')
        # Create a 2-D array of state-to-state distances to avoid recalculation.
        # There should be a much more efficient way of doing this.
        # NOTE: dist_type caps the largest representable distance (255 for uint8, 65535 for
        # the default uint16). Reaching that cap would require an extreme setup (for
        # instance, 50 players, six exploits, and the terminal vs. initial state); that
        # seems unlikely for now, and we will handle it by setting such entries to the
        # maximum distance.
dist_name = 'nousage_dists_%s_%s_%s.pickle' % (opponent_count, len(actions), horizon)
        if not isfile(dist_name):
dist_vector = np.vectorize(get_dist)
dists = np.fromfunction(dist_vector, shape=(len(states), len(states)), dtype=dist_type)
pickle.dump(dists, open(dist_name, 'wb+'))
print('Dists written!')
else:
dists = pickle.load(open(dist_name, 'rb'))
print(f'Dists array of shape {dists.shape} computed')
        # Calculate the z-score mapping. We want the furthest possible distance
        # to have a z-score of max_z_score, so a distance val maps to
        # (val / max_dist) * max_z_score. This linearly maps distances onto
        # z-scores along the continuum.
max_dist = np.max(dists)
print(f'Max Distance is {max_dist}')
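        # e.g. with max_dist = 10 and max_z_score = 5, a distance of 4 maps to
        # z = (4 / 10) * 5 = 2.0.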
transition = np.zeros((len(states), len(actions), len(states)), dtype=transition_type)
for s in range(len(states)):
dist = dists[s] # distance to other states by index. Grabbed once for efficiency.
for a, action in enumerate(actions):
                # NOTE - this doesn't guarantee a sum of 1. That requires post-processing later.
for o in range(len(states)):
                    # If this state can't follow the previous one (some reward would
                    # have to increase), set the probability to 0 and continue.
if not self.is_successor(s, o, a):
transition[s][a][o] = 0
continue
# set the probability of reward shrinking as far more likely than increasing.
shrink_scale = shrink_bias if self.state_is_less(s, o) else 1
                    # calculate the z-score of this state on the Gaussian curve.
z_score = (dist[o] / max_dist) * max_z_score
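                    # halfnorm.pdf gives the half-normal density at this z-score, so
                    # states that are closer in Manhattan distance get a larger
                    # unnormalized weight.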
temp = halfnorm.pdf(z_score)
val = temp * shrink_scale # * uses_scale
transition[s][a][o] = val
# Normalize s,a. The probability of ending up in SOME state after taking
# action a in state s should always be one.
# Correction - in some terminal states the probability of transitioning out will
# be 0. This is fine.
s_a_sum = np.sum(transition[s][a])
if s_a_sum != 0:
transition[s][a] /= s_a_sum
print("Completed vector (%s, %s)" % (s, 'exploit' + str(a)))
# save to pickle so we don't have to do this process again.
        assert not np.any(np.isnan(transition))
pickle.dump(transition, open(fname, 'wb+'))
return transition
class NoUsageUtils:
@staticmethod
def evaluate(opponent_count, exploit_count, horizon):
test = ExploitSelection(opponent_count=opponent_count, exploit_count=exploit_count, horizon=horizon)
values, policy = test.value_iteration()
actions = test.a
states = test.s
filename = f'no_usage_policy_{opponent_count}_{exploit_count}_{horizon}.csv'
# plot_policy_by_horizon(policy, states, horizon, exploit_count, opponent_count, filename='test.png')
NoUsageUtils.plot_by_rewards_nousages(policy, states, horizon, exploit_count, opponent_count)
@staticmethod
def plot_by_rewards_nousages(policy, states, horizon, exploit_count, opponent_count, filename=None):
shape = (opponent_count,) * exploit_count
to_display = np.zeros(shape=shape)
for index in range(len(states)):
state = states[index]
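            # NOTE: this 2-D indexing assumes exploit_count == 2; higher-dimensional
            # state spaces would need a different projection before plotting.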
to_display[state[0], state[1]] = policy[index]
plt.imshow(to_display, cmap='bwr', interpolation='nearest')
if filename is None:
filename = f'actions_opp{opponent_count}_exploits{exploit_count}_horizon{horizon}.png'
plt.savefig(filename)
if __name__ == '__main__':
    NoUsageUtils.evaluate(5, 2, 20)