Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simulate subcommand to network survey script #4177

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,5 @@ min-testcases/
/src/util/xdrquery/XDRQueryParser.h
/src/util/xdrquery/XDRQueryParser.cpp
/src/util/xdrquery/stack.hh

__pycache__
85 changes: 64 additions & 21 deletions scripts/OverlaySurvey.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@
import sys
import time

import overlay_survey.simulation as sim

# A SurveySimulation, if running in simulation mode, or None otherwise.
SIMULATION = None

def get_request(url, params=None):
""" Make a GET request, or simulate one if running in simulation mode. """
if SIMULATION:
return SIMULATION.get(url=url, params=params)
else:
return requests.get(url=url, params=params)

def next_peer(direction_tag, node_info):
if direction_tag in node_info and node_info[direction_tag]:
Expand Down Expand Up @@ -140,7 +151,7 @@ def send_requests(peer_list, params, request_url):
# Submit `limit` queries roughly every ledger
for key in peer_list:
params["node"] = key
requests.get(url=request_url, params=params)
get_request(url=request_url, params=params)
print("Send request to %s" % key)
global request_count
request_count += 1
Expand Down Expand Up @@ -224,8 +235,8 @@ def get_tier1_stats(augmented_directed_graph):

def augment(args):
graph = nx.read_graphml(args.graphmlInput)
data = requests.get("https://api.stellarbeat.io/v1/nodes").json()
transitive_quorum = requests.get(
data = get_request("https://api.stellarbeat.io/v1/nodes").json()
transitive_quorum = get_request(
"https://api.stellarbeat.io/v1/").json()["transitiveQuorumSet"]

for obj in data:
Expand Down Expand Up @@ -273,6 +284,13 @@ def run_survey(args):
"inboundPeers": {},
"outboundPeers": {}
})
if args.simulate:
global SIMULATION
try:
SIMULATION = sim.SurveySimulation(args.simGraph, args.simRoot)
except sim.SimulationError as e:
print(f"Error: {e}")
sys.exit(1)

url = args.node

Expand All @@ -285,7 +303,7 @@ def run_survey(args):
params = {'duration': duration}

# reset survey
requests.get(url=stop_survey)
get_request(url=stop_survey)

peer_list = set()
if args.nodeList:
Expand All @@ -296,7 +314,7 @@ def run_survey(args):

peers_params = {'fullkeys': "true"}

peers = requests.get(url=peers, params=peers_params).json()[
peers = get_request(url=peers, params=peers_params).json()[
"authenticated_peers"]

# seed initial peers off of /peers endpoint
Expand All @@ -307,9 +325,10 @@ def run_survey(args):
for peer in peers["outbound"]:
peer_list.add(peer["id"])

self_name = requests.get(url + "/scp?limit=0&fullkeys=true").json()["you"]
scp_params = {'fullkeys': "true", 'limit': 0}
self_name = get_request(url + "/scp", scp_params).json()["you"]
graph.add_node(self_name,
version=requests.get(url + "/info").json()["info"]["build"],
version=get_request(url + "/info").json()["info"]["build"],
numTotalInboundPeers=len(peers["inbound"] or []),
numTotalOutboundPeers=len(peers["outbound"] or []))

Expand All @@ -328,7 +347,7 @@ def run_survey(args):
time.sleep(1)

print("Fetching survey result")
data = requests.get(url=survey_result).json()
data = get_request(url=survey_result).json()
print("Done")

if "topology" in data:
Expand Down Expand Up @@ -366,6 +385,12 @@ def run_survey(args):
print("New peers: %s Retrying: %s" %
(new_peers, len(peer_list)-new_peers))

# sanity check that simulation produced a graph isomorphic to the input
assert (not args.simulate or
nx.is_isomorphic(graph, nx.read_graphml(args.simGraph))), \
("Simulation produced a graph that is not isomorphic to the input "
"graph")

if nx.is_empty(graph):
print("Graph is empty!")
sys.exit(0)
Expand Down Expand Up @@ -395,19 +420,8 @@ def flatten(args):
json.dump(output_graph, output_file)
sys.exit(0)


def main():
# construct the argument parse and parse the arguments
argument_parser = argparse.ArgumentParser()
argument_parser.add_argument("-gs",
"--graphStats",
help="output file for graph stats")

subparsers = argument_parser.add_subparsers()

parser_survey = subparsers.add_parser('survey',
help="run survey and "
"analyze results")
def init_parser_survey(parser_survey):
"""Initialize the `survey` subcommand"""
parser_survey.add_argument("-n",
"--node",
required=True,
Expand All @@ -429,6 +443,35 @@ def main():
help="optional list of seed nodes")
parser_survey.set_defaults(func=run_survey)

def main():
# construct the argument parse and parse the arguments
argument_parser = argparse.ArgumentParser()
argument_parser.add_argument("-gs",
"--graphStats",
help="output file for graph stats")

subparsers = argument_parser.add_subparsers()

parser_survey = subparsers.add_parser('survey',
help="run survey and "
"analyze results")
parser_survey.set_defaults(simulate=False)
init_parser_survey(parser_survey)
parser_simulate = subparsers.add_parser('simulate',
help="simulate survey run")
# `simulate` supports all arguments that `survey` does, plus some additional
# arguments for the simulation itself.
init_parser_survey(parser_simulate)
parser_simulate.add_argument("-s",
"--simGraph",
required=True,
help="graphml file to simulate network from")
parser_simulate.add_argument("-r",
"--simRoot",
required=True,
help="node to start simulation from")
parser_simulate.set_defaults(simulate=True)

parser_analyze = subparsers.add_parser('analyze',
help="write stats for "
"the graphml input graph")
Expand Down
3 changes: 3 additions & 0 deletions scripts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ This folder is for storing any scripts that may be helpful for using stellar-cor
- `-nl NODELIST`, `--nodeList NODELIST` - list of seed nodes. One node per line. (Optional)
- `-gmlw GRAPHMLWRITE`, `--graphmlWrite GRAPHMLWRITE` - output file for graphml file
- `-sr SURVEYRESULT`, `--surveyResult SURVEYRESULT` - output file for survey results
- sub command `simulate` - simulate a run of the `survey` subcommand without any network calls. Takes the same arguments as `survey`, plus the following:
- `-s SIMGRAPH`, `--simGraph SIMGRAPH` - Network topology to simulate in graphml format.
- `r SIMROOT`, `--simRoot SIMROOT` - Node in graph to start simulation from.
- sub command `analyze` - analyze an existing graph
- `-gmla GRAPHMLANALYZE`, `--graphmlAnalyze GRAPHMLANALYZE` - input graphml file
- sub command `augment` - augment an existing graph with information from stellarbeat.io. Currently, only Public Network graphs are supported.
Expand Down
141 changes: 141 additions & 0 deletions scripts/overlay_survey/simulation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""
This module simulates the HTTP endpoints of stellar-core's overlay survey
"""

import networkx as nx

class SimulationError(Exception):
"""An error that occurs during simulation"""

class SimulatedResponse:
"""Simulates a `requests.Response`"""
def __init__(self, json):
self._json = json

def json(self):
"""Simulates the `json` method of a `requests.Response`"""
return self._json

class SurveySimulation:
"""
Simulates the HTTP endpoints of stellar-core's overlay survey. Raises
SimulationError if `root_node` is not in the graph represented by
`graph_path`.
"""
def __init__(self, graph_path, root_node):
# The graph of the network being simulated
self._graph = nx.read_graphml(graph_path)
if root_node not in self._graph.nodes:
raise SimulationError(f"root node '{root_node}' not in graph")
# The node the simulation is being performed from
self._root_node = root_node
# The set of requests that have not yet been simulated
self._pending_requests = []
# The results of the simulation
self._results = {"topology" : {}}
print(f"simulating from {root_node}")

def _info(self, params):
"""
Simulate the info endpoint. Only fills in the version info for the
root node.
"""
assert not params, f"Unsupported info invocation with params: {params}"
version = self._graph.nodes[self._root_node]["version"]
return SimulatedResponse({"info" : {"build" : version}})

def _peers(self, params):
"""
Simulate the peers endpoint. Only fills in the "id" field of each
authenticated peer.
"""
assert params == {"fullkeys": "true"}, \
f"Unsupported peers invocation with params: {params}"
json = {"authenticated_peers": {"inbound": [], "outbound": []}}
for peer in self._graph.in_edges(self._root_node):
json["authenticated_peers"]["inbound"].append({"id" : peer[0]})
for peer in self._graph.out_edges(self._root_node):
json["authenticated_peers"]["outbound"].append({"id" : peer[1]})
return SimulatedResponse(json)

def _scp(self, params):
"""Simulate the scp endpoint. Only fills in the "you" field"""
assert params == {"fullkeys": "true", "limit": 0}, \
f"Unsupported scp invocation with params: {params}"
return SimulatedResponse({"you": self._root_node})

def _surveytopology(self, params):
"""
Simulate the surveytopology endpoint. This endpoint currently ignores
the `duration` parameter
"""
assert params.keys() == {"node", "duration"}, \
f"Unsupported surveytopology invocation with params: {params}"
if params["node"] != self._root_node:
self._pending_requests.append(params["node"])

def _addpeer(self, node_id, edge_data, peers):
"""
Given data on a graph edge in `edge_data`, translate to the expected
getsurveyresult json and add to `peers` list
"""
# Start with data on the edge itself
peer_json = edge_data.copy()
# Add peer's node id and version
peer_json["nodeId"] = node_id
peer_json["version"] = self._graph.nodes[node_id]["version"]
# Add to inboundPeers
peers.append(peer_json)


def _getsurveyresult(self, params):
"""Simulate the getsurveyresult endpoint"""
assert not params, \
f"Unsupported getsurveyresult invocation with params: {params}"

# For simulation purposes, the survey is in progress so long as there
# are still pending requests to simulate.
self._results["surveyInProgress"] = bool(self._pending_requests)

# Update results
while self._pending_requests:
node = self._pending_requests.pop()

# Start with info on the node itself
node_json = self._graph.nodes[node].copy()

# Remove "version" field, which is not part of stellar-core's
# response
del node_json["version"]

# Generate inboundPeers list
node_json["inboundPeers"] = []
for (node_id, _, data) in self._graph.in_edges(node, True):
self._addpeer(node_id, data, node_json["inboundPeers"])

# Generate outboundPeers list
node_json["outboundPeers"] = []
for (_, node_id, data) in self._graph.out_edges(node, True):
self._addpeer(node_id, data, node_json["outboundPeers"])

self._results["topology"][node] = node_json
return SimulatedResponse(self._results)

def get(self, url, params):
"""Simulate a GET request"""
endpoint = url.split("/")[-1]
if endpoint == "stopsurvey":
# Do nothing
return
if endpoint == "info":
return self._info(params)
if endpoint == "peers":
return self._peers(params)
if endpoint == "scp":
return self._scp(params)
if endpoint == "surveytopology":
return self._surveytopology(params)
if endpoint == "getsurveyresult":
return self._getsurveyresult(params)
raise SimulationError("Received GET request for unknown endpoint "
f"'{endpoint}' with params '{params}'")