Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use parallel processing #140

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 44 additions & 13 deletions skymap_scanner/client/reco_icetray.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import os
import pickle
from multiprocessing import Process
from pathlib import Path
from typing import Any, List

Expand Down Expand Up @@ -115,7 +116,7 @@ def get_GCD_diff_base_handle(baseline_GCD_file: str) -> Any:


def reco_pixel(
reco_algo: str,
reco_obj: recos.RecoInterface,
pframe: icetray.I3Frame,
GCDQp_packet: List[icetray.I3Frame],
baseline_GCD_file: str,
Expand Down Expand Up @@ -165,8 +166,8 @@ def UncompressGCD(tray, name, base_GCD_path, base_GCD_filename):

# perform fit
tray.AddSegment(
recos.get_reco_interface_object(reco_algo).traysegment,
f"{reco_algo}_traysegment",
reco_obj.traysegment,
f"{reco_obj.name}_traysegment",
logger=LOGGER,
seed=pframe[f"{cfg.OUTPUT_PARTICLE_NAME}"],
)
Expand All @@ -193,7 +194,7 @@ def writeout_reco(frame: icetray.I3Frame) -> None:
GCDQp_packet,
filestager=dataio.get_stagers(),
)[0]
pixreco = pixelreco.PixelReco.from_i3frame(frame, geometry, reco_algo)
pixreco = pixelreco.PixelReco.from_i3frame(frame, geometry, reco_obj)
LOGGER.info(f"PixelReco: {pixreco}")
pickle.dump(pixreco, f)

Expand Down Expand Up @@ -290,22 +291,52 @@ def main() -> None:
with open(args.in_pkl, "rb") as f:
msg = pickle.load(f)
reco_algo = msg[cfg.MSG_KEY_RECO_ALGO]
pframe = msg[cfg.MSG_KEY_PFRAME]
pframes = msg[cfg.MSG_KEY_PFRAME]

# get GCDQp_packet
with open(args.GCDQp_packet_json, "r") as f:
GCDQp_packet = full_event_followup.i3live_json_to_frame_packet(
f.read(), pnf_framing=False
)

# go!
reco_pixel(
reco_algo,
pframe,
GCDQp_packet,
str(args.baseline_GCD_file),
args.out_pkl,
)
reco_obj = recos.get_reco_interface_object(reco_algo)
processes = []
outpaths = []

# loop over the pframes to run in parallel
for i, pframe in enumerate(pframes):
outpath = Path(f"out{i:04}.pkl")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this use a unique path, either via `tempfile` or by including the parent process PID in the filename? I'm thinking of the possibility of two instances running side by side, which could happen in EWMS.

proc_args = (
reco_obj,
pframe,
GCDQp_packet,
str(args.baseline_GCD_file),
outpath,
)

# spawn subprocess
p = Process(target=reco_pixel, args=proc_args)
p.start()
LOGGER.info(f"Started Process {i}")
processes.append(p)
outpaths.append(outpath)

# wait for subprocess to finish
for p in processes:
p.join()
logging.info(f"finished proc for fun_id={p}")

# combine output data into a single pickle file
outdata = []
for path in outpaths:
with open(path, "rb") as f:
outdata.append(pickle.load(f))
path.unlink()

# write output in a single file
with open(args.out_pkl, "wb") as f:
pickle.dump(outdata, f)

LOGGER.info("Done reco'ing pixel.")


Expand Down
4 changes: 3 additions & 1 deletion skymap_scanner/recos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ def get_reco_interface_object(name: str) -> RecoInterface:
"""Dynamically import the reco sub-module's class."""
try:
module = importlib.import_module(f"{__name__}.{name.lower()}")
return getattr(module, ''.join(x.capitalize() for x in name.split('_')))
reco_class = getattr(module, ''.join(x.capitalize() for x in name.split('_')))
reco_class.name = name
return reco_class
except ModuleNotFoundError as e:
if name not in get_all_reco_algos():
# checking this in 'except' allows us to use 'from e'
Expand Down
43 changes: 30 additions & 13 deletions skymap_scanner/server/start_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,7 @@ async def serve(
min_nside: int, # TODO: replace with nsides & implement (https://github.com/icecube/skymap_scanner/issues/79)
max_nside: int, # TODO: remove (https://github.com/icecube/skymap_scanner/issues/79)
skydriver_rc: Optional[RestClient],
parallel_trays: int,
) -> pixelreco.NSidesDict:
"""Send pixels to be reco'd by client(s), then collect results and save to
disk."""
Expand Down Expand Up @@ -834,6 +835,7 @@ async def serve(
nsides_dict,
pixeler,
progress_reporter,
parallel_trays,
)
if not n_pixreco: # we're done
break
Expand Down Expand Up @@ -863,6 +865,7 @@ async def serve_scan_iteration(
nsides_dict: pixelreco.NSidesDict,
pixeler: PixelsToReco,
progress_reporter: ProgressReporter,
parallel_trays: int,
) -> int:
"""Run the next (or first) scan iteration (set of pixel-recos).

Expand All @@ -877,16 +880,21 @@ async def serve_scan_iteration(
LOGGER.info("Getting pixels to send to clients...")
pixreco_ids_sent = set([])
async with to_clients_queue.open_pub() as pub:
for i, pframe in enumerate(pixeler.generate_pframes()): # queue_to_clients
_tup = pixelreco.pixel_to_tuple(pframe)
generator = pixeler.generate_pframes()
for i in it.count():
pframes = list(it.islice(generator,parallel_trays))
if not pframes:
break
_tup = [pixelreco.pixel_to_tuple(pframe) for pframe in pframes]
LOGGER.info(f"Sending message M#{i} ({_tup})...")
await pub.send(
{
cfg.MSG_KEY_RECO_ALGO: reco_algo,
cfg.MSG_KEY_PFRAME: pframe,
cfg.MSG_KEY_PFRAME: pframes,
}
)
pixreco_ids_sent.add(_tup)
for t in _tup:
pixreco_ids_sent.add(t)

# check if anything was actually processed
if not pixreco_ids_sent:
Expand All @@ -909,15 +917,16 @@ async def serve_scan_iteration(
LOGGER.info("Receiving pixel-recos from clients...")
async with collector as col: # enter collector 1st for detecting when no pixel-recos received
async with from_clients_queue.open_sub() as sub:
async for pixreco in sub:
if not isinstance(pixreco, pixelreco.PixelReco):
raise ValueError(
f"Message not {pixelreco.PixelReco}: {type(pixreco)}"
)
try:
await col.collect(pixreco)
except DuplicatePixelRecoException as e:
logging.error(e)
async for pixrecos in sub:
for pixreco in pixrecos:
if not isinstance(pixreco, pixelreco.PixelReco):
raise ValueError(
f"Message not {pixelreco.PixelReco}: {type(pixreco)}"
)
try:
await col.collect(pixreco)
except DuplicatePixelRecoException as e:
logging.error(e)
# if we've got all the pixrecos, no need to wait for queue's timeout
if col.pixreco_ids_sent == col.pixreco_ids_received:
break
Expand Down Expand Up @@ -1073,6 +1082,13 @@ def _nside_and_pixelextension(val: str) -> int: # -> Tuple[int, int]:
help='include this flag if the event was simulated',
)

parser.add_argument(
"--parallel-trays",
type=int,
default=8,
help='Number of Parallel instances of icetray to run on each client',
)

args = parser.parse_args()
logging_tools.set_level(
cfg.ENV.SKYSCAN_LOG,
Expand Down Expand Up @@ -1172,6 +1188,7 @@ def _nside_and_pixelextension(val: str) -> int: # -> Tuple[int, int]:
min_nside=min_nside, # TODO: replace with args.nsides & implement (https://github.com/icecube/skymap_scanner/issues/79)
max_nside=max_nside, # TODO: remove (https://github.com/icecube/skymap_scanner/issues/79)
skydriver_rc=skydriver_rc,
parallel_trays=args.parallel_trays,
)
)
LOGGER.info("Done.")
Expand Down
4 changes: 2 additions & 2 deletions skymap_scanner/utils/pixelreco.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ def __post_init__(self) -> None:
def from_i3frame(
frame: I3Frame,
geometry: I3Frame,
reco_algo: str,
reco_obj: recos.RecoInterface,
) -> "PixelReco":
"""Get a PixelReco instance by parsing the I3Frame."""
return recos.get_reco_interface_object(reco_algo).to_pixelreco(frame, geometry)
return reco_obj.to_pixelreco(frame, geometry)


NSidesDict = Dict[int, Dict[int, PixelReco]]