-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun-pipeline
executable file
·117 lines (112 loc) · 4.95 KB
/
run-pipeline
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/env python
import os
import sys
import time
import logging
import argparse
from connected_component_pipeline import run_analysis, run_analysis_strip
from multiprocessing import Pool
from datetime import datetime
from terra_common import CoordinateConverter as CC
import tempfile
import pickle
def parse_args():
    """Parse command-line options for the leaf-length pipeline.

    Returns:
        argparse.Namespace: the parsed options. ``--raw-path``, ``--ply-path``,
        ``-o/--output-path``, ``--start`` and ``--end`` are required; the rest
        have defaults.
    """
    # TODO add arg for globus refresh token
    description = 'Run the connected-component leaf analysis pipeline over a date range.'
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('--raw-path', help='path to the root of raw data', required=True)
    parser.add_argument('--ply-path', help='path to the root of ply data', required=True)
    # Fix: help text previously said 'yyyy-mm-dd' (copy-pasted from the date args).
    parser.add_argument('-o', '--output-path', help='path to the output folder', required=True)
    parser.add_argument('--start', help='Start date. Format: yyyy-mm-dd', required=True)
    parser.add_argument('--end', help='End date. Format: yyyy-mm-dd', required=True)
    parser.add_argument('--no-download', help='do not download ply files from globus', action='store_true')
    parser.add_argument('--scanner', help='from which scanner', choices=['east', 'west', 'both'], default='east')
    parser.add_argument('--crop', help='by plot or by leaf', action='store_true')
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-p', '--processes', help='number of sub-processes', default=8, type=int)
    parser.add_argument('--coord-converter', help='path to the pickled cc', default=None, type=str)
    return parser.parse_args()
# Parse CLI options once; everything below runs at module level (script style).
args = parse_args()

# --verbose controls both the logger and the console handler, so compute the
# effective level a single time instead of checking the flag twice.
log_level = logging.DEBUG if args.verbose else logging.INFO

logger = logging.getLogger('season_4_leaf_len_ppln_main')
logger.setLevel(log_level)

formatter = logging.Formatter('%(asctime)s - %(name)s %(levelname)s: \t%(message)s')
ch = logging.StreamHandler()
ch.setLevel(log_level)
ch.setFormatter(formatter)
logger.addHandler(ch)

# Echo the effective configuration so each run is self-documenting in the log.
logger.info('\n\t raw data path: {}'
            '\n\t ply data path: {} '
            '\n\t output path: {} '
            '\n\t start date: {}'
            '\n\t end date: {}'
            '\n\t scanner: {}'
            '\n\t crop: {}'
            '\n\t download from globus: {}'
            '\n\t number of processes: {}'
            .format(args.raw_path, args.ply_path, args.output_path, args.start, args.end,
                    args.scanner, args.crop, not args.no_download, args.processes))
# Ensure the output folder exists before any worker writes into it.
# makedirs(exist_ok=True) also creates missing parents, where the original
# os.mkdir would raise FileNotFoundError, and is race-free on concurrent runs.
os.makedirs(args.output_path, exist_ok=True)

start_date = datetime.strptime(args.start, '%Y-%m-%d')
end_date = datetime.strptime(args.end, '%Y-%m-%d')

if args.coord_converter is None:
    # No pre-pickled converter supplied: build one via a BETYdb query and
    # pickle it to a temp file so its path can be handed to worker processes.
    # NOTE(review): hard-coded placeholder BETYDB_KEY — confirm a real key is
    # injected through the environment in production.
    os.environ['BETYDB_KEY'] = '9999999999999999999999999999999999999999'
    cc = CC(useSubplot=True)
    logger.info('bety query start.')
    cc.bety_query(start_date.strftime('%Y-%m-%d'), useSubplot=True)
    logger.info('bety query complete.')
    # Context manager guarantees the handle is closed even if pickling fails;
    # delete=False keeps the file on disk for the workers to read.
    with tempfile.NamedTemporaryFile(delete=False) as cc_tmp_file:
        pickle.dump(cc, cc_tmp_file)
        cc_file_name = cc_tmp_file.name
    logger.info('coord converter dumped to {}'.format(cc_file_name))
else:
    cc_file_name = args.coord_converter
    with open(cc_file_name, 'rb') as f:
        # Security note: pickle.load executes arbitrary code from the file —
        # only pass trusted files via --coord-converter.
        cc = pickle.load(f)
# TODO check folder existence
logger.info('scanning folders')

# Per-task log level mirrors the main logger's verbosity.
task_log_lv = logging.DEBUG if args.verbose else logging.INFO

# Which scanner sides to enqueue per sub-folder ('both' expands to two tasks).
scanners = ['east', 'west'] if args.scanner == 'both' else [args.scanner]
download_ply = not args.no_download

task_list = []
for date_folder in os.listdir(args.raw_path):
    # Skip anything that is not a yyyy-mm-dd folder (e.g. stray files or
    # scratch dirs) instead of crashing on strptime.
    try:
        folder_date = datetime.strptime(date_folder, '%Y-%m-%d')
    except ValueError:
        logger.debug('skipping non-date entry {}'.format(date_folder))
        continue
    if folder_date < start_date or folder_date > end_date:
        continue
    raw_date_folder_path = os.path.join(args.raw_path, date_folder)
    ply_date_folder_path = os.path.join(args.ply_path, date_folder)
    for sub_folder in os.listdir(raw_date_folder_path):
        # Task tuple order matches the worker signature:
        # (raw_data_folder, ply_data_folder, output_folder, sensor_name,
        #  download_ply, per_plot, log_lv, cc)
        # Trailing '' keeps the original trailing-slash convention.
        task_raw_path = os.path.join(raw_date_folder_path, sub_folder, '')
        task_ply_path = os.path.join(ply_date_folder_path, sub_folder, '')
        for sensor in scanners:
            task_list.append((task_raw_path, task_ply_path, args.output_path,
                              sensor, download_ply, args.crop, task_log_lv, cc))

total_tasks = len(task_list)
logger.info('total task: {}'.format(total_tasks))
# Fan the tasks out over a process pool: --crop selects the per-plot/leaf
# analysis, otherwise the strip variant runs.
pool = Pool(processes=args.processes)
if args.crop:
    rs = pool.starmap_async(run_analysis, task_list)
else:
    rs = pool.starmap_async(run_analysis_strip, task_list)
# Fix: the pool was never closed/joined; close() tells workers no more tasks
# are coming so they can exit cleanly when the queue drains.
pool.close()

# Log progress every 5 minutes until all tasks complete. rs.wait(timeout)
# returns as soon as the results are ready, unlike the former unconditional
# time.sleep, so the script no longer idles after the last task finishes.
# NOTE(review): _number_left is a private multiprocessing attribute (there is
# no public progress API on AsyncResult) — may break on other Python versions.
while not rs.ready():
    logger.info('remain/total: {}/{}'.format(rs._number_left, total_tasks))
    rs.wait(60 * 5)
pool.join()