Skip to content

Commit 89a44c0

Browse files
author
xiaofeng_metis
committed
add kill task cli
1 parent 9ec0319 commit 89a44c0

File tree

3 files changed

+418
-13
lines changed

3 files changed

+418
-13
lines changed
Lines changed: 363 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,363 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.realtime;
19+
20+
import java.io.IOException;
21+
22+
import org.apache.commons.logging.Log;
23+
import org.apache.commons.logging.LogFactory;
24+
import org.apache.hadoop.classification.InterfaceAudience;
25+
import org.apache.hadoop.classification.InterfaceStability;
26+
import org.apache.hadoop.conf.Configured;
27+
import org.apache.hadoop.ipc.RemoteException;
28+
import org.apache.hadoop.realtime.conf.DragonConfiguration;
29+
import org.apache.hadoop.realtime.job.JobPriority;
30+
import org.apache.hadoop.realtime.records.JobId;
31+
import org.apache.hadoop.realtime.records.JobReport;
32+
import org.apache.hadoop.realtime.records.TaskAttemptId;
33+
import org.apache.hadoop.security.AccessControlException;
34+
import org.apache.hadoop.util.Tool;
35+
import org.apache.hadoop.util.ToolRunner;
36+
import org.apache.hadoop.yarn.api.records.ApplicationId;
37+
import org.apache.hadoop.yarn.util.Records;
38+
39+
@InterfaceAudience.Public
40+
@InterfaceStability.Evolving
41+
/**
42+
* Dragon command line interface that interprets the dragon job options
43+
*/
44+
public final class DragonCLI extends Configured implements Tool {
45+
private static final Log LOG = LogFactory.getLog(DragonCLI.class);
46+
47+
private String getJobPriorityNames() {
48+
StringBuffer sb = new StringBuffer();
49+
for (JobPriority p : JobPriority.values()) {
50+
sb.append(p.name()).append(" ");
51+
}
52+
return sb.substring(0, sb.length()-1);
53+
}
54+
55+
/**
56+
* Display usage of the command-line tool and terminate execution.
57+
*/
58+
private void displayUsage(String cmd) {
59+
String prefix = "Usage: CLI ";
60+
String jobPriorityValues = getJobPriorityNames();
61+
String taskStates = "running, completed";
62+
if ("-submit".equals(cmd)) {
63+
System.err.println(prefix + "[" + cmd + " <job-file>]");
64+
} else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
65+
System.err.println(prefix + "[" + cmd + " <job-id>]");
66+
} else if ("-counter".equals(cmd)) {
67+
System.err.println(prefix + "[" + cmd +
68+
" <job-id> <group-name> <counter-name>]");
69+
} else if ("-events".equals(cmd)) {
70+
System.err.println(prefix + "[" + cmd +
71+
" <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
72+
} else if ("-history".equals(cmd)) {
73+
System.err.println(prefix + "[" + cmd + " <jobHistoryFile>]");
74+
} else if ("-list".equals(cmd)) {
75+
System.err.println(prefix + "[" + cmd + " [all]]");
76+
} else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
77+
System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
78+
} else if ("-set-priority".equals(cmd)) {
79+
System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
80+
"Valid values for priorities are: "
81+
+ jobPriorityValues);
82+
} else if ("-list-attempt-ids".equals(cmd)) {
83+
System.err.println(prefix + "[" + cmd +
84+
" <job-id> <task-type> <task-state>]. " +
85+
"Valid values for <task-state> are " + taskStates);
86+
} else if ("-logs".equals(cmd)) {
87+
System.err.println(prefix + "[" + cmd +
88+
" <job-id> <task-attempt-id>]. " +
89+
" <task-attempt-id> is optional to get task attempt logs.");
90+
} else {
91+
System.err.printf(prefix + "<command> <args>\n");
92+
System.err.printf("\t[-submit <job-file>]\n");
93+
System.err.printf("\t[-status <job-id>]\n");
94+
System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]\n");
95+
System.err.printf("\t[-kill <job-id>]\n");
96+
System.err.printf("\t[-set-priority <job-id> <priority>]. " +
97+
"Valid values for priorities are: " + jobPriorityValues + "\n");
98+
System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]\n");
99+
System.err.printf("\t[-history <jobHistoryFile>]\n");
100+
System.err.printf("\t[-list [all]]\n");
101+
System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
102+
"<task-state>]. " +
103+
"Valid values for <task-state> are " + taskStates);
104+
System.err.printf("\t[-kill-task <task-attempt-id>]\n");
105+
System.err.printf("\t[-fail-task <task-attempt-id>]\n");
106+
System.err.printf("\t[-logs <job-id> <task-attempt-id>]\n\n");
107+
ToolRunner.printGenericCommandUsage(System.out);
108+
}
109+
}
110+
111+
/** Construct a JobId object from given string */
112+
private JobId jobIdForName(String name) throws IllegalArgumentException {
113+
if (name == null)
114+
return null;
115+
try {
116+
String[] parts = name.split("_");
117+
if (parts.length == 3) {
118+
if (parts[0].equals("job")) {
119+
long clusterTimeStamp = Long.parseLong(parts[1]);
120+
int id = Integer.parseInt(parts[2]);
121+
JobId jobId = Records.newRecord(JobId.class);
122+
jobId.setId(id);
123+
ApplicationId appId = Records.newRecord(ApplicationId.class);
124+
appId.setId(id);
125+
appId.setClusterTimestamp(clusterTimeStamp);
126+
jobId.setAppId(appId);
127+
return jobId;
128+
}
129+
}
130+
} catch (Exception ex) {// fall below
131+
}
132+
throw new IllegalArgumentException("JobId string : " + name
133+
+ " is not properly formed");
134+
}
135+
136+
@Override
137+
public int run(String[] args) throws Exception {
138+
int exitCode = -1;
139+
if (args.length < 1) {
140+
displayUsage("");
141+
return exitCode;
142+
}
143+
// process arguments
144+
String cmd = args[0];
145+
String submitJobFile = null;
146+
String jobid = null;
147+
String taskid = null;
148+
String historyFile = null;
149+
String counterGroupName = null;
150+
String counterName = null;
151+
JobPriority jp = null;
152+
String taskState = null;
153+
int fromEvent = 0;
154+
int nEvents = 0;
155+
boolean getStatus = false;
156+
boolean getCounter = false;
157+
boolean killJob = false;
158+
boolean listEvents = false;
159+
boolean viewHistory = false;
160+
boolean viewAllHistory = false;
161+
boolean listJobs = false;
162+
boolean listAllJobs = false;
163+
boolean displayTasks = false;
164+
boolean killTask = false;
165+
boolean failTask = false;
166+
boolean setJobPriority = false;
167+
boolean logs = false;
168+
169+
if ("-submit".equals(cmd)) {
170+
if (args.length != 2) {
171+
displayUsage(cmd);
172+
return exitCode;
173+
}
174+
submitJobFile = args[1];
175+
} else if ("-status".equals(cmd)) {
176+
if (args.length != 2) {
177+
displayUsage(cmd);
178+
return exitCode;
179+
}
180+
jobid = args[1];
181+
getStatus = true;
182+
} else if("-counter".equals(cmd)) {
183+
if (args.length != 4) {
184+
displayUsage(cmd);
185+
return exitCode;
186+
}
187+
getCounter = true;
188+
jobid = args[1];
189+
counterGroupName = args[2];
190+
counterName = args[3];
191+
} else if ("-kill".equals(cmd)) {
192+
if (args.length != 2) {
193+
displayUsage(cmd);
194+
return exitCode;
195+
}
196+
jobid = args[1];
197+
killJob = true;
198+
} else if ("-set-priority".equals(cmd)) {
199+
if (args.length != 3) {
200+
displayUsage(cmd);
201+
return exitCode;
202+
}
203+
jobid = args[1];
204+
try {
205+
jp = JobPriority.valueOf(args[2]);
206+
} catch (IllegalArgumentException iae) {
207+
LOG.info(iae);
208+
displayUsage(cmd);
209+
return exitCode;
210+
}
211+
setJobPriority = true;
212+
} else if ("-events".equals(cmd)) {
213+
if (args.length != 4) {
214+
displayUsage(cmd);
215+
return exitCode;
216+
}
217+
jobid = args[1];
218+
fromEvent = Integer.parseInt(args[2]);
219+
nEvents = Integer.parseInt(args[3]);
220+
listEvents = true;
221+
} else if ("-history".equals(cmd)) {
222+
if (args.length != 2 && !(args.length == 3 && "all".equals(args[1]))) {
223+
displayUsage(cmd);
224+
return exitCode;
225+
}
226+
viewHistory = true;
227+
if (args.length == 3 && "all".equals(args[1])) {
228+
viewAllHistory = true;
229+
historyFile = args[2];
230+
} else {
231+
historyFile = args[1];
232+
}
233+
} else if ("-list".equals(cmd)) {
234+
if (args.length != 1 && !(args.length == 2 && "all".equals(args[1]))) {
235+
displayUsage(cmd);
236+
return exitCode;
237+
}
238+
if (args.length == 2 && "all".equals(args[1])) {
239+
listAllJobs = true;
240+
} else {
241+
listJobs = true;
242+
}
243+
} else if("-kill-task".equals(cmd)) {
244+
if (args.length != 2) {
245+
displayUsage(cmd);
246+
return exitCode;
247+
}
248+
killTask = true;
249+
taskid = args[1];
250+
} else if("-fail-task".equals(cmd)) {
251+
if (args.length != 2) {
252+
displayUsage(cmd);
253+
return exitCode;
254+
}
255+
failTask = true;
256+
taskid = args[1];
257+
} else if ("-list-attempt-ids".equals(cmd)) {
258+
if (args.length != 3) {
259+
displayUsage(cmd);
260+
return exitCode;
261+
}
262+
jobid = args[1];
263+
taskState = args[2];
264+
displayTasks = true;
265+
} else if ("-logs".equals(cmd)) {
266+
if (args.length == 2 || args.length ==3) {
267+
logs = true;
268+
jobid = args[1];
269+
if (args.length == 3) {
270+
taskid = args[2];
271+
} else {
272+
taskid = null;
273+
}
274+
} else {
275+
displayUsage(cmd);
276+
return exitCode;
277+
}
278+
} else {
279+
displayUsage(cmd);
280+
return exitCode;
281+
}
282+
283+
// initialize cluster
284+
DragonJobServiceFactory factory = new DragonJobServiceFactory();
285+
DragonJobService service = factory.create(getConf());
286+
287+
// Submit the request
288+
try {
289+
if (submitJobFile != null) {
290+
DragonJob job =
291+
DragonJob.getInstance(new DragonConfiguration(submitJobFile));
292+
job.submit();
293+
System.out.println("Created job " + job.getID());
294+
exitCode = 0;
295+
} else if (getStatus) {
296+
JobId jobId = jobIdForName(jobid);
297+
JobReport report = service.getJobReport(jobId);
298+
if (report == null) {
299+
System.out.println("Could not find job " + jobid);
300+
} else {
301+
exitCode = 0;
302+
}
303+
} else if (getCounter) {
304+
305+
} else if (killJob) {
306+
JobId jobId = jobIdForName(jobid);
307+
JobReport report = service.getJobReport(jobId);
308+
if (report == null) {
309+
System.out.println("Could not find job " + jobid);
310+
} else {
311+
service.killJob(jobId);
312+
System.out.println("Killed job " + jobid);
313+
exitCode = 0;
314+
}
315+
} else if (setJobPriority) {
316+
JobId jobId = jobIdForName(jobid);
317+
JobReport report = service.getJobReport(jobId);
318+
if (report == null) {
319+
System.out.println("Could not find job " + jobid);
320+
} else {
321+
System.out.println("Changed job priority.");
322+
exitCode = 0;
323+
}
324+
} else if (viewHistory) {
325+
exitCode = 0;
326+
} else if (listEvents) {
327+
exitCode = 0;
328+
} else if (listJobs) {
329+
exitCode = 0;
330+
} else if (listAllJobs) {
331+
exitCode = 0;
332+
} else if (displayTasks) {
333+
334+
} else if(killTask) {
335+
final TaskAttemptId taskId = TaskAttemptId.forName(taskid);
336+
service.killTask(taskId, false);
337+
exitCode = 0;
338+
} else if(failTask) {
339+
final TaskAttemptId taskId = TaskAttemptId.forName(taskid);
340+
service.killTask(taskId, true);
341+
exitCode = 0;
342+
} else if (logs) {
343+
}
344+
} catch (RemoteException re) {
345+
IOException unwrappedException = re.unwrapRemoteException();
346+
if (unwrappedException instanceof AccessControlException) {
347+
System.out.println(unwrappedException.getMessage());
348+
} else {
349+
throw re;
350+
}
351+
} finally {
352+
}
353+
return exitCode;
354+
}
355+
356+
/**
357+
* @param args
358+
*/
359+
public static void main(String[] args) throws Exception {
360+
int res = ToolRunner.run(new DragonCLI(), args);
361+
System.exit(res);
362+
}
363+
}

0 commit comments

Comments
 (0)