From 7adccee90ab92bae57427f3c197841bd7bb1f47d Mon Sep 17 00:00:00 2001
From: Jessie Thwaites
Date: Mon, 9 Oct 2023 13:42:30 -0500
Subject: [PATCH] O4 realtime (#51)

* load up to 5000s to check for events in realtime
* add internal public webpage fxn
* event time def earlier
* get gcn in case p<0.1
* try for pickle load
* index at 1 instead of 0 for events on skymap
* add a try to convert from moc
* add email/sms notif
* internal alerts pub webpage updates
* formatting for alerts webpage
* increment versioning
* don't send most prob direction if no coinc events
* change header for public wp
* make plots w file modification time
* precomp gw bg trials
* fix document, slack poster
* print central energy range min,max
* fix documenting bugs
* send llama results for 1det event
* adding config for sender
* add a version of combine_results run on classic
* remove unneeded, switch to rel paths
* no config needed, switch back to kafka
* stop it timing out while waiting for data
* correct error processing
* add nside param
* generate high sig gcn templ, calc per event p when gt 0.1
* add note about plot in pub wp
* get type of event and pass to make_call
* should be a float for probs
* fix bug in 2d webpage link
* fix paths
* fix paths, handle llama res if no ontime neutrinos
* update sender to match schema
* update with new key names
* add calls to shifters if results not found; 35 min timeout
* ra_unc is a list
* move incoming alert printing so rev1 doesn't print
* separate alert and gw trials scripts
* update submission script for both
* call shifters if results not found if not a mock
* remove try/except from debugging
* increment version
---
 fast_response/FastResponseAnalysis.py       |   2 +
 fast_response/GWFollowup.py                 |   6 +-
 .../MonitoringAndMocks/Data_Display.py      |   9 +-
 fast_response/listeners/gcn_listener.py     |  33 +-
 fast_response/listeners/gw_gcn_listener.py  |  17 +-
 .../glob_precomputed_trials.py              |   6 +-
 .../precomputed_background/precompute_ts.py |  26 +-
 .../precompute_ts_gw.py                     | 148 +++++
 .../precomputed_background/submit_glob.py   |  18 +-
 .../submit_precomputed_trials.py            |  73 ++-
 .../scripts/combine_results_classic.py      | 591 ++++++++++++++++++
 .../scripts/combine_results_kafka.py        | 123 ++--
 .../scripts/run_external_followup.py        |   2 +
 fast_response/scripts/test_listen_kafka.py  |  28 +-
 fast_response/web_utils.py                  |  14 +-
 html/gw_pub_templ_base.html                 |   5 +-
 setup.py                                    |   2 +-
 17 files changed, 956 insertions(+), 147 deletions(-)
 create mode 100644 fast_response/precomputed_background/precompute_ts_gw.py
 create mode 100644 fast_response/scripts/combine_results_classic.py

diff --git a/fast_response/FastResponseAnalysis.py b/fast_response/FastResponseAnalysis.py
index 4591a7b..ce91d27 100644
--- a/fast_response/FastResponseAnalysis.py
+++ b/fast_response/FastResponseAnalysis.py
@@ -995,6 +995,7 @@ def make_dNdE(self):
         self.energy_range = (np.min([low_5_min_dec, low_5_max_dec]),
                              np.max([high_5_min_dec, high_5_max_dec]))
+        print('90% Central Energy Range: {}, {} GeV'.format(round(self.energy_range[0]), round(self.energy_range[1])))
         self.save_items['energy_range'] = self.energy_range
 
     def ns_scan(self):
@@ -1246,6 +1247,7 @@ def make_dNdE(self):
         plt.legend(loc=4, fontsize=18)
         plt.savefig(self.analysispath + '/central_90_dNdE.png',bbox_inches='tight')
+        print('90% Central Energy Range: {}, {} GeV'.format(round(low_5), round(high_5)))
         self.save_items['energy_range'] = (self.low5, self.high5)
 
     def write_circular(self):
diff --git a/fast_response/GWFollowup.py b/fast_response/GWFollowup.py
index a7291bf..de5fd6d 100644
--- 
a/fast_response/GWFollowup.py +++ b/fast_response/GWFollowup.py @@ -326,12 +326,12 @@ def write_circular(self): except: noticeID = 'NOTICEID' - if pvalue > 0.1: + if pvalue > 0.2: template_path = os.path.join(base, 'circular_templates/gw_gcn_template_low.txt') else: template_path = os.path.join(base, 'circular_templates/gw_gcn_template_high.txt') - if pvalue>0.1: + if pvalue>0.2: with open(template_path, 'r') as gcn_template: gcn = gcn_template.read() @@ -568,7 +568,7 @@ def per_event_pvalue(self): usemask=False ) - if self.p < 0.05: + if self.p < 0.1: for i in range(self.events_rec_array.size): ts, p = self.per_event_scan(self.events_rec_array[i]) self.events_rec_array['pvalue'][i] = p diff --git a/fast_response/MonitoringAndMocks/Data_Display.py b/fast_response/MonitoringAndMocks/Data_Display.py index 2ca5b69..45b3b5b 100755 --- a/fast_response/MonitoringAndMocks/Data_Display.py +++ b/fast_response/MonitoringAndMocks/Data_Display.py @@ -22,7 +22,7 @@ warnings.filterwarnings('ignore', module='astropy._erfa') def dial_up(who="jessie"): - cell_tower = "/cvmfs/icecube.opensciencegrid.org/users/jthwaites/" + cell_tower = "/home/jthwaites/private/" #halp = "https://icecube.wisc.edu/~jthwaites/FastResponse/error_call.xml" subprocess.call([cell_tower+"make_call.py", f"--{who}=True", '--troubleshoot=True'])#, "--call_file", halp]) #print([cell_tower+"make_call.py", f"--{who}=True", '--troubleshoot=True']) @@ -747,11 +747,8 @@ def make_bg_pval_dist(fontsize=15, lower_y_bound=-3.5): print('Loading %i mocks (may take a while)'%(len(saved_mock_pkl))) for mock in saved_mock_pkl: with open(mock,'rb') as f: - try: - result=pickle.load(f) - except: - print('skipped {}'.format(mock)) - continue + result=pickle.load(f) + #print('skipped {}'.format(mock)) all_mocks[result['name']]=result['p'] print('Done loading mocks.') diff --git a/fast_response/listeners/gcn_listener.py b/fast_response/listeners/gcn_listener.py index 665f74f..8c52c94 100644 --- a/fast_response/listeners/gcn_listener.py +++ b/fast_response/listeners/gcn_listener.py @@ -13,8 +13,6 @@ gcn.notice_types.ICECUBE_CASCADE) def process_gcn(payload, root): - print("INCOMING ALERT: ",datetime.utcnow()) - analysis_path = os.environ.get('FAST_RESPONSE_SCRIPTS') if analysis_path is None: try: @@ -37,19 +35,22 @@ def process_gcn(payload, root): stream = params['Stream'] eventtime = root.find('.//ISOTime').text if stream == '26': + print("INCOMING ALERT: ",datetime.utcnow()) print("Detected cascade type alert, running cascade followup. . . ") alert_type='cascade' event_name='IceCube-Cascade_{}{}{}'.format(eventtime[2:4],eventtime[5:7],eventtime[8:10]) skymap = params['skymap_fits'] else: - print("Found track type alert, running track followup. . . ") alert_type='track' event_name='IceCube-{}{}{}'.format(eventtime[2:4],eventtime[5:7],eventtime[8:10]) # IceCube sends 2: a notice and a revision, only want to run once if int(params['Rev']) !=0: - return + return + + print("INCOMING ALERT: ",datetime.utcnow()) + print("Found track type alert, running track followup. . . 
") event_id = params['event_id'] run_id = params['run_id'] @@ -111,14 +112,17 @@ def process_gcn(payload, root): '--suffix={}'.format(suffix)] ) + event_name=event_name+suffix + doc = False if args.document: try: - dir_1000 = glob.glob(os.path.join(os.environ.get('FAST_RESPONSE_OUTPUT'), + dir_1000 = glob(os.path.join(os.environ.get('FAST_RESPONSE_OUTPUT'), '*{}_1.0e+03_s').format(event_name)) subprocess.call([analysis_path+'document.py', '--path', dir_1000[0]]) - dir_2d = glob.glob(os.path.join(os.environ.get('FAST_RESPONSE_OUTPUT'), + dir_2d = glob(os.path.join(os.environ.get('FAST_RESPONSE_OUTPUT'), '*{}_1.7e+05_s').format(event_name)) subprocess.call([analysis_path+'document.py', '--path', dir_2d[0]]) + doc=True except: print('Failed to document to private webpage') @@ -130,13 +134,18 @@ def process_gcn(payload, root): if shifters['start'][i]. \n ".format(wp_link_1000) + - "Results for 2d: <{}|link>. \n".format(wp_link_2d) + - + on_shift +'on shift', + wp_link_1000 = '{}{}_{}_1.0e+03_s.html'.format(link, eventtime[0:10].replace('-','_'),event_name) + + day_before = '{}'.format(int(eventtime[8:10])-1) + if len(day_before)==1: day_before='0'+day_before + str_2d = '{}_{}'.format(eventtime[0:7].replace('-','_'),day_before) + wp_link_2d = '{}{}_{}_1.7e+05_s.html'.format(link, str_2d, event_name) + bot.send_message(f'Done running FRA for {alert_type} alert, {event_name}.\n '+ on_shift +'on shift', 'blanket_blob') + if doc: + bot.send_message("-Results for 1000s: <{}|link> \n-Results for 2d: <{}|link>".format( + wp_link_1000, wp_link_2d), + 'blanket_blob') print(' - slack message sent \n') except Exception as e: print(e) diff --git a/fast_response/listeners/gw_gcn_listener.py b/fast_response/listeners/gw_gcn_listener.py index 7bd03f8..fe24fbd 100755 --- a/fast_response/listeners/gw_gcn_listener.py +++ b/fast_response/listeners/gw_gcn_listener.py @@ -61,14 +61,29 @@ def process_gcn(payload, root): print('\n' +'INCOMING ALERT FOUND: ',datetime.utcnow()) log_file.flush() + + #get type of event (burst, bbh, nsbh, bns) + try: + if params['Group'] == 'Burst': + merger_type = 'Burst' + else: + k = ['BNS','NSBH','BBH'] + probs = {j: float(params[j]) for j in k} + merger_type = max(zip(probs.values(), probs.keys()))[1] + except: + print('Could not determine type of event') + merger_type = None if root.attrib['role']=='observation' and not mock: ## Call everyone because it's a real event! 
-        call_command=['/cvmfs/icecube.opensciencegrid.org/users/jthwaites/make_call.py']
+        call_command=['/home/jthwaites/private/make_call.py']
 
         call_args = ['--justin']
         for arg in call_args:
             call_command.append(arg+'=True')
+        if merger_type is not None:
+            call_command.append(f'--type={merger_type}')
+
         try:
             subprocess.call(call_command)
             #print('Call here.')
diff --git a/fast_response/precomputed_background/glob_precomputed_trials.py b/fast_response/precomputed_background/glob_precomputed_trials.py
index f3276f8..62b13f6 100644
--- a/fast_response/precomputed_background/glob_precomputed_trials.py
+++ b/fast_response/precomputed_background/glob_precomputed_trials.py
@@ -14,6 +14,8 @@
                     help='Time Window in seconds')
 parser.add_argument('--dir',type=str, default='./',
                     help='directory for where trials are, will save npz to dir+/glob_trials/')
+parser.add_argument('--nside',type=int, default=256,
+                    help='nside used when running trials (default 256)')
 args = parser.parse_args()
 
 def glob_allsky_scans(delta_t, rate, dir, low_stats=False):
@@ -24,7 +26,7 @@ def glob_allsky_scans(delta_t, rate, dir, low_stats=False):
     #jobs_per_window = {1000.: 20, 172800.: 100, 2678400.: 100}
 
     files = glob(dir+'/gw_{:.1f}_mHz_seed_*_delta_t_{:.1e}.npz'.format(rate, delta_t))
-    nside = 256
+    nside = args.nside
     npix = hp.nside2npix(nside)
     maps = sparse.csr_matrix((0, npix), dtype=float)
@@ -52,7 +54,7 @@
 
     return maps
 
-for rate in [6.0]:#, 6.2, 6.4, 6.6, 6.8, 7.0, 7.2]:
+for rate in [6.0, 6.2, 6.4, 6.6, 6.8, 7.0, 7.2]:
     for low_stats in [False]:#[True, False]:
         print("Rate: {} mHz, low stats: {}".format(rate, low_stats))
         maps = glob_allsky_scans(args.deltaT, rate, args.dir, low_stats=low_stats)
diff --git a/fast_response/precomputed_background/precompute_ts.py b/fast_response/precomputed_background/precompute_ts.py
index aaf9ee4..57e539a 100644
--- a/fast_response/precomputed_background/precompute_ts.py
+++ b/fast_response/precomputed_background/precompute_ts.py
@@ -9,10 +9,10 @@
 import healpy as hp
 import os, sys, argparse
 from astropy.time import Time
-#from numpy.lib.recfunctions import append_fields
+from numpy.lib.recfunctions import append_fields
 from scipy import sparse
+from glob import glob
 
-from fast_response.GWFollowup import GWFollowup
 from fast_response.AlertFollowup import AlertFollowup
 
 parser = argparse.ArgumentParser(description='Fast Response Analysis')
@@ -24,8 +24,6 @@
                     help="Expected background rate in mHz (default 6.4)")
 parser.add_argument('--seed', default=1, type=int,
                     help='Unique seed for running on the cluster')
-parser.add_argument('--type', default='gw', type=str,
-                    help='type of follow up (gw or alert) to use')
 parser.add_argument('--outdir',type=str, default=None,
                     help='Output directory to save npz (default = FAST_RESPONSE_OUTPUT env variable or cwd)')
 args = parser.parse_args()
@@ -40,7 +38,8 @@
 if not os.path.exists(outdir+'/trials/'):
     os.mkdir(outdir+'/trials/')
 outdir=outdir+'/trials/'
-#skymap_files = glob('/data/ana/realtime/alert_catalog_v2/2yr_prelim/fits_files/Run13*.fits.gz')
+
+skymap_files = glob('/data/ana/realtime/alert_catalog_v2/2yr_prelim/fits_files/Run13*.fits.gz')
 
 start_mjd = 58484.0 #Jan 1, 2019
 stop_mjd = start_mjd + (args.deltaT / 86400.)
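For orientation: each trial file written by these precomputed_background scripts stores a scipy CSR matrix of shape (ntrials, npix), dense in trials and sparse in pixels. A minimal sketch of how such a file could be read back to get a per-pixel background TS distribution and p-value; the function name, the nside default, and the counting convention are illustrative assumptions, not code from this patch:

import numpy as np
import healpy as hp
from scipy import sparse

def pixel_pvalue(trials_file, ts_obs, ra, dec, nside=256):
    # load the (ntrials, npix) sparse TS matrix saved via sparse.save_npz
    maps = sparse.load_npz(trials_file)
    # same pixel convention as the trial scripts: theta = pi/2 - dec (radians)
    pix = hp.ang2pix(nside, np.pi/2. - dec, ra)
    # dense column = background TS distribution at this pixel; unsaved trials are TS=0
    bg_ts = maps[:, pix].toarray().ravel()
    # fraction of background trials at least as extreme as the observed TS
    return float(np.count_nonzero(bg_ts >= ts_obs)) / bg_ts.shape[0]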
@@ -51,14 +50,9 @@
 trials_per_sig = args.ntrials
 seed_counter = args.seed
 
-#skymap required for either initialization, but not used here
-if args.type=='gw':
-    f = GWFollowup('Precompute_trials_test', '/data/user/jthwaites/o3-gw-skymaps/S191216ap.fits.gz',
-                   start, stop, save=False)
-else:
-    f = AlertFollowup('Precompute_trials_test',
-                      '/data/user/jthwaites/cascade_skymaps/hese_59546_run00135946.evt000006354173.fits',
-                      start, stop, save=False)
+#skymap required for initialization, but not used here
+f = AlertFollowup('Precompute_trials_test', skymap_files[0],
+                  start, stop, save=False)
 f.llh.nbackground=args.bkg*args.deltaT/1000.
 #inj = f.initialize_injector(gamma=2.5) #just put this here to initialize f.spatial_prior
 #print f.llh.nbackground
@@ -80,9 +74,7 @@
     maps[jj, pixels] = val['TS']
 print("DONE")
 hp_sparse = maps.tocsr()
-if args.type=='gw':
-    outfilename = outdir+'gw_{:.1f}_mHz_seed_{}_delta_t_{:.1e}.npz'.format(args.bkg, args.seed, args.deltaT)
-else:
-    outfilename = outdir+'{:.1f}_mHz_seed_{}_delta_t_{:.1e}.npz'.format(args.bkg, args.seed, args.deltaT)
+
+outfilename = outdir+'{:.1f}_mHz_seed_{}_delta_t_{:.1e}.npz'.format(args.bkg, args.seed, args.deltaT)
 sparse.save_npz(outfilename, hp_sparse)
 print("Saved to {}".format(outfilename))
\ No newline at end of file
diff --git a/fast_response/precomputed_background/precompute_ts_gw.py b/fast_response/precomputed_background/precompute_ts_gw.py
new file mode 100644
index 0000000..fab4bbc
--- /dev/null
+++ b/fast_response/precomputed_background/precompute_ts_gw.py
@@ -0,0 +1,148 @@
+r"""
+Run trials for background only, all-sky scans.
+No spatial prior is included; instead we record the
+TS values in each pixel and maps are applied in realtime
+
+Modified from scripts by A. Pizzuto and R. Hussain
+Jessie Thwaites, Oct 2023
+"""
+
+import numpy as np
+import healpy as hp
+import argparse, os
+
+from astropy.time import Time
+from fast_response import GWFollowup
+from skylab.llh_models import EnergyLLH
+from skylab.ps_llh import PointSourceLLH
+from skylab.spectral_models import PowerLaw
+from skylab.temporal_models import BoxProfile, TemporalModel
+from scipy import sparse
+
+######################### CONFIGURE ARGUMENTS #############################
+p = argparse.ArgumentParser(description="Runs background-only trials"
+                            " for the Gravitational wave/Neutrino Coincidence study",
+                            formatter_class=argparse.RawTextHelpFormatter)
+p.add_argument("--ntrials", default=1000, type=int,
+               help="Number of trials (default=1000)")
+p.add_argument("--seed", default=0, type=int,
+               help="Process ID to save unique numpy array after running (Default=0)")
+p.add_argument("--bkg", default=6.4, type=float,
+               help="Expected number of background events in ontime window (default=6.4)")
+p.add_argument("--deltaT", default = 1000, type=int,
+               help = "Time window to use, in sec, as an int (default 1000)")
+p.add_argument("--outdir", default = None, type=str,
+               help = "output directory")
+args = p.parse_args()
+
+def config_llh(self, scramble = True):
+    """ Use full timescramble method for BG trials
+    This method is adapted from initialize_llh in FastResponseAnalysis.py
+    """
+
+    self.get_data()
+
+    print("Initializing Point Source LLH in Skylab")
+
+    assert self._float_index==True,\
+        'Index should be fitted - check initialization'
+
+    if self._fix_index:
+        llh_model = EnergyLLH(
+            twodim_bins=[self.energy_bins, self.sinDec_bins],
+            allow_empty=True,
+            spectrum=PowerLaw(A=1, gamma=self.index, E0=1000.),
+            ncpu=self._ncpu)
+    elif self._float_index:
+        llh_model = EnergyLLH(
+            twodim_bins=[self.energy_bins, self.sinDec_bins],
+            allow_empty=True,
+            bounds=self._index_range,
+            seed = self.index,
+            ncpu=self._ncpu)
+
+    box = TemporalModel(
+        grl=self.grl,
+        poisson_llh=True,
+        days=self._nb_days,
+        signal=BoxProfile(self.start, self.stop))
+
+    # should not be an extension here
+    src_extension = None
+
+    llh = PointSourceLLH(
+        self.exp,                      # array with data events
+        self.mc,                       # array with Monte Carlo events
+        self.livetime,                 # total livetime of the data events
+        ncpu=self._ncpu,               # use 10 CPUs when computing trials
+        scramble=scramble,             # set to False for unblinding
+        timescramble=True,             # not just RA scrambling
+        llh_model=llh_model,           # likelihood model
+        temporal_model=box,            # use box for temporal model
+        nsource_bounds=(0., 1e3),      # bounds on fitted ns
+        src_extension = src_extension, # Model symmetric extended source
+        nsource=1.,                    # seed for nsignal fit
+        )
+
+    return llh
+
+if args.outdir is None:
+    try:
+        outdir = os.environ.get('FAST_RESPONSE_OUTPUT')
+    except:
+        outdir = './'
+else:
+    outdir = args.outdir
+if not os.path.exists(outdir+'/trials/'):
+    os.mkdir(outdir+'/trials/')
+outdir=outdir+'/trials/'
+
+gw_time = Time(60000., format='mjd') #2023-02-25
+delta_t = float(args.deltaT)
+
+if args.deltaT==1000:
+    start_mjd = gw_time - (delta_t / 86400. / 2.)
+    stop_mjd = gw_time + (delta_t / 86400. / 2.)
+else: #2 week followup
+    start_mjd = gw_time - 0.1
+    stop_mjd = gw_time + 14.
+
+start_iso = start_mjd.iso
+stop_iso = stop_mjd.iso
+
+#skymap needed for initialization - not used here
+f = GWFollowup('precomputed_bg_test', '/data/user/jthwaites/o3-gw-skymaps/S191216ap.fits.gz',
+               start_iso, stop_iso)
+f._allow_neg = False
+
+f.llh = config_llh(f)
+f.llh.nbackground=args.bkg
+
+ntrials = args.ntrials
+stop = ntrials * (args.seed+1)
+start = stop-ntrials
+seed_counter = start
+
+npix = hp.nside2npix(f.nside)
+shape = (args.ntrials, npix)
+maps = sparse.lil_matrix(shape, dtype=float)
+for jj in range(ntrials):
+    val = f.llh.scan(0.0, 0.0, scramble=True, seed = seed_counter,
+                     #spatial_prior = f.spatial_prior,
+                     time_mask = [delta_t / 2., (start_mjd.mjd + stop_mjd.mjd) / 2.],
+                     pixel_scan = [f.nside, f._pixel_scan_nsigma], inject = None)
+
+    if val['TS'] is not None:
+        dtype = [('ts',float),('pixel',float)]
+        results = np.empty((val['TS'].size,), dtype=dtype)
+        pixels = hp.ang2pix(f.nside, np.pi/2. 
- val['dec'], val['ra']) + maps[jj, pixels] = val['TS'] + + seed_counter+=1 +print("DONE") +hp_sparse = maps.tocsr() + +outfilename = outdir+'gw_{:.1f}_mHz_delta_t_{:.1e}_seed_{}.npz'.format(args.bkg, args.deltaT, args.seed) +sparse.save_npz(outfilename, hp_sparse) +print("Saved to {}".format(outfilename)) \ No newline at end of file diff --git a/fast_response/precomputed_background/submit_glob.py b/fast_response/precomputed_background/submit_glob.py index ae5c305..51853f4 100644 --- a/fast_response/precomputed_background/submit_glob.py +++ b/fast_response/precomputed_background/submit_glob.py @@ -21,15 +21,15 @@ username = pwd.getpwuid(os.getuid())[0] if not os.path.exists(f'/scratch/{username}/'): os.mkdir(f'/scratch/{username}/') -if not os.path.exists(f'/scratch/{username}/gw/'): - os.mkdir(f'/scratch/{username}/gw/') -if not os.path.exists(f'/scratch/{username}/gw/condor/'): - os.mkdir(f'/scratch/{username}/gw/condor') +if not os.path.exists(f'/scratch/{username}/fra/'): + os.mkdir(f'/scratch/{username}/fra/') +if not os.path.exists(f'/scratch/{username}/fra/condor/'): + os.mkdir(f'/scratch/{username}/fra/condor') -error = f'/scratch/{username}/gw/condor/error' -output = f'/scratch/{username}/gw/condor/output' -log = f'/scratch/{username}/gw/condor/log' -submit = f'/scratch/{username}/gw/condor/submit' +error = f'/scratch/{username}/fra/condor/error' +output = f'/scratch/{username}/fra/condor/output' +log = f'/scratch/{username}/fra/condor/log' +submit = f'/scratch/{username}/fra/condor/submit' ### Create Dagman to submit jobs to cluster job = pycondor.Job( @@ -49,7 +49,7 @@ 'when_to_transfer_output = ON_EXIT'] ) -job.add_arg('--deltaT {} --dir {}'.format(args.tw, args.dir)) +job.add_arg('--deltaT {} --dir {} --nside {}'.format(args.tw, args.dir, 512)) job.build_submit() diff --git a/fast_response/precomputed_background/submit_precomputed_trials.py b/fast_response/precomputed_background/submit_precomputed_trials.py index 320257a..a3a3bd8 100644 --- a/fast_response/precomputed_background/submit_precomputed_trials.py +++ b/fast_response/precomputed_background/submit_precomputed_trials.py @@ -2,14 +2,17 @@ import numpy as np import argparse import pwd -import os +import os, sys from astropy.time import Time -import glob +import fast_response + +default_outdir = os.environ.get('FAST_RESPONSE_OUTPUT') +if default_outdir is None: default_outdir='./' parser = argparse.ArgumentParser( description='Submit script') parser.add_argument( - '--tw', type=float, default=1000., #[-0.1, +14]day: 1218240 + '--deltaT', type=float, default=1000., #[-0.1, +14]day: 1218240 help='time window to use (in sec)') parser.add_argument( '--ntrials', type=int, default=30000, @@ -20,25 +23,46 @@ parser.add_argument( '--n_per_batch', default=500, type=int, help='Number of trials to run in each set (default: 500)') +parser.add_argument( + '--type', default='alert', type=str, + help='run alert or GW trials. options are alert (default) or gw') +parser.add_argument( + '--outdir', default= default_outdir, type=str, + help = 'Output directory. 
default is FAST_RESPONSE_OUTPUT (if set) or ./ if not') args = parser.parse_args() +print('Output trials to directory: {}'.format(args.outdir)) + +fra_dir = os.path.dirname(fast_response.__file__) +if args.type=='alert': + script = os.path.join(fra_dir, 'precomputed_background/precompute_ts.py') + name = 'alert_precomp_trials' + deltaT = args.deltaT +elif args.type=='gw': + script = os.path.join(fra_dir, 'precomputed_background/precompute_ts_gw.py') + name = 'gw_precomp_trials' + deltaT = int(args.deltaT) +else: + print('--type must be alert or gw. Check options and try again') + sys.exit() + username = pwd.getpwuid(os.getuid())[0] if not os.path.exists(f'/scratch/{username}/'): os.mkdir(f'/scratch/{username}/') -if not os.path.exists(f'/scratch/{username}/gw/'): - os.mkdir(f'/scratch/{username}/gw/') -if not os.path.exists(f'/scratch/{username}/gw/condor/'): - os.mkdir(f'/scratch/{username}/gw/condor') +if not os.path.exists(f'/scratch/{username}/fra/'): + os.mkdir(f'/scratch/{username}/fra/') +if not os.path.exists(f'/scratch/{username}/fra/condor/'): + os.mkdir(f'/scratch/{username}/fra/condor') -error = f'/scratch/{username}/gw/condor/error' -output = f'/scratch/{username}/gw/condor/output' -log = f'/scratch/{username}/gw/condor/log' -submit = f'/scratch/{username}/gw/condor/submit' +error = f'/scratch/{username}/fra/condor/error' +output = f'/scratch/{username}/fra/condor/output' +log = f'/scratch/{username}/fra/condor/log' +submit = f'/scratch/{username}/fra/condor/submit' -### Create Dagman to submit jobs to cluster +### Create Dagman to submit jobs to cluster job = pycondor.Job( - 'gw_precomp_trials', - '/data/user/jthwaites/FastResponseAnalysis/fast_response/precomputed_background/precompute_ts.py', + name, + script, error=error, output=output, log=log, @@ -53,19 +77,16 @@ 'when_to_transfer_output = ON_EXIT'] ) -#prev_trials = glob.glob('/data/user/jthwaites/FastResponseAnalysis/output/trials/*.npz') for bg_rate in [6.0, 6.2, 6.4, 6.6, 6.8, 7.0, 7.2]: - for seed in range(args.seed_start, int(args.ntrials/args.n_per_batch)): - seed = seed*100 - job.add_arg('--deltaT %s --ntrials %i --seed %i --bkg %s --outdir %s --type gw' - %(args.tw, args.n_per_batch, seed, bg_rate, - '/data/user/jthwaites/new_processing/normal_1000/')) + for seed in range(args.seed_start, int(args.ntrials/args.n_per_batch)+1): + job.add_arg('--deltaT {} --ntrials {} --seed {} --bkg {} --outdir {}'.format( + deltaT, args.n_per_batch, seed, bg_rate, args.outdir)) -#dagman = pycondor.Dagman( -# 'fra_1000s_precompbg', -# submit=submit, verbose=2) +dagman = pycondor.Dagman( + 'fra_precompbg', + submit=submit, verbose=2) -#dagman.add_job(job) -#dagman.build_submit() -job.build_submit() +dagman.add_job(job) +dagman.build_submit() +#job.build_submit() diff --git a/fast_response/scripts/combine_results_classic.py b/fast_response/scripts/combine_results_classic.py new file mode 100644 index 0000000..a7df165 --- /dev/null +++ b/fast_response/scripts/combine_results_classic.py @@ -0,0 +1,591 @@ +#!/usr/bin/env python + +'''Note: this version listens to GCN CLASSIC. 
+Main sender is combine_results_kafka.py''' + +import logging +from datetime import datetime +import socket +import requests +import healpy as hp +import matplotlib as mpl +mpl.use('agg') +import matplotlib.pyplot as plt +import io, time, os, glob, subprocess +import urllib.request, urllib.error, urllib.parse +import argparse +import json, pickle +from gcn_kafka import Consumer +import gcn +#from icecube import realtime_tools +import numpy as np +import lxml.etree +from astropy.time import Time +import dateutil.parser +from datetime import datetime +from fast_response.web_utils import updateGW_public + +parser = argparse.ArgumentParser(description='Combine GW-Nu results') +parser.add_argument('--run_live', action='store_true', default=False, + help='Run on live GCNs') +parser.add_argument('--test_path', type=str, default=None, + help='path to test xml file') +parser.add_argument('--max_wait', type=float, default=60., + help='Maximum minutes to wait for LLAMA/UML results before timeout (default=60)') +parser.add_argument('--wait_for_llama', action='store_true', default=False, + help='bool to decide to send llama results with uml, default false while not unblinded') +parser.add_argument('--heartbeat', action='store_true', default=False, + help='bool to save jsons for mocks (saves to save_dir+mocks/)') +parser.add_argument('--save_dir', type=str, default='/home/followup/lvk_followup_output/', + help='Directory to save output json (default=/home/followup/lvk_followup_output/)') +args = parser.parse_args() + +with open('/cvmfs/icecube.opensciencegrid.org/users/jthwaites/tokens/kafka_token.txt') as f: + client_id = f.readline().rstrip('\n') + client_secret = f.readline().rstrip('\n') + +consumer = Consumer(client_id=client_id, + client_secret=client_secret) + +# Subscribe to topics to receive alerts +consumer.subscribe(['gcn.classic.voevent.LVC_PRELIMINARY', + 'gcn.classic.voevent.LVC_INITIAL', + 'gcn.classic.voevent.LVC_UPDATE']) +#consumer.subscribe(['igwn.gwalert']) + +def SendAlert(results=None): + from gcn_kafka import Producer + + if results is None: + logger.fatal('Found no alert to send') + + with open('/cvmfs/icecube.opensciencegrid.org/users/jthwaites/tokens/real_icecube_kafka_prod.txt') as f: + prod_id = f.readline().rstrip('\n') + prod_secret = f.readline().rstrip('\n') + + producer = Producer(#config={'bootstrap.servers': 'kafka3.gcn.nasa.gov'}, + client_id=prod_id, + client_secret=prod_secret, + domain='gcn.nasa.gov') + + if 'MS' in results['ref_id']: + return + topic = 'gcn.notices.icecube.test.lvk_nu_track_search' + else: + topic = 'gcn.notices.icecube.lvk_nu_track_search' + + logger.info('sending to {} on gcn.nasa.gov'.format(topic)) + sendme = json.dumps(results) + producer.produce(topic, sendme.encode()) + ret = producer.flush() + return ret + +def SendTestAlert(results=None): + from gcn_kafka import Producer + + if results is None: + logger.fatal('Found no alert to send') + + with open('/cvmfs/icecube.opensciencegrid.org/users/jthwaites/tokens/test_icecube_kafka_prod.txt') as f: + prod_id = f.readline().rstrip('\n') + prod_secret = f.readline().rstrip('\n') + + producer = Producer(#config={'bootstrap.servers': 'kafka3.gcn.nasa.gov'}, + client_id=prod_id, + client_secret=prod_secret, + domain='test.gcn.nasa.gov') + + topic = 'gcn.notices.icecube.test.lvk_nu_track_search' + logger.info('sending to {} on test.gcn.nasa.gov'.format(topic)) + + sendme = json.dumps(results) + producer.produce(topic, sendme.encode()) + ret = producer.flush() + return ret + +def 
format_ontime_events_uml(events, event_mjd): + ontime_events={} + for event in events: + ontime_events[event['event']]={ + 'event_dt' : round((event['time']-event_mjd)*86400., 2), + 'localization':{ + 'ra' : round(np.rad2deg(event['ra']), 2), + 'dec' : round(np.rad2deg(event['dec']), 2), + "uncertainty_shape": "circle", + 'ra_uncertainty': round(np.rad2deg(event['sigma']*2.145966),2), + "containment_probability": 0.9, + "systematic_included": False + }, + 'event_pval_generic' : round(event['pvalue'],4) + } + return ontime_events + +def format_ontime_events_llama(events): + ontime_events={} + for event in events: + ontime_events[event['i3event']] = { + 'event_dt' : round(event['dt'],2), + 'localization':{ + 'ra' : round(event['ra'], 2), + 'dec' : round(event['dec'], 2), + "uncertainty_shape": "circle", + 'ra_uncertainty': round(np.rad2deg(np.deg2rad(event['sigma'])*2.145966),3), + "containment_probability": 0.9, + "systematic_included": False + }, + 'event_pval_bayesian': round(event['p_value'],4) + } + return ontime_events + +def format_ontime_events_llama_old(events,event_mjd): + ontime_events={} + for event in events: + ontime_events[event['event']] = { + 'event_dt' : round((event['mjd']-event_mjd)*86400.,2), + 'localization':{ + 'ra' : round(np.rad2deg(event['ra']), 2), + 'dec' : round(np.rad2deg(event['dec']), 2), + "uncertainty_shape": "circle", + 'ra_uncertainty': round(np.rad2deg(event['sigma']*2.145966),3), + "containment_probability": 0.9, + "systematic_included": False + }, + 'event_pval_bayesian': 'null' + } + return ontime_events + +def combine_events(uml_ontime, llama_ontime): + #in the case both return coincident events, want to combine their results + event_ids_all = np.unique(list(uml_ontime.keys())+list(llama_ontime.keys())) + coinc_events = [] + + for id in event_ids_all: + #case: event in both results + if (id in uml_ontime.keys()) and (id in llama_ontime.keys()): + if (uml_ontime[id]['event_pval_generic'] < 0.1) and (llama_ontime[id]['event_pval_bayesian']<0.1): + uml_ontime[id]['event_pval_bayesian'] = llama_ontime[id]['event_pval_bayesian'] + coinc_events.append(uml_ontime[id]) + elif uml_ontime[id]['event_pval_generic'] < 0.1: + uml_ontime[id]['event_pval_bayesian'] = 'null' + coinc_events.append(uml_ontime[id]) + elif llama_ontime[id]['event_pval_bayesian']<0.1: + uml_ontime[id]['event_pval_bayesian'] = llama_ontime[id]['event_pval_bayesian'] + uml_ontime[id]['event_pval_generic'] ='null' + coinc_events.append(uml_ontime[id]) + #case: only in uml + elif id in uml_ontime.keys(): + if uml_ontime[id]['event_pval_generic'] < 0.1: + uml_ontime[id]['event_pval_bayesian'] = 'null' + coinc_events.append(uml_ontime[id]) + #case: only in llama + elif id in llama_ontime.keys(): + if llama_ontime[id]['event_pval_bayesian']<0.1: + llama_ontime[id]['event_pval_generic']='null' + coinc_events.append(llama_ontime[id]) + + return coinc_events + +@gcn.handlers.include_notice_types( + gcn.notice_types.LVC_PRELIMINARY, + gcn.notice_types.LVC_INITIAL, + gcn.notice_types.LVC_UPDATE) + +def parse_notice(payload, record): + logger = logging.getLogger() + wait_for_llama=args.wait_for_llama + heartbeat = args.heartbeat + + if record.attrib['role']!='observation': + fra_results_location = '/data/user/jthwaites/o4-mocks/' + if not heartbeat: + logger.warning('found test event - not in mock mode. 
returning')
+            return
+        else:
+            logger.warning('found test event')
+    else:
+        logger.warning('ALERT FOUND')
+        fra_results_location = os.environ.get('FAST_RESPONSE_OUTPUT')
+
+    ### GENERAL PARAMETERS ###
+    #read event information
+    params = {elem.attrib['name']:
+              elem.attrib['value']
+              for elem in record.iterfind('.//Param')}
+
+    # ignore subthreshold, only run on significant events
+    subthreshold=False
+    if 'Significant' in params.keys():
+        if int(params['Significant'])==0:
+            subthreshold=True
+            logger.warning('low-significance alert found. ')
+    if params['Group'] == 'Burst':
+        wait_for_llama = False
+        m = 'Significant' if not subthreshold else 'Subthreshold'
+        logger.warning('{} burst alert found. '.format(m))
+    if len(params['Instruments'].split(','))==1:
+        #wait_for_llama = False
+        logger.warning('One detector event found. ')
+
+    if not wait_for_llama and subthreshold:
+        #if llama isn't running, don't run on subthreshold
+        logger.warning('Not waiting for LLAMA. Returning...')
+        return
+
+    collected_results = {}
+    collected_results["$schema"]= "https://gcn.nasa.gov/schema/gcn/notices/icecube/LvkNuTrackSearch.schema.json"
+    collected_results["type"]= "IceCube LVK Alert Nu Track Search"
+
+    eventtime = record.find('.//ISOTime').text
+    event_mjd = Time(eventtime, format='isot').mjd
+    name = record.attrib['ivorn'].split('#')[1]
+
+    if record.attrib['role'] != 'observation':
+        name=name+'_test'
+
+    logger.info("{} alert found, processing GCN".format(name))
+    collected_results['ref_id'] = name
+
+    #collected_results['ref_id'] = name.split('-')[0]
+    #collected_results['id'] = [name.split('-')[2],'Sequence:{}'.format(name.split('-')[1])]
+
+    ### WAIT ###
+    # wait until the 500s has elapsed for data
+    current_mjd = Time(datetime.utcnow(), scale='utc').mjd
+    needed_delay = 500./86400.
+    #needed_delay = 720./86400. #12 mins while lvk_dropbox is offline
+    current_delay = current_mjd - event_mjd
+
+    while current_delay < needed_delay:
+        logger.info("Wait {:.1f} seconds for data".format(
+            (needed_delay - current_delay)*86400.)
+        )
+        time.sleep((needed_delay - current_delay)*86400.)
+        current_mjd = Time(datetime.utcnow(), scale='utc').mjd
+        current_delay = current_mjd - event_mjd
+
+    # start checking for results
+    results_done = False
+    start_check_mjd = Time(datetime.utcnow(), scale='utc').mjd
+    max_delay = max_wait/60./24. 
#mins to days + + logger.info("Looking for results (max {:.0f} mins)".format( + max_wait) + ) + + while results_done == False: + start_date = Time(dateutil.parser.parse(eventtime)).datetime + start_str = f'{start_date.year:02d}_{start_date.month:02d}_{start_date.day:02d}' + + uml_results_path = os.path.join(fra_results_location, start_str + '_' + name.replace(' ', '_') \ + + '/' + start_str + '_' + name.replace(' ', '_')+'_results.pickle') + uml_results_finished = os.path.exists(uml_results_path) + + if len(params['Instruments'].split(','))==1: + llama_name = '{}.significance_opa_lvc-i3.json'.format(record.attrib['ivorn'].split('#')[1]) + else: + llama_name = '{}.significance_subthreshold_lvc-i3.json'.format(record.attrib['ivorn'].split('#')[1]) + llama_results_path = os.path.join(llama_results_location,llama_name) + if wait_for_llama: + llama_results_finished = os.path.exists(llama_results_path) + else: + llama_results_finished = False + #llama_name = '{}*-significance_subthreshold_lvc-i3.json'.format(record.attrib['ivorn'].split('#')[1]) + #llama_results_path = os.path.join(llama_results_location,llama_name) + #if wait_for_llama: + # llama_results_glob = sorted(glob.glob(llama_results_path)) + # if len(llama_results_glob)>0: + # llama_results_path = llama_results_glob[-1] + # llama_results_finished=True + # else: + # llama_results_finished=False + #else: + # llama_results_finished = False + + if subthreshold and llama_results_finished: + #no UML results in subthreshold case, only LLAMA + results_done = True + uml_results_finished = False + logger.info('found results for LLAMA for subthreshold event, writing notice') + + if uml_results_finished and llama_results_finished: + results_done=True + logger.info('found results for both, writing notice') + elif uml_results_finished and not wait_for_llama: + results_done = True + logger.info('found results for UML, writing notice') + else: + current_mjd = Time(datetime.utcnow(), scale='utc').mjd + if current_mjd - start_check_mjd < max_delay: + time.sleep(10.) + else: + if (uml_results_finished or llama_results_finished): + if uml_results_finished: + logger.warning('LLAMA results not finished in {:.0f} mins. Sending UML only'.format(max_wait)) + results_done=True + if llama_results_finished: + logger.warning('UML results not finished in {:.0f} mins. Sending LLAMA only'.format(max_wait)) + results_done=True + else: + logger.warning('Both analyses not finished after {:.0f} min wait.'.format(max_wait)) + logger.warning('Not sending GCN.') + return + + collected_results['alert_datetime'] = '{}Z'.format(Time(datetime.utcnow(), scale='utc').isot) + collected_results['trigger_time'] = eventtime if 'Z' in eventtime else '{}Z'.format(eventtime) + collected_results['observation_start'] = '{}Z'.format(Time(event_mjd - 500./86400., format = 'mjd').isot) + collected_results['observation_stop'] = '{}Z'.format(Time(event_mjd + 500./86400., format = 'mjd').isot) + collected_results['observation_livetime'] = 1000 + + ### COLLECT RESULTS ### + send_notif = False + + if uml_results_finished: + with open(uml_results_path, 'rb') as f: + uml_results = pickle.load(f) + + if llama_results_finished: + with open(llama_results_path, 'r') as f: + llama_results = json.load(f) + if (record.attrib['role'] == 'observation') and (llama_results['inputs']['neutrino_info'][0]['type']=='blinded'): + logger.warning('LLAMA results blinded for real event! 
Skipping LLAMA') + llama_results_finished = False + if subthreshold: + logger.warning('LLAMA skipped on subthreshold event, returning') + return + + if uml_results_finished and llama_results_finished: + collected_results['pval_generic'] = round(uml_results['p'],4) + collected_results['pval_bayesian'] = round(llama_results['p_value'],4) + if (collected_results['pval_generic']<0.01) or (collected_results['pval_bayesian']<0.01): + send_notif=True + + if (collected_results['pval_generic']<0.1) or (collected_results['pval_bayesian']<0.1): + uml_ontime = format_ontime_events_uml(uml_results['coincident_events'], event_mjd) + llama_ontime = format_ontime_events_llama(llama_results['single_neutrino']) + coinc_events = combine_events(uml_ontime, llama_ontime) + collected_results['n_events_coincident'] = len(coinc_events) + collected_results['coincident_events'] = coinc_events + + collected_results['most_likely_direction'] = { + 'ra': round(np.rad2deg(uml_results['fit_ra']), 2), + 'dec' : round(np.rad2deg(uml_results['fit_dec']), 2), + } + else: + collected_results['n_events_coincident'] = 0 + + collected_results["neutrino_flux_sensitivity_range"] = { + 'flux_sensitivity' : [ + round(uml_results['sens_range'][0],4), + round(uml_results['sens_range'][1],4) + ], + 'sensitive_energy_range' : [ + round(float('{:.2e}'.format(uml_results['energy_range'][0]))), + round(float('{:.2e}'.format(uml_results['energy_range'][1]))) + ], + } + + elif uml_results_finished: + collected_results['pval_generic'] = round(uml_results['p'],4) + collected_results['pval_bayesian'] = 'null' + if collected_results['pval_generic']<0.01: + send_notif=True + + if collected_results['pval_generic'] <0.1: + uml_ontime = format_ontime_events_uml(uml_results['coincident_events'], event_mjd) + coinc_events=[] + for eventid in uml_ontime.keys(): + if (uml_ontime[eventid]['event_pval_generic'] < 0.1): + uml_ontime[eventid]['event_pval_bayesian'] = 'null' + coinc_events.append(uml_ontime[eventid]) + collected_results['n_events_coincident'] = len(coinc_events) + collected_results['coincident_events'] = coinc_events + + collected_results['most_likely_direction'] = { + 'ra': round(np.rad2deg(uml_results['fit_ra']), 2), + 'dec' : round(np.rad2deg(uml_results['fit_dec']), 2) + } + + else: + collected_results['n_events_coincident'] = 0 + + collected_results["neutrino_flux_sensitivity_range"] = { + 'flux_sensitivity' : [ + round(uml_results['sens_range'][0],4), + round(uml_results['sens_range'][1],4) + ], + 'sensitive_energy_range' :[ + round(float('{:.2e}'.format(uml_results['energy_range'][0]))), + round(float('{:.2e}'.format(uml_results['energy_range'][1]))) + ], + } + + elif llama_results_finished: + collected_results['pval_generic'] = 'null' + collected_results['pval_bayesian'] = round(llama_results['p_value'],4) + if collected_results['pval_bayesian']<0.01: + send_notif=True + + if collected_results['pval_bayesian']<0.1: + llama_ontime = format_ontime_events_llama(llama_results['single_neutrino']) + coinc_events=[] + for eventid in llama_ontime.keys(): + if (llama_ontime[eventid]['event_pval_bayesian'] < 0.1): + llama_ontime[eventid]['event_pval_generic'] = 'null' + coinc_events.append(llama_ontime[eventid]) + collected_results['n_events_coincident'] = len(coinc_events) + collected_results['coincident_events'] = coinc_events + else: + collected_results['n_events_coincident'] = 0 + + collected_results["neutrino_flux_sensitivity_range"] = { + 'flux_sensitivity' : [ + 
round(llama_results['neutrino_flux_sensitivity_range']['flux_sensitivity'][0],4), + round(llama_results['neutrino_flux_sensitivity_range']['flux_sensitivity'][1],4) + ], + 'sensitive_energy_range' :[ + round(float('{:.2e}'.format(llama_results['neutrino_flux_sensitivity_range'] + ['sensitive_energy_range'][0]))), + round(float('{:.2e}'.format(llama_results['neutrino_flux_sensitivity_range'] + ['sensitive_energy_range'][1]))) + ], + } + + if (collected_results['n_events_coincident'] == 0) and ('coincident_events' in collected_results.keys()): + c = collected_results.pop('coincident_events') + if ('most_likely_direction' in collected_results.keys()): + if (collected_results['n_events_coincident'] == 0): + c = collected_results.pop('most_likely_direction') + if ('most_likely_direction' in collected_results.keys()): + try: + if (collected_results['pval_generic']>0.1): + c = collected_results.pop('most_likely_direction') + except Exception as e: + print(e) + + ### SAVE RESULTS ### + if record.attrib['role']=='observation' and not heartbeat: + with open(os.path.join(save_location, f'{name}_collected_results.json'),'w') as f: + json.dump(collected_results, f, indent = 6) + + logger.info('sending notice') + status = SendAlert(results = collected_results) + st = 'sent' if status == 0 else 'error!' + logger.info('status: {}'.format(status)) + logger.info('{}'.format(st)) + + if status ==0: + with open('/cvmfs/icecube.opensciencegrid.org/users/jthwaites/tokens/gw_token.txt') as f: + my_key = f.readline() + + if not subthreshold: + channels = ['#gwnu-heartbeat', '#gwnu','#alerts'] + else: + channels = ['#gwnu-heartbeat'] + for channel in channels: + with open(os.path.join(save_location, f'{name}_collected_results.json'),'r') as fi: + response = requests.post('https://slack.com/api/files.upload', + timeout=60, + params={'token': my_key}, + data={'filename':'gcn.json', + 'title': f'GCN Notice for {name}', + 'channels': channel}, + files={'file': fi} + ) + if response.ok is True: + logger.info("GCN posted OK to {}".format(channel)) + else: + logger.info("Error posting skymap to {}!".format(channel)) + + collected_results['subthreshold'] = subthreshold + try: + updateGW_public(collected_results) + logger.info('Updated webpage.') + except: + logger.warning('Failed to push to public webpage.') + + if send_notif: + sender_script = os.path.join(os.environ.get('FAST_RESPONSE_SCRIPTS'), + '../slack_posters/lvk_email_sms_notif.py') + try: + subprocess.call([sender_script, '--path_to_gcn', + os.path.join(save_location, f'{name}_collected_results.json')]) + logger.info('Sent alert to ROC for p<0.01') + except: + logger.warning('Failed to send email/SMS notification.') + else: + logger.info('p>0.01: no email/sms sent') + else: + with open(os.path.join(save_location, f'mocks/{name}_collected_results.json'),'w') as f: + json.dump(collected_results, f, indent = 6) + #logger.info('sending test notice') + #status = SendTestAlert(results = collected_results) + #logger.info('status: {}'.format(status)) + + #send the notice to slack (#gw-mock-heartbeat) + with open('/cvmfs/icecube.opensciencegrid.org/users/jthwaites/tokens/gw_token.txt') as f: + my_key = f.readline() + + channel = '#gw-mock-heartbeat' + with open(os.path.join(save_location, f'mocks/{name}_collected_results.json'),'r') as fi: + response = requests.post('https://slack.com/api/files.upload', + timeout=60, + params={'token': my_key}, + data={'filename':'gcn.json', + 'title': f'GCN Notice for {name}', + 'channels': channel}, + files={'file': fi} + ) + if 
response.ok is True: + logger.info("GCN posted OK to {}".format(channel)) + else: + logger.info("Error posting skymap to {}!".format(channel)) + +logger = logging.getLogger() +logger.setLevel(logging.INFO) +logger.warning("combine_results starting, connecting to GCN") + +#fra_results_location = os.environ.get('FAST_RESPONSE_OUTPUT')#'/data/user/jthwaites/o4-mocks/' +llama_results_location = '/home/followup/lvk_dropbox/' +#llama_results_location = '/home/azhang/public_html/llama/json/' +#save_location = '/home/followup/lvk_followup_output/' #where to save final json +save_location = args.save_dir + +max_wait = args.max_wait +#wait_for_llama = args.wait_for_llama + +if args.run_live: + logger.info('running on live GCNs') + ''' + while True: + for message in consumer.consume(timeout=1): + value = message.value() + if '<' not in value.decode('utf-8')[0]: + #sometimes, we'll get error messages - these make the code fail. skip them + logger.warning(value.decode('utf-8')) + continue + notice = lxml.etree.fromstring(value.decode('utf-8').encode('ascii')) + parse_notice(notice) + logger.info('Done.') + ''' + gcn.listen(handler=parse_notice) + +else: + if args.test_path is None: + paths=glob.glob('/home/jthwaites/FastResponse/*/*xml') + path = paths[1] + #path = '/home/jthwaites/FastResponse/S230522n-preliminary.json,1' + else: + path = args.test_path + + logger.info('running on {}'.format(path)) + + with open(path, 'r') as f: + payload = f.read() + try: + record = lxml.etree.fromstring(payload) + except Exception as e: + print(e) + exit() + + parse_notice(payload, record) +logger.info("done") \ No newline at end of file diff --git a/fast_response/scripts/combine_results_kafka.py b/fast_response/scripts/combine_results_kafka.py index ccf2ff4..5001ef2 100644 --- a/fast_response/scripts/combine_results_kafka.py +++ b/fast_response/scripts/combine_results_kafka.py @@ -21,17 +21,19 @@ from datetime import datetime from fast_response.web_utils import updateGW_public -with open('/cvmfs/icecube.opensciencegrid.org/users/jthwaites/tokens/kafka_token.txt') as f: +with open('/home/jthwaites/private/tokens/kafka_token.txt') as f: client_id = f.readline().rstrip('\n') client_secret = f.readline().rstrip('\n') consumer = Consumer(client_id=client_id, - client_secret=client_secret) + client_secret=client_secret, + config={'max.poll.interval.ms':1200000}) # Subscribe to topics to receive alerts consumer.subscribe(['gcn.classic.voevent.LVC_PRELIMINARY', 'gcn.classic.voevent.LVC_INITIAL', 'gcn.classic.voevent.LVC_UPDATE']) +#consumer.subscribe(['igwn.gwalert']) def SendAlert(results=None): from gcn_kafka import Producer @@ -39,15 +41,16 @@ def SendAlert(results=None): if results is None: logger.fatal('Found no alert to send') - with open('/cvmfs/icecube.opensciencegrid.org/users/jthwaites/tokens/real_icecube_kafka_prod.txt') as f: + with open('/home/jthwaites/private/tokens/real_icecube_kafka_prod.txt') as f: prod_id = f.readline().rstrip('\n') prod_secret = f.readline().rstrip('\n') - producer = Producer(client_id=prod_id, + producer = Producer(#config={'bootstrap.servers': 'kafka3.gcn.nasa.gov'}, + client_id=prod_id, client_secret=prod_secret, domain='gcn.nasa.gov') - if 'MS' in results['ref_id']: + if 'MS' in results['ref_ID']: return topic = 'gcn.notices.icecube.test.lvk_nu_track_search' else: @@ -65,11 +68,12 @@ def SendTestAlert(results=None): if results is None: logger.fatal('Found no alert to send') - with open('/cvmfs/icecube.opensciencegrid.org/users/jthwaites/tokens/test_icecube_kafka_prod.txt') as f: + 
with open('/home/jthwaites/private/tokens/test_icecube_kafka_prod.txt') as f: prod_id = f.readline().rstrip('\n') prod_secret = f.readline().rstrip('\n') - producer = Producer(client_id=prod_id, + producer = Producer(#config={'bootstrap.servers': 'kafka3.gcn.nasa.gov'}, + client_id=prod_id, client_secret=prod_secret, domain='test.gcn.nasa.gov') @@ -90,7 +94,7 @@ def format_ontime_events_uml(events, event_mjd): 'ra' : round(np.rad2deg(event['ra']), 2), 'dec' : round(np.rad2deg(event['dec']), 2), "uncertainty_shape": "circle", - 'ra_uncertainty': round(np.rad2deg(event['sigma']*2.145966),2), + 'ra_uncertainty': [round(np.rad2deg(event['sigma']*2.145966),2)], "containment_probability": 0.9, "systematic_included": False }, @@ -107,7 +111,7 @@ def format_ontime_events_llama(events): 'ra' : round(event['ra'], 2), 'dec' : round(event['dec'], 2), "uncertainty_shape": "circle", - 'ra_uncertainty': round(np.rad2deg(np.deg2rad(event['sigma'])*2.145966),3), + 'ra_uncertainty': [round(np.rad2deg(np.deg2rad(event['sigma'])*2.145966),3)], "containment_probability": 0.9, "systematic_included": False }, @@ -124,11 +128,11 @@ def format_ontime_events_llama_old(events,event_mjd): 'ra' : round(np.rad2deg(event['ra']), 2), 'dec' : round(np.rad2deg(event['dec']), 2), "uncertainty_shape": "circle", - 'ra_uncertainty': round(np.rad2deg(event['sigma']*2.145966),3), + 'ra_uncertainty': [round(np.rad2deg(event['sigma']*2.145966),3)], "containment_probability": 0.9, "systematic_included": False }, - 'event_pval_bayesian': 'null' + 'event_pval_bayesian': None } return ontime_events @@ -144,21 +148,21 @@ def combine_events(uml_ontime, llama_ontime): uml_ontime[id]['event_pval_bayesian'] = llama_ontime[id]['event_pval_bayesian'] coinc_events.append(uml_ontime[id]) elif uml_ontime[id]['event_pval_generic'] < 0.1: - uml_ontime[id]['event_pval_bayesian'] = 'null' + uml_ontime[id]['event_pval_bayesian'] = None coinc_events.append(uml_ontime[id]) elif llama_ontime[id]['event_pval_bayesian']<0.1: uml_ontime[id]['event_pval_bayesian'] = llama_ontime[id]['event_pval_bayesian'] - uml_ontime[id]['event_pval_generic'] ='null' + uml_ontime[id]['event_pval_generic'] = None coinc_events.append(uml_ontime[id]) #case: only in uml elif id in uml_ontime.keys(): if uml_ontime[id]['event_pval_generic'] < 0.1: - uml_ontime[id]['event_pval_bayesian'] = 'null' + uml_ontime[id]['event_pval_bayesian'] = None coinc_events.append(uml_ontime[id]) #case: only in llama elif id in llama_ontime.keys(): if llama_ontime[id]['event_pval_bayesian']<0.1: - llama_ontime[id]['event_pval_generic']='null' + llama_ontime[id]['event_pval_generic']= None coinc_events.append(llama_ontime[id]) return coinc_events @@ -188,20 +192,22 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False): if 'Significant' in params.keys(): if int(params['Significant'])==0: subthreshold=True + logger.warning('low-significance alert found. ') if params['Group'] == 'Burst': wait_for_llama = False m = 'Significant' if not subthreshold else 'Subthreshold' logger.warning('{} burst alert found. '.format(m)) + if len(params['Instruments'].split(','))==1: + #wait_for_llama = False + logger.warning('One detector event found. ') if not wait_for_llama and subthreshold: #if llama isn't running, don't run on subthreshold - logger.warning('Not waiting for LLAMA, subthreshold alert. Returning...') + logger.warning('Not waiting for LLAMA. 
Returning...') return - logger.info("alert found, processing GCN") - collected_results = {} - collected_results["$schema"]= "https://gcn.nasa.gov/schema/gcn/notices/icecube/LvkNuTrackSearch.schema.json" + collected_results["$schema"]= "https://gcn.nasa.gov/schema/stable/gcn/notices/icecube/LvkNuTrackSearch.schema.json" collected_results["type"]= "IceCube LVK Alert Nu Track Search" eventtime = record.find('.//ISOTime').text @@ -211,10 +217,9 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False): if record.attrib['role'] != 'observation': name=name+'_test' - collected_results['ref_id'] = name - - #collected_results['ref_id'] = name.split('-')[0] - #collected_results['id'] = [name.split('-')[2],'Sequence:{}'.format(name.split('-')[1])] + logger.info("{} alert found, processing GCN".format(name)) + collected_results['ref_ID'] = name.split('-')[0] + collected_results['reference']= {"gcn.notices.LVK.alert": name} ### WAIT ### # wait until the 500s has elapsed for data @@ -236,10 +241,11 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False): start_check_mjd = Time(datetime.utcnow(), scale='utc').mjd max_delay = max_wait/60./24. #mins to days - while results_done == False: - logger.info("Waiting for results (max {:.0f} mins)".format( + logger.info("Looking for results (max {:.0f} mins)".format( max_wait) ) + + while results_done == False: start_date = Time(dateutil.parser.parse(eventtime)).datetime start_str = f'{start_date.year:02d}_{start_date.month:02d}_{start_date.day:02d}' @@ -247,7 +253,10 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False): + '/' + start_str + '_' + name.replace(' ', '_')+'_results.pickle') uml_results_finished = os.path.exists(uml_results_path) - llama_name = '{}.significance_subthreshold_lvc-i3.json'.format(record.attrib['ivorn'].split('#')[1]) + if len(params['Instruments'].split(','))==1: + llama_name = '{}.significance_opa_lvc-i3.json'.format(record.attrib['ivorn'].split('#')[1]) + else: + llama_name = '{}.significance_subthreshold_lvc-i3.json'.format(record.attrib['ivorn'].split('#')[1]) llama_results_path = os.path.join(llama_results_location,llama_name) if wait_for_llama: llama_results_finished = os.path.exists(llama_results_path) @@ -286,12 +295,30 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False): if uml_results_finished: logger.warning('LLAMA results not finished in {:.0f} mins. Sending UML only'.format(max_wait)) results_done=True + if record.attrib['role']=='observation' and not heartbeat: + try: + subprocess.call(['/home/jthwaites/private/make_call.py', + '--troubleshoot_gcn=True', '--missing_llama=True']) + except: + logger.warning('Failed to send alert to shifters: Issue finding LLAMA results. ') if llama_results_finished: logger.warning('UML results not finished in {:.0f} mins. Sending LLAMA only'.format(max_wait)) results_done=True + if record.attrib['role']=='observation' and not heartbeat: + try: + subprocess.call(['/home/jthwaites/private/make_call.py', + '--troubleshoot_gcn=True', '--missing_uml=True']) + except: + logger.warning('Failed to send alert to shifters: Issue finding UML results. 
@@ -286,12 +295,30 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
     if uml_results_finished:
         logger.warning('LLAMA results not finished in {:.0f} mins. Sending UML only'.format(max_wait))
         results_done=True
+        if record.attrib['role']=='observation' and not heartbeat:
+            try:
+                subprocess.call(['/home/jthwaites/private/make_call.py',
+                                 '--troubleshoot_gcn=True', '--missing_llama=True'])
+            except Exception:
+                logger.warning('Failed to send alert to shifters: Issue finding LLAMA results.')
     if llama_results_finished:
         logger.warning('UML results not finished in {:.0f} mins. Sending LLAMA only'.format(max_wait))
         results_done=True
+        if record.attrib['role']=='observation' and not heartbeat:
+            try:
+                subprocess.call(['/home/jthwaites/private/make_call.py',
+                                 '--troubleshoot_gcn=True', '--missing_uml=True'])
+            except Exception:
+                logger.warning('Failed to send alert to shifters: Issue finding UML results.')
     else:
         logger.warning('Both analyses not finished after {:.0f} min wait.'.format(max_wait))
         logger.warning('Not sending GCN.')
+        if record.attrib['role']=='observation' and not heartbeat:
+            try:
+                subprocess.call(['/home/jthwaites/private/make_call.py',
+                                 '--troubleshoot_gcn=True', '--missing_llama=True', '--missing_uml=True'])
+            except Exception:
+                logger.warning('Failed to send alert to shifters: Issue finding both results.')
         return
     collected_results['alert_datetime'] = '{}Z'.format(Time(datetime.utcnow(), scale='utc').isot)
@@ -310,12 +337,15 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
     if llama_results_finished:
         with open(llama_results_path, 'r') as f:
             llama_results = json.load(f)
-        if (record.attrib['role'] == 'observation') and (llama_results['inputs']['neutrino_info'][0]['type']=='blinded'):
-            logger.warning('LLAMA results blinded for real event! Skipping LLAMA')
-            llama_results_finished = False
-            if subthreshold:
-                logger.warning('LLAMA skipped on subthreshold event, returning')
-                return
+        try:
+            if (record.attrib['role'] == 'observation') and (llama_results['inputs']['neutrino_info'][0]['type']=='blinded'):
+                logger.warning('LLAMA results blinded for real event! Skipping LLAMA')
+                llama_results_finished = False
+                if subthreshold:
+                    logger.warning('LLAMA skipped on subthreshold event, returning')
+                    return
+        except Exception:
+            logger.warning('No neutrinos in 1000s in LLAMA result! Still sending...')
     if uml_results_finished and llama_results_finished:
         collected_results['pval_generic'] = round(uml_results['p'],4)
@@ -330,7 +360,7 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
         collected_results['n_events_coincident'] = len(coinc_events)
         collected_results['coincident_events'] = coinc_events
-        collected_results['most_likely_direction'] = {
+        collected_results['most_probable_direction'] = {
             'ra': round(np.rad2deg(uml_results['fit_ra']), 2),
             'dec' : round(np.rad2deg(uml_results['fit_dec']), 2),
         }
@@ -350,7 +380,7 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
     elif uml_results_finished:
         collected_results['pval_generic'] = round(uml_results['p'],4)
-        collected_results['pval_bayesian'] = 'null'
+        collected_results['pval_bayesian'] = None
         if collected_results['pval_generic']<0.01:
             send_notif=True
@@ -359,12 +389,12 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
         coinc_events=[]
         for eventid in uml_ontime.keys():
             if (uml_ontime[eventid]['event_pval_generic'] < 0.1):
-                uml_ontime[eventid]['event_pval_bayesian'] = 'null'
+                uml_ontime[eventid]['event_pval_bayesian'] = None
                 coinc_events.append(uml_ontime[eventid])
         collected_results['n_events_coincident'] = len(coinc_events)
         collected_results['coincident_events'] = coinc_events
-        collected_results['most_likely_direction'] = {
+        collected_results['most_probable_direction'] = {
             'ra': round(np.rad2deg(uml_results['fit_ra']), 2),
             'dec' : round(np.rad2deg(uml_results['fit_dec']), 2)
         }
@@ -384,7 +414,7 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
         }
     elif llama_results_finished:
-        collected_results['pval_generic'] = 'null'
+        collected_results['pval_generic'] = None
         collected_results['pval_bayesian'] = round(llama_results['p_value'],4)
         if collected_results['pval_bayesian']<0.01:
             send_notif=True
@@ -394,7 +424,7 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
         coinc_events=[]
         for eventid in llama_ontime.keys():
             if (llama_ontime[eventid]['event_pval_bayesian'] < 0.1):
-                llama_ontime[eventid]['event_pval_generic'] = 'null'
+                llama_ontime[eventid]['event_pval_generic'] = None
                 coinc_events.append(llama_ontime[eventid])
         collected_results['n_events_coincident'] = len(coinc_events)
         collected_results['coincident_events'] = coinc_events
@@ -416,13 +446,13 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
     if (collected_results['n_events_coincident'] == 0) and ('coincident_events' in collected_results.keys()):
         c = collected_results.pop('coincident_events')
-    if ('most_likely_direction' in collected_results.keys()):
+    if ('most_probable_direction' in collected_results.keys()):
         if (collected_results['n_events_coincident'] == 0):
-            c = collected_results.pop('most_likely_direction')
-    if ('most_likely_direction' in collected_results.keys()):
+            c = collected_results.pop('most_probable_direction')
+    if ('most_probable_direction' in collected_results.keys()):
         try:
             if (collected_results['pval_generic']>0.1):
-                c = collected_results.pop('most_likely_direction')
+                c = collected_results.pop('most_probable_direction')
         except Exception as e:
             print(e)
@@ -438,7 +468,7 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
         logger.info('{}'.format(st))
     if status ==0:
-        with open('/cvmfs/icecube.opensciencegrid.org/users/jthwaites/tokens/gw_token.txt') as f:
+        with open('/home/jthwaites/private/tokens/gw_token.txt') as f:
             my_key = f.readline()
         if not subthreshold:
@@ -481,11 +511,12 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
     else:
         with open(os.path.join(save_location, f'mocks/{name}_collected_results.json'),'w') as f:
             json.dump(collected_results, f, indent = 6)
+
         #logger.info('sending test notice')
         #status = SendTestAlert(results = collected_results)
         #logger.info('status: {}'.format(status))
         #send the notice to slack (#gw-mock-heartbeat)
-        with open('/cvmfs/icecube.opensciencegrid.org/users/jthwaites/tokens/gw_token.txt') as f:
+        with open('/home/jthwaites/private/tokens/gw_token.txt') as f:
             my_key = f.readline()
         channel = '#gw-mock-heartbeat'
@@ -508,7 +539,7 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
                     help='Run on live GCNs')
 parser.add_argument('--test_path', type=str, default=None,
                     help='path to test xml file')
-parser.add_argument('--max_wait', type=float, default=60.,
-                    help='Maximum minutes to wait for LLAMA/UML results before timeout (default=60)')
+parser.add_argument('--max_wait', type=float, default=35.,
+                    help='Maximum minutes to wait for LLAMA/UML results before timeout (default=35)')
 parser.add_argument('--wait_for_llama', action='store_true', default=False,
                     help='bool to decide to send llama results with uml, default false while not unblinded')
@@ -562,5 +593,5 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
             print(e)
             exit()
-    parse_notice(record, wait_for_llama=args.wait_for_llama)
+    parse_notice(record, wait_for_llama=args.wait_for_llama, heartbeat=args.heartbeat)
     logger.info("done")
\ No newline at end of file
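Assembled in full, the notice produced by this path has roughly the following shape. This is an illustrative payload only: the key names come from the hunks above, while every value (superevent name, timestamps, p-values) is a placeholder.

import json

# Placeholder payload mirroring the keys set above (all values invented)
collected_results = {
    "$schema": "https://gcn.nasa.gov/schema/stable/gcn/notices/icecube/LvkNuTrackSearch.schema.json",
    "type": "IceCube LVK Alert Nu Track Search",
    "ref_ID": "S231009a",                       # bare superevent ID (placeholder)
    "reference": {"gcn.notices.LVK.alert": "S231009a-2-Preliminary"},
    "alert_datetime": "2023-10-09T18:42:30.000Z",
    "pval_generic": 0.0421,                     # UML p-value (placeholder)
    "pval_bayesian": None,                      # None when LLAMA is unavailable
    "n_events_coincident": 0,
}

# most_probable_direction and coincident_events are popped before sending
# when there are no coincident events, or when pval_generic > 0.1.
print(json.dumps(collected_results, indent=2))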
diff --git a/fast_response/scripts/run_external_followup.py b/fast_response/scripts/run_external_followup.py
index bfcb990..c8ee36d 100755
--- a/fast_response/scripts/run_external_followup.py
+++ b/fast_response/scripts/run_external_followup.py
@@ -44,6 +44,8 @@ def run_analysis(args):
     f.calc_pvalue(ntrials = args.ntrials)
     f.make_dNdE()
     f.plot_tsd()
+
+    print('Calculating upper limit')
     f.upper_limit(n_per_sig = args.n_per_sig)
     results = f.save_results()
diff --git a/fast_response/scripts/test_listen_kafka.py b/fast_response/scripts/test_listen_kafka.py
index d9f8116..fd4d9b7 100644
--- a/fast_response/scripts/test_listen_kafka.py
+++ b/fast_response/scripts/test_listen_kafka.py
@@ -7,30 +7,26 @@
 import argparse
 
 ## none of these bools work yet. just listening to the real one
-parser = argparse.ArgumentParser(description='test listener for icecube kafka fra/llama results')
-parser.add_argument('--test_domain', type=bool, default=False,
-                    help='bool to use test.gcn.nasa.gov (default False)')
-parser.add_argument('--test_topic', type=bool, default=True,
-                    help='listen to gcn.notices.icecube.TEST.lvk_nu_track_search')
-args = parser.parse_args()
+#parser = argparse.ArgumentParser(description='test listener for icecube kafka fra/llama results')
+#parser.add_argument('--test_domain', type=bool, default=False,
+#                    help='bool to use test.gcn.nasa.gov (default False)')
+#parser.add_argument('--test_topic', type=bool, default=True,
+#                    help='listen to gcn.notices.icecube.TEST.lvk_nu_track_search')
+#args = parser.parse_args()
 
-with open('/cvmfs/icecube.opensciencegrid.org/users/jthwaites/tokens/kafka_token.txt') as f:
+with open('/home/jthwaites/private/tokens/kafka_token.txt') as f:
     client_id = f.readline().rstrip('\n')
     client_secret = f.readline().rstrip('\n')
 
-#if args.test_domain:
-#    domain = 'test.gcn.nasa.gov'
-#else:
+#domain = 'test.gcn.nasa.gov'
 domain = 'gcn.nasa.gov'
 
 consumer = Consumer(client_id=client_id,
                     client_secret=client_secret,
                     domain=domain)
 
-#if args.test_topic:
-#    topic = 'gcn.notices.icecube.test.lvk_nu_track_search'
-#else:
+#topic = 'gcn.notices.icecube.test.lvk_nu_track_search'
 topic = 'gcn.notices.icecube.lvk_nu_track_search'
 consumer.subscribe([topic])
@@ -41,7 +37,11 @@
 while True:
     for message in consumer.consume(timeout=1):
+        if message.error():
+            print(message.error())
+            continue
         value = message.value()
-        alert_dict = json.loads(value.decode('utf-8'))
+        alert_dict = json.loads(value.decode('utf-8'))
         print(json.dumps(alert_dict, indent=2))
+
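A variant of the same consume loop that persists each notice to disk can be useful when testing; this is a sketch under the assumption that the gcn-kafka client and the token file above are available, and the output directory is illustrative.

import json
import os
from datetime import datetime
from gcn_kafka import Consumer

# Credentials read exactly as in test_listen_kafka.py above
with open('/home/jthwaites/private/tokens/kafka_token.txt') as f:
    client_id = f.readline().rstrip('\n')
    client_secret = f.readline().rstrip('\n')

consumer = Consumer(client_id=client_id, client_secret=client_secret,
                    domain='gcn.nasa.gov')
consumer.subscribe(['gcn.notices.icecube.lvk_nu_track_search'])

outdir = './notices'   # illustrative output location
os.makedirs(outdir, exist_ok=True)

while True:
    for message in consumer.consume(timeout=1):
        if message.error():   # skip messages the broker flagged as errored
            print(message.error())
            continue
        alert = json.loads(message.value().decode('utf-8'))
        stamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
        with open(os.path.join(outdir, f'notice_{stamp}.json'), 'w') as f:
            json.dump(alert, f, indent=2)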
diff --git a/fast_response/web_utils.py b/fast_response/web_utils.py
index e5e6862..1325d76 100644
--- a/fast_response/web_utils.py
+++ b/fast_response/web_utils.py
@@ -304,9 +304,9 @@ def updateGW_public(analysis, circular = None):
     start_date = Time(dateutil.parser.parse(analysis['observation_start'])).datetime
     start_str = f'{start_date.year:02d}_' \
         + f'{start_date.month:02d}_{start_date.day:02d}'
-    analysis['analysisid'] = start_str+'_'+analysis['ref_id']
+    analysis['analysisid'] = start_str+'_'+analysis['reference']['gcn.notices.LVK.alert']
 
-    analysis['name'] = analysis['ref_id']
+    analysis['name'] = analysis['reference']['gcn.notices.LVK.alert']
     if 'test' in analysis['analysisid']:
         analysis['name'] = analysis['name']+'_test'
@@ -352,7 +352,7 @@ def updateGW_public(analysis, circular = None):
         {}
         '''.format(row_start,analysis['name'],
                    analysis['trigger_time'][:-1].replace('T',' '),
-                   analysis['ref_id'].split('-')[0], link, duration,
+                   analysis['name'].split('-')[0], link, duration,
                    str(analysis['n_events_coincident']),
                    analysis["neutrino_flux_sensitivity_range"]['flux_sensitivity'][0],
                    analysis["neutrino_flux_sensitivity_range"]['flux_sensitivity'][1],
                    extra_info)
@@ -414,10 +414,10 @@ def createGWEventPage(analysis):
         #if 'LLAMASIG' in new_f[i]:
         #    new_f[i] = new_f[i].replace('LLAMASIG', '{:.2f}'.format(analysis['overall_sig_bayesian']))
         if 'MOSTPROBDIR' in new_f[i]:
-            if 'most_likely_direction' in analysis.keys():
+            if 'most_probable_direction' in analysis.keys():
                 new_f[i] = new_f[i].replace('MOSTPROBDIR', 'RA: {} deg, decl.: {} deg'.format(
-                    analysis['most_likely_direction']['ra'],
-                    analysis['most_likely_direction']['dec']))
+                    analysis['most_probable_direction']['ra'],
+                    analysis['most_probable_direction']['dec']))
             else:
                 new_f[i] = new_f[i].replace('MOSTPROBDIR', 'N/A')
         if '' in new_f[i]:
@@ -443,7 +443,7 @@ def createGWEventPage(analysis):
             '''.format(e, event['event_dt'],
                        event['localization']['ra'],
                        event['localization']['dec'],
-                       event['localization']['ra_uncertainty'],
+                       event['localization']['ra_uncertainty'][0],
                        event['event_pval_generic'],
                        event['event_pval_bayesian'])
             e+=1
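The renamed keys above feed the public-page identifiers. A small self-contained illustration of that derivation, using plain dateutil for brevity and placeholder values rather than a real superevent name:

import dateutil.parser

# Placeholder notice payload mimicking the renamed keys used above
analysis = {
    'observation_start': '2023-10-09T13:42:30.000Z',
    'reference': {'gcn.notices.LVK.alert': 'S231009a-2-Preliminary'},
}

start_date = dateutil.parser.parse(analysis['observation_start'])
start_str = f'{start_date.year:02d}_{start_date.month:02d}_{start_date.day:02d}'

analysis['analysisid'] = start_str + '_' + analysis['reference']['gcn.notices.LVK.alert']
analysis['name'] = analysis['reference']['gcn.notices.LVK.alert']

print(analysis['analysisid'])          # 2023_10_09_S231009a-2-Preliminary
print(analysis['name'].split('-')[0])  # S231009a, the bare superevent ID used in the table row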

diff --git a/html/gw_pub_templ_base.html b/html/gw_pub_templ_base.html
index a246579..a30c5e4 100644
--- a/html/gw_pub_templ_base.html
+++ b/html/gw_pub_templ_base.html
@@ -1,4 +1,4 @@
-
+
@@ -37,11 +37,10 @@
 Coincident Events
 
   • Event p-value (generic transient) = the p-value for this specific track event with respect to a background-only hypothesis in the generic transient search
   • Event p-value (Bayesian) = the p-value for this specific track event with respect to a background-only hypothesis in the Bayesian search
   • 
-
+ Note: if this is a Significant LVK event, a skymap zoomed in around the most probable direction will be included below this table.
-
diff --git a/setup.py b/setup.py
index 2227739..88a9ff8 100644
--- a/setup.py
+++ b/setup.py
@@ -1,7 +1,7 @@
 import setuptools
 
 long_message = 'Fast Response Analysis'
-version = "1.1.3"
+version = "1.1.4"
 
 setuptools.setup(
     name="fast_response",
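As a closing reference for the coincident-events table above: each entry rendered by createGWEventPage has roughly this shape. The key names are taken from the hunks in this patch; every value is a placeholder.

# Placeholder coincident-event entry; note that ra_uncertainty is a list,
# which is why createGWEventPage above indexes its first element.
event = {
    'event_dt': 12.3,                  # time offset relative to the GW (placeholder)
    'localization': {
        'ra': 123.45,                  # degrees (placeholder)
        'dec': -6.78,                  # degrees (placeholder)
        'ra_uncertainty': [0.91],      # degrees; a list, only [0] is displayed
    },
    'event_pval_generic': 0.045,
    'event_pval_bayesian': None,       # None when one analysis is missing
}

print(event['localization']['ra_uncertainty'][0])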