-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPath.py
193 lines (162 loc) · 5.92 KB
/
Path.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Mar 24 12:00:05 2020
@author: hudson
"""
import networkx as nx
import QNET
import numpy as np
import collections
import copy
class Path:
    """
    A sequence of nodes (and the edge keys joining them) inside a Qnet graph.

    Attributes
    ----------
    G: Qnet()
        Graph that contains the path
    node_array: list
        Node objects of the path, in order, as returned by G.getNode
    edge_keys: list
        Edge key used between node_array[i] and node_array[i + 1]
    head:
        First node of the path
    tail:
        Last node of the path
    cost_vector: dict
        Summed costs of every edge and node in the path (see get_cost_vector)
    """

    def __init__(self, G, array, edge_keys=None):
        """
        Initialization method for the Qnet Path class

        Parameters
        ----------
        G: Qnet()
            Graph containing the path
        array: list
            List of nodes in the path. Each entry is looked up with G.getNode
        edge_keys:
            List of edge keys. Since Qnet is a multigraph, reference to the networkx edge keys is essential in order
            to fully distinguish paths that follow the same sequence of nodes.
            Defaults to key 0 for every edge.

        Raises
        ------
        AssertionError
            If G is not a Qnet, array is None or empty, a node cannot be found in G,
            or two consecutive nodes are not joined by an edge in G.
        """
        # NOTE: validation is done with assert (as before) so callers keep seeing AssertionError;
        # be aware these checks vanish when Python runs with -O.
        assert (isinstance(G, QNET.Qnet)), "path.__init__ requires reference to the graph containing the path"
        # "is not None" rather than "!= None": the latter is elementwise (and ambiguous) for numpy arrays
        assert array is not None, "path.__init__ received an empty array"

        # Build node array from what was given in array
        node_array = []
        for node in array:
            node = G.getNode(node)
            assert node is not None
            node_array.append(node)
        # An empty path has no head or tail; fail here instead of with an IndexError below
        assert node_array, "path.__init__ received an empty array"

        # If edge_keys is None, assume each key is the default 0
        if edge_keys is None:
            edge_keys = [0] * (len(node_array) - 1)

        self.G = G
        self.node_array = node_array
        self.edge_keys = edge_keys
        self.head = node_array[0]
        self.tail = node_array[-1]

        # Assert path is valid in G before computing costs, so that a path which does not
        # exist fails with the message below. An edge in either direction is accepted.
        # Maybe we could just incorporate this into is_valid instead?
        edges = G.edges()
        connected = all(
            (cur, nxt) in edges or (nxt, cur) in edges
            for cur, nxt in zip(node_array, node_array[1:])
        )
        assert connected, f"Path {self.stringify()} does not exist in Qnet."

        self.cost_vector = self.get_cost_vector()

    def __str__(self):
        """Return the stringified path followed by its cost vector."""
        return "Path: " + self.stringify() + ", Cost: " + str(self.cost_vector)

    def __repr__(self):
        """Return the stringified path (node names joined by "-")."""
        return self.stringify()

    def is_valid(self):
        """
        Checks to see if the given path is valid for entanglement distribution.
        The path is reported valid when at least one of its nodes is a Ground or a Satellite.

        :return: Boolean
        """
        return any(
            isinstance(node, (QNET.Ground, QNET.Satellite))
            for node in self.node_array
        )

    def get_cost_vector(self):
        """
        Sum the costs of every edge and every node in the path.

        Costs whose name starts with "add_" are additionally converted with
        self.G.conversions[<name without prefix>][1] and stored under the un-prefixed name.

        Returns
        -------
        new_cv : dict
            Cost name -> total cost over the path
        """
        # List of cost vectors for all elements in the path:
        element_cvs = []

        # Get cost vectors of edges (edge i joins node i and node i + 1 with key edge_keys[i])
        nodes = self.node_array
        for i, (cur, nxt) in enumerate(zip(nodes, nodes[1:])):
            element_cvs.append(self.G.get_edge_data(cur, nxt, self.edge_keys[i]))

        # Get cost vectors of nodes
        for node in nodes:
            element_cvs.append(node.costs)

        # Cost vectors are expected to have identical keys; sum them key by key
        counter = collections.Counter()
        for cv in element_cvs:
            counter.update(cv)
        new_cv = dict(counter)

        # Convert additive costs into regular costs.
        # Iterate over a snapshot: writing a new key into new_cv while iterating it directly
        # raises "RuntimeError: dictionary changed size during iteration".
        for cost_type, cost in list(new_cv.items()):
            if cost_type.startswith("add_"):
                # Remove prefix of cost_type
                plain_type = QNET.remove_prefix(cost_type, "add_")
                # Convert additive cost to normal, add it to new_cv
                new_cv[plain_type] = self.G.conversions[plain_type][1](cost)
        return new_cv

    # TODO Test this
    def subgraph(self):
        """Return self.G.subgraph(...) restricted to the nodes of this path."""
        # The nodes are stored in node_array; there is no "array" attribute on Path
        return self.G.subgraph(self.node_array)

    def stringify(self):
        """
        Returns
        -------
        pString : str
            Stringified version of the path: node names joined by "-"
        """
        return "-".join(str(node.name) for node in self.node_array)

    def remove_edges(self):
        """
        Remove all edges in path from the graph G

        :return: None
        """
        nodes = self.node_array
        for i, (cur, nxt) in enumerate(zip(nodes, nodes[1:])):
            self.G.remove_edge(cur, nxt, self.edge_keys[i])

    def update(self):
        """Recompute cost_vector from the current state of the graph and nodes."""
        self.cost_vector = self.get_cost_vector()

    def swap_path(self):
        """
        Perform the swapping operation with all valid swapper nodes in the path.
        A swapper is valid if there exist ground nodes at any point on its adjacent sides.

        Returns
        -------
        new_cv : dict
            Shallow copy of cost_vector whose 'e' entry is multiplied by the swap_prob of
            every valid swapper. self.cost_vector itself is left unchanged.
        """
        # Does a ground exist before a potential swapper?
        ground_seen = False
        # Most recent swapper seen after a ground and not yet closed off by another ground
        swapper = None
        # Product of the swap probabilities of all valid swappers
        swap_cost = 1

        for node in self.node_array:
            if isinstance(node, QNET.Ground):
                if not ground_seen:
                    ground_seen = True
                elif swapper is not None:
                    # A ground on both sides: the pending swapper is valid
                    swap_cost *= swapper.swap_prob
                    swapper = None
            if isinstance(node, QNET.Swapper) and ground_seen:
                swapper = node

        new_cv = copy.copy(self.cost_vector)
        new_cv['e'] = new_cv['e'] * swap_cost
        return new_cv