Skip to content

Commit e1d7a8e

Browse files
author
sa-buc
committed
add P2P mode and abort interface
1 parent 0e7f901 commit e1d7a8e

File tree

3 files changed

+291
-11
lines changed

3 files changed

+291
-11
lines changed

example/simple-table-demo/src/main/java/com/oceanbase/example/ObDirectLoadDemo.java

Lines changed: 231 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public static void main(String[] args) {
5353
SimpleTest.run();
5454
ParallelWriteTest.run();
5555
MultiNodeWriteTest.run();
56+
P2PModeWriteTest.run();
57+
SimpleAbortTest.run();
58+
P2PModeAbortTest.run();
5659
}
5760

5861
private static void prepareTestTable() throws Exception {
@@ -105,16 +108,16 @@ private static ObDirectLoadConnection buildConnection(int writeThreadNum)
105108
.enableParallelWrite(writeThreadNum).build();
106109
}
107110

108-
private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection)
111+
private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection, boolean isP2PMode)
109112
throws ObDirectLoadException {
110113
return connection.getStatementBuilder().setTableName(tableName).setDupAction(dupAction)
111-
.setParallel(parallel).setQueryTimeout(timeout).build();
114+
.setParallel(parallel).setQueryTimeout(timeout).setIsP2PMode(isP2PMode).build();
112115
}
113116

114-
private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection, ObDirectLoadStatementExecutionId executionId)
117+
private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection, ObDirectLoadStatementExecutionId executionId, boolean isP2PMode)
115118
throws ObDirectLoadException {
116119
return connection.getStatementBuilder().setTableName(tableName).setDupAction(dupAction)
117-
.setParallel(parallel).setQueryTimeout(timeout).setExecutionId(executionId).build();
120+
.setParallel(parallel).setQueryTimeout(timeout).setExecutionId(executionId).setIsP2PMode(isP2PMode).build();
118121
}
119122

120123
private static class SimpleTest {
@@ -127,7 +130,7 @@ public static void run() {
127130
prepareTestTable();
128131

129132
connection = buildConnection(1);
130-
statement = buildStatement(connection);
133+
statement = buildStatement(connection, false);
131134

132135
statement.begin();
133136

@@ -192,7 +195,7 @@ public static void run() {
192195
prepareTestTable();
193196

194197
connection = buildConnection(parallel);
195-
statement = buildStatement(connection);
198+
statement = buildStatement(connection, false);
196199

197200
statement.begin();
198201

@@ -246,7 +249,7 @@ public void run() {
246249
executionId.decode(executionIdBytes);
247250

248251
connection = buildConnection(1);
249-
statement = buildStatement(connection, executionId);
252+
statement = buildStatement(connection, executionId, false);
250253

251254
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
252255
ObObj[] rowObjs = new ObObj[2];
@@ -277,7 +280,7 @@ public static void run() {
277280
prepareTestTable();
278281

279282
connection = buildConnection(1);
280-
statement = buildStatement(connection);
283+
statement = buildStatement(connection, false);
281284

282285
statement.begin();
283286

@@ -313,4 +316,224 @@ public static void run() {
313316

314317
};
315318

319+
private static class P2PModeWriteTest {
320+
321+
private static class P2PNodeWriter implements Runnable {
322+
323+
private final byte[] executionIdBytes;
324+
private final int id;
325+
326+
P2PNodeWriter(byte[] executionIdBytes, int id) {
327+
this.executionIdBytes = executionIdBytes;
328+
this.id = id;
329+
}
330+
331+
@Override
332+
public void run() {
333+
ObDirectLoadConnection connection = null;
334+
ObDirectLoadStatement statement = null;
335+
try {
336+
ObDirectLoadStatementExecutionId executionId = new ObDirectLoadStatementExecutionId();
337+
executionId.decode(executionIdBytes);
338+
339+
connection = buildConnection(1);
340+
statement = buildStatement(connection, executionId, true);
341+
342+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
343+
ObObj[] rowObjs = new ObObj[2];
344+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
345+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
346+
bucket.addRow(rowObjs);
347+
statement.write(bucket);
348+
349+
} catch (Exception e) {
350+
throw new RuntimeException(e);
351+
} finally {
352+
if (null != statement) {
353+
statement.close();
354+
}
355+
if (null != connection) {
356+
connection.close();
357+
}
358+
}
359+
}
360+
361+
};
362+
363+
public static void run() {
364+
System.out.println("P2PModeWriteTest start");
365+
final int writeThreadNum = 10;
366+
ObDirectLoadConnection connection = null;
367+
ObDirectLoadStatement statement = null;
368+
try {
369+
prepareTestTable();
370+
371+
connection = buildConnection(1);
372+
statement = buildStatement(connection, true);
373+
374+
statement.begin();
375+
376+
ObDirectLoadStatementExecutionId executionId = statement.getExecutionId();
377+
byte[] executionIdBytes = executionId.encode();
378+
379+
Thread[] threads = new Thread[writeThreadNum];
380+
for (int i = 0; i < threads.length; ++i) {
381+
P2PNodeWriter NodeWriter = new P2PNodeWriter(executionIdBytes, i);
382+
Thread thread = new Thread(NodeWriter);
383+
thread.start();
384+
threads[i] = thread;
385+
}
386+
for (int i = 0; i < threads.length; ++i) {
387+
threads[i].join();
388+
}
389+
390+
statement.commit();
391+
392+
queryTestTable(writeThreadNum);
393+
} catch (Exception e) {
394+
throw new RuntimeException(e);
395+
} finally {
396+
if (null != statement) {
397+
statement.close();
398+
}
399+
if (null != connection) {
400+
connection.close();
401+
}
402+
}
403+
System.out.println("P2PModeWriteTest successful");
404+
}
405+
406+
};
407+
408+
private static class SimpleAbortTest {
409+
410+
public static void run() {
411+
System.out.println("SimpleAbortTest start");
412+
ObDirectLoadConnection connection = null;
413+
ObDirectLoadStatement statement = null;
414+
try {
415+
prepareTestTable();
416+
System.out.println("prepareTestTable");
417+
418+
connection = buildConnection(1);
419+
statement = buildStatement(connection, false);
420+
421+
statement.begin();
422+
423+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
424+
ObObj[] rowObjs = new ObObj[2];
425+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 1);
426+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 2);
427+
bucket.addRow(rowObjs);
428+
statement.write(bucket);
429+
430+
statement.abort();
431+
432+
queryTestTable(0);
433+
} catch (Exception e) {
434+
throw new RuntimeException(e);
435+
} finally {
436+
if (null != statement) {
437+
statement.close();
438+
}
439+
if (null != connection) {
440+
connection.close();
441+
}
442+
}
443+
System.out.println("SimpleAbortTest successful");
444+
}
445+
446+
};
447+
448+
private static class P2PModeAbortTest {
449+
450+
451+
private static class AbortP2PNode implements Runnable {
452+
453+
private final byte[] executionIdBytes;
454+
private final int id;
455+
456+
AbortP2PNode(byte[] executionIdBytes, int id) {
457+
this.executionIdBytes = executionIdBytes;
458+
this.id = id;
459+
}
460+
461+
@Override
462+
public void run() {
463+
ObDirectLoadConnection connection = null;
464+
ObDirectLoadStatement statement = null;
465+
try {
466+
ObDirectLoadStatementExecutionId executionId = new ObDirectLoadStatementExecutionId();
467+
executionId.decode(executionIdBytes);
468+
469+
connection = buildConnection(1);
470+
statement = buildStatement(connection, executionId, true);
471+
472+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
473+
ObObj[] rowObjs = new ObObj[2];
474+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
475+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
476+
bucket.addRow(rowObjs);
477+
statement.write(bucket);
478+
479+
statement.abort();
480+
481+
} catch (Exception e) {
482+
throw new RuntimeException(e);
483+
} finally {
484+
if (null != statement) {
485+
statement.close();
486+
}
487+
if (null != connection) {
488+
connection.close();
489+
}
490+
}
491+
}
492+
493+
};
494+
495+
public static void run() {
496+
System.out.println("P2PModeAbortTest start");
497+
ObDirectLoadConnection connection = null;
498+
ObDirectLoadStatement statement = null;
499+
try {
500+
prepareTestTable();
501+
502+
connection = buildConnection(1);
503+
statement = buildStatement(connection, true);
504+
505+
statement.begin();
506+
507+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
508+
ObObj[] rowObjs = new ObObj[2];
509+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 1);
510+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 2);
511+
bucket.addRow(rowObjs);
512+
statement.write(bucket);
513+
514+
ObDirectLoadStatementExecutionId executionId = statement.getExecutionId();
515+
byte[] executionIdBytes = executionId.encode();
516+
517+
AbortP2PNode abortP2PNode = new AbortP2PNode(executionIdBytes, 3);
518+
Thread abortNodeThread = new Thread(abortP2PNode);
519+
abortNodeThread.start();
520+
abortNodeThread.join();
521+
522+
queryTestTable(0);
523+
524+
} catch (Exception e) {
525+
throw new RuntimeException(e);
526+
} finally {
527+
if (null != statement) {
528+
statement.close();
529+
}
530+
if (null != connection) {
531+
connection.close();
532+
}
533+
}
534+
System.out.println("P2PModeAbortTest successful");
535+
}
536+
537+
};
538+
316539
}

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadStatement.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public synchronized void init(Builder builder) throws ObDirectLoadException {
8787
connection.getProtocol().checkIsSupported(this);
8888
obTablePool = new ObDirectLoadConnection.ObTablePool(connection, logger, queryTimeout);
8989
obTablePool.init();
90-
executor = new ObDirectLoadStatementExecutor(this);
90+
executor = new ObDirectLoadStatementExecutor(this, builder.isP2PMode);
9191
if (builder.executionId != null) {
9292
executor.resume(builder.executionId);
9393
}
@@ -308,6 +308,10 @@ public void resume(ObDirectLoadStatementExecutionId executionId) throws ObDirect
308308
executor.resume(executionId);
309309
}
310310

311+
public void abort() throws ObDirectLoadException {
312+
executor.requestAbort();
313+
}
314+
311315
public static final class Builder {
312316

313317
private final ObDirectLoadConnection connection;
@@ -327,6 +331,7 @@ public static final class Builder {
327331
private ObDirectLoadStatementExecutionId executionId = null;
328332

329333
private static final long MAX_QUERY_TIMEOUT = Integer.MAX_VALUE;
334+
private boolean isP2PMode = false;
330335

331336
Builder(ObDirectLoadConnection connection) {
332337
this.connection = connection;
@@ -382,6 +387,11 @@ public ObDirectLoadTraceId getTraceId() {
382387
return traceId;
383388
}
384389

390+
public Builder setIsP2PMode(boolean isP2PMode) {
391+
this.isP2PMode = isP2PMode;
392+
return this;
393+
}
394+
385395
public String toString() {
386396
return String
387397
.format(

0 commit comments

Comments
 (0)