Skip to content

Commit ab5a11b

Browse files
author
sa-buc
committed
add P2P mode and abort interface
1 parent 774a4af commit ab5a11b

File tree

3 files changed

+288
-2
lines changed

3 files changed

+288
-2
lines changed

example/simple-table-demo/src/main/java/com/oceanbase/example/ObDirectLoadDemo.java

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public static void main(String[] args) {
5353
SimpleTest.run();
5454
ParallelWriteTest.run();
5555
MultiNodeWriteTest.run();
56+
P2PModeWriteTest.run();
57+
AbortBeforeCommitTest.run();
58+
AbortAfterCommitAsyncTest.run();
5659
}
5760

5861
private static void prepareTestTable() throws Exception {
@@ -111,6 +114,12 @@ private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection conne
111114
.setParallel(parallel).setQueryTimeout(timeout).build();
112115
}
113116

117+
private static ObDirectLoadStatement buildP2PModeStatement(ObDirectLoadConnection connection)
118+
throws ObDirectLoadException {
119+
return connection.getStatementBuilder().setTableName(tableName).setDupAction(dupAction)
120+
.setParallel(parallel).setQueryTimeout(timeout).setP2PMode().build();
121+
}
122+
114123
private static class SimpleTest {
115124

116125
public static void run() {
@@ -309,4 +318,196 @@ public static void run() {
309318

310319
};
311320

321+
private static class P2PModeWriteTest {
322+
323+
private static class P2PModeWriter implements Runnable {
324+
325+
private final byte[] executionIdBytes;
326+
private final int id;
327+
boolean needCommit;
328+
Thread[] threads;
329+
330+
P2PModeWriter(byte[] executionIdBytes, int id) {
331+
this.executionIdBytes = executionIdBytes;
332+
this.id = id;
333+
}
334+
335+
P2PModeWriter(byte[] executionIdBytes, int id, Thread[] threads) {
336+
this.executionIdBytes = executionIdBytes;
337+
this.id = id;
338+
this.needCommit = true;
339+
this.threads = threads;
340+
}
341+
342+
@Override
343+
public void run() {
344+
ObDirectLoadConnection connection = null;
345+
ObDirectLoadStatement statement = null;
346+
try {
347+
ObDirectLoadStatementExecutionId executionId = new ObDirectLoadStatementExecutionId();
348+
executionId.decode(executionIdBytes);
349+
350+
connection = buildConnection(1);
351+
statement = buildP2PModeStatement(connection);
352+
353+
statement.resume(executionId);
354+
355+
statement.startHeartBeat();
356+
357+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
358+
ObObj[] rowObjs = new ObObj[2];
359+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
360+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
361+
bucket.addRow(rowObjs);
362+
statement.write(bucket);
363+
364+
if (needCommit) {
365+
for (int i = 0; i < threads.length; ++i) {
366+
threads[i].join();
367+
}
368+
statement.commit();
369+
}
370+
371+
} catch (Exception e) {
372+
throw new RuntimeException(e);
373+
} finally {
374+
if (null != statement) {
375+
statement.close();
376+
}
377+
if (null != connection) {
378+
connection.close();
379+
}
380+
}
381+
}
382+
383+
};
384+
385+
public static void run() {
386+
System.out.println("P2PModeWriteTest start");
387+
final int writeThreadNum = 10;
388+
ObDirectLoadConnection connection = null;
389+
ObDirectLoadStatement statement = null;
390+
try {
391+
prepareTestTable();
392+
393+
connection = buildConnection(1);
394+
statement = buildP2PModeStatement(connection);
395+
396+
statement.begin();
397+
398+
ObDirectLoadStatementExecutionId executionId = statement.getExecutionId();
399+
byte[] executionIdBytes = executionId.encode();
400+
401+
Thread[] threads = new Thread[writeThreadNum];
402+
for (int i = 0; i < threads.length; ++i) {
403+
P2PModeWriter NodeWriter = new P2PModeWriter(executionIdBytes, i);
404+
Thread thread = new Thread(NodeWriter);
405+
threads[i] = thread;
406+
}
407+
P2PModeWriter commitNodeWriter = new P2PModeWriter(executionIdBytes, writeThreadNum, threads);
408+
Thread commitThread = new Thread(commitNodeWriter);
409+
commitThread.start();
410+
for (int i = 0; i < threads.length; ++i) {
411+
threads[i].start();
412+
}
413+
commitThread.join();
414+
queryTestTable(writeThreadNum + 1);
415+
416+
} catch (Exception e) {
417+
throw new RuntimeException(e);
418+
} finally {
419+
if (null != statement) {
420+
statement.close();
421+
}
422+
if (null != connection) {
423+
connection.close();
424+
}
425+
}
426+
System.out.println("P2PModeWriteTest successful");
427+
}
428+
429+
};
430+
431+
private static class AbortBeforeCommitTest {
432+
433+
public static void run() {
434+
System.out.println("AbortBeforeCommitTest start");
435+
ObDirectLoadConnection connection = null;
436+
ObDirectLoadStatement statement = null;
437+
try {
438+
prepareTestTable();
439+
System.out.println("prepareTestTable");
440+
441+
connection = buildConnection(1);
442+
statement = buildStatement(connection);
443+
444+
statement.begin();
445+
446+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
447+
ObObj[] rowObjs = new ObObj[2];
448+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 1);
449+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 2);
450+
bucket.addRow(rowObjs);
451+
statement.write(bucket);
452+
453+
statement.abort();
454+
455+
queryTestTable(0);
456+
} catch (Exception e) {
457+
throw new RuntimeException(e);
458+
} finally {
459+
if (null != statement) {
460+
statement.close();
461+
}
462+
if (null != connection) {
463+
connection.close();
464+
}
465+
}
466+
System.out.println("AbortBeforeCommitTest successful");
467+
}
468+
469+
};
470+
471+
private static class AbortAfterCommitAsyncTest {
472+
473+
public static void run() {
474+
System.out.println("AbortAfterCommitAsyncTest start");
475+
ObDirectLoadConnection connection = null;
476+
ObDirectLoadStatement statement = null;
477+
try {
478+
prepareTestTable();
479+
System.out.println("prepareTestTable");
480+
481+
connection = buildConnection(1);
482+
statement = buildStatement(connection);
483+
484+
statement.begin();
485+
486+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
487+
ObObj[] rowObjs = new ObObj[2];
488+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 1);
489+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 2);
490+
bucket.addRow(rowObjs);
491+
statement.write(bucket);
492+
493+
statement.commitAsync();
494+
495+
statement.abort();
496+
497+
queryTestTable(0);
498+
} catch (Exception e) {
499+
throw new RuntimeException(e);
500+
} finally {
501+
if (null != statement) {
502+
statement.close();
503+
}
504+
if (null != connection) {
505+
connection.close();
506+
}
507+
}
508+
System.out.println("AbortAfterCommitAsyncTest successful");
509+
}
510+
511+
};
512+
312513
}

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadStatement.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ public synchronized void init(Builder builder) throws ObDirectLoadException {
8888
obTablePool = new ObDirectLoadConnection.ObTablePool(connection, logger, queryTimeout);
8989
obTablePool.init();
9090
executor = new ObDirectLoadStatementExecutor(this);
91+
if (builder.isP2PMode) {
92+
executor.setP2PMode();
93+
}
9194
startQueryTimeMillis = System.currentTimeMillis();
9295
isInited = true;
9396
logger.info("statement init successful, args:" + builder);
@@ -279,6 +282,10 @@ public void write(ObDirectLoadBucket bucket) throws ObDirectLoadException {
279282
executor.write(bucket);
280283
}
281284

285+
public void startHeartBeat() throws ObDirectLoadException {
286+
executor.startHeartBeatInP2PMode();
287+
}
288+
282289
public void detach() throws ObDirectLoadException {
283290
try {
284291
checkStatus();
@@ -304,6 +311,10 @@ public void resume(ObDirectLoadStatementExecutionId executionId) throws ObDirect
304311
executor.resume(executionId);
305312
}
306313

314+
public void abort() throws ObDirectLoadException {
315+
executor.execAbort();
316+
}
317+
307318
public static final class Builder {
308319

309320
private final ObDirectLoadConnection connection;
@@ -320,6 +331,7 @@ public static final class Builder {
320331
private String loadMethod = "full";
321332

322333
private static final long MAX_QUERY_TIMEOUT = Integer.MAX_VALUE;
334+
private boolean isP2PMode = false;
323335

324336
Builder(ObDirectLoadConnection connection) {
325337
this.connection = connection;
@@ -377,6 +389,11 @@ public ObDirectLoadStatement build() throws ObDirectLoadException {
377389
return connection.buildStatement(this);
378390
}
379391

392+
public Builder setP2PMode() {
393+
this.isP2PMode = true;
394+
return this;
395+
}
396+
380397
}
381398

382399
}

src/main/java/com/alipay/oceanbase/rpc/direct_load/execution/ObDirectLoadStatementExecutor.java

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class ObDirectLoadStatementExecutor {
5858
private long taskId = 0;
5959
private ObAddr svrAddr = null;
6060
private ObDirectLoadException cause = null; // 失败原因
61+
private boolean isP2PMode = false;
6162

6263
public ObDirectLoadStatementExecutor(ObDirectLoadStatement statement) {
6364
this.statement = statement;
@@ -97,6 +98,10 @@ public String toString() {
9798
return String.format("{svrAddr:%s, tableId:%d, taskId:%d}", svrAddr, tableId, taskId);
9899
}
99100

101+
public void setP2PMode() {
102+
isP2PMode = true;
103+
}
104+
100105
public synchronized ObDirectLoadStatementFuture begin() {
101106
logger.info("statement call begin");
102107
ObDirectLoadStatementAsyncPromiseTask task = null;
@@ -172,7 +177,11 @@ public synchronized void resume(ObDirectLoadStatementExecutionId executionId)
172177
throws ObDirectLoadException {
173178
logger.info("statement call resume");
174179
try {
175-
compareAndSetState(NONE, LOADING_ONLY, "resume");
180+
if (isP2PMode) {
181+
compareAndSetState(NONE, LOADING, "resume in P2P mode");
182+
} else {
183+
compareAndSetState(NONE, LOADING_ONLY, "resume");
184+
}
176185
} catch (ObDirectLoadException e) {
177186
logger.warn("statement resume failed", e);
178187
throw e;
@@ -232,7 +241,11 @@ public void close() {
232241
}
233242
}
234243
// 退出任务
235-
abortIfNeed();
244+
if (isP2PMode) {
245+
abortIfNeedWhenPeerClosed();
246+
} else {
247+
abortIfNeed();
248+
}
236249
ObDirectLoadStatementFuture abortFuture = null;
237250
synchronized (this) {
238251
if (this.abortFuture != null && !this.abortFuture.isDone()) {
@@ -296,6 +309,53 @@ private synchronized void abortIfNeed() {
296309
}
297310
}
298311

312+
private synchronized void abortIfNeedWhenPeerClosed() {
313+
logger.debug("statement abort if need when peer closed");
314+
if (abortFuture != null) {
315+
logger.debug("statement in abort");
316+
return;
317+
}
318+
final int state = stateFlag.get();
319+
boolean needAbort = false;
320+
boolean unexpectedState = false;
321+
String reason = "";
322+
if (state == NONE) {
323+
reason = "not begin";
324+
} else if (state == BEGINNING) {
325+
unexpectedState = true;
326+
reason = "begin not finish";
327+
} else if (state == LOADING) {
328+
needAbort = false;
329+
} else if (state == LOADING_ONLY) {
330+
needAbort = false;
331+
} else if (state == COMMITTING) {
332+
unexpectedState = true;
333+
reason = "commit not finish";
334+
} else if (state == COMMIT) {
335+
reason = "already commit";
336+
} else if (state == FAIL) {
337+
if (svrAddr != null) {
338+
needAbort = true;
339+
} else {
340+
reason = "begin fail";
341+
}
342+
} else if (state == ABORT) {
343+
reason = "already abort";
344+
}
345+
if (!needAbort) {
346+
if (unexpectedState) {
347+
logger.warn("statement cannot abort because " + reason);
348+
} else {
349+
logger.debug("statement no need abort because " + reason);
350+
setState(ABORT);
351+
}
352+
} else if (isDetached) {
353+
logger.debug("statement no need abort because is detached");
354+
} else {
355+
abort();
356+
}
357+
}
358+
299359
private ObDirectLoadStatementFuture abort() {
300360
logger.info("statement call abort");
301361
setState(ABORT);
@@ -323,6 +383,10 @@ void startHeartBeat() throws ObDirectLoadException {
323383
}
324384
}
325385

386+
public void startHeartBeatInP2PMode() throws ObDirectLoadException {
387+
startHeartBeat();
388+
}
389+
326390
void stopHeartBeat() {
327391
logger.info("statement stop heart beat");
328392
try {
@@ -355,6 +419,10 @@ public void write(ObDirectLoadBucket bucket) throws ObDirectLoadException {
355419
}
356420
}
357421

422+
public void execAbort() throws ObDirectLoadException {
423+
abort();
424+
}
425+
358426
private String getUnexpectedStateReason(int state) {
359427
String reason = "";
360428
if (state == NONE) {

0 commit comments

Comments
 (0)