Skip to content

Commit 183e8bb

Browse files
author
sa-buc
committed
add P2P mode and abort interface
1 parent 0e7f901 commit 183e8bb

File tree

3 files changed

+283
-12
lines changed

3 files changed

+283
-12
lines changed

example/simple-table-demo/src/main/java/com/oceanbase/example/ObDirectLoadDemo.java

Lines changed: 249 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadStatement;
2424
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException;
2525
import com.alipay.oceanbase.rpc.direct_load.execution.ObDirectLoadStatementExecutionId;
26+
import com.alipay.oceanbase.rpc.direct_load.execution.ObDirectLoadStatementExecutor.NodeRole;
2627
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObLoadDupActionType;
2728
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
2829
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType;
@@ -53,6 +54,9 @@ public static void main(String[] args) {
5354
SimpleTest.run();
5455
ParallelWriteTest.run();
5556
MultiNodeWriteTest.run();
57+
P2PModeWriteTest.run();
58+
SimpleAbortTest.run();
59+
P2PModeAbortTest.run();
5660
}
5761

5862
private static void prepareTestTable() throws Exception {
@@ -105,16 +109,16 @@ private static ObDirectLoadConnection buildConnection(int writeThreadNum)
105109
.enableParallelWrite(writeThreadNum).build();
106110
}
107111

108-
private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection)
112+
private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection, NodeRole nodeRole)
109113
throws ObDirectLoadException {
110114
return connection.getStatementBuilder().setTableName(tableName).setDupAction(dupAction)
111-
.setParallel(parallel).setQueryTimeout(timeout).build();
115+
.setParallel(parallel).setQueryTimeout(timeout).setNodeRole(nodeRole).build();
112116
}
113117

114-
private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection, ObDirectLoadStatementExecutionId executionId)
118+
private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection, ObDirectLoadStatementExecutionId executionId, NodeRole nodeRole)
115119
throws ObDirectLoadException {
116120
return connection.getStatementBuilder().setTableName(tableName).setDupAction(dupAction)
117-
.setParallel(parallel).setQueryTimeout(timeout).setExecutionId(executionId).build();
121+
.setParallel(parallel).setQueryTimeout(timeout).setExecutionId(executionId).setNodeRole(nodeRole).build();
118122
}
119123

120124
private static class SimpleTest {
@@ -127,7 +131,7 @@ public static void run() {
127131
prepareTestTable();
128132

129133
connection = buildConnection(1);
130-
statement = buildStatement(connection);
134+
statement = buildStatement(connection, NodeRole.PRIMARY);
131135

132136
statement.begin();
133137

@@ -192,7 +196,7 @@ public static void run() {
192196
prepareTestTable();
193197

194198
connection = buildConnection(parallel);
195-
statement = buildStatement(connection);
199+
statement = buildStatement(connection, NodeRole.PRIMARY);
196200

197201
statement.begin();
198202

@@ -246,7 +250,7 @@ public void run() {
246250
executionId.decode(executionIdBytes);
247251

248252
connection = buildConnection(1);
249-
statement = buildStatement(connection, executionId);
253+
statement = buildStatement(connection, executionId, NodeRole.WRITE_ONLY);
250254

251255
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
252256
ObObj[] rowObjs = new ObObj[2];
@@ -277,7 +281,7 @@ public static void run() {
277281
prepareTestTable();
278282

279283
connection = buildConnection(1);
280-
statement = buildStatement(connection);
284+
statement = buildStatement(connection, NodeRole.PRIMARY);
281285

282286
statement.begin();
283287

@@ -313,4 +317,241 @@ public static void run() {
313317

314318
};
315319

320+
private static class P2PModeWriteTest {
321+
322+
private static class P2PNodeWriter implements Runnable {
323+
324+
private final byte[] executionIdBytes;
325+
private final int id;
326+
boolean needCommit;
327+
Thread[] threads;
328+
329+
P2PNodeWriter(byte[] executionIdBytes, int id) {
330+
this.executionIdBytes = executionIdBytes;
331+
this.id = id;
332+
}
333+
334+
P2PNodeWriter(byte[] executionIdBytes, int id, Thread[] threads) {
335+
this.executionIdBytes = executionIdBytes;
336+
this.id = id;
337+
this.needCommit = true;
338+
this.threads = threads;
339+
}
340+
341+
@Override
342+
public void run() {
343+
ObDirectLoadConnection connection = null;
344+
ObDirectLoadStatement statement = null;
345+
try {
346+
ObDirectLoadStatementExecutionId executionId = new ObDirectLoadStatementExecutionId();
347+
executionId.decode(executionIdBytes);
348+
349+
connection = buildConnection(1);
350+
statement = buildStatement(connection, executionId, NodeRole.P2P);
351+
352+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
353+
ObObj[] rowObjs = new ObObj[2];
354+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
355+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
356+
bucket.addRow(rowObjs);
357+
statement.write(bucket);
358+
359+
if (needCommit) {
360+
for (int i = 0; i < threads.length; ++i) {
361+
threads[i].join();
362+
}
363+
statement.commit();
364+
}
365+
366+
} catch (Exception e) {
367+
throw new RuntimeException(e);
368+
} finally {
369+
if (null != statement) {
370+
statement.close();
371+
}
372+
if (null != connection) {
373+
connection.close();
374+
}
375+
}
376+
}
377+
378+
};
379+
380+
public static void run() {
381+
System.out.println("P2PModeWriteTest start");
382+
final int writeThreadNum = 10;
383+
ObDirectLoadConnection connection = null;
384+
ObDirectLoadStatement statement = null;
385+
try {
386+
prepareTestTable();
387+
388+
connection = buildConnection(1);
389+
statement = buildStatement(connection, NodeRole.P2P);
390+
391+
statement.begin();
392+
393+
ObDirectLoadStatementExecutionId executionId = statement.getExecutionId();
394+
byte[] executionIdBytes = executionId.encode();
395+
396+
Thread[] threads = new Thread[writeThreadNum];
397+
for (int i = 0; i < threads.length; ++i) {
398+
P2PNodeWriter NodeWriter = new P2PNodeWriter(executionIdBytes, i);
399+
Thread thread = new Thread(NodeWriter);
400+
threads[i] = thread;
401+
}
402+
P2PNodeWriter commitNodeWriter = new P2PNodeWriter(executionIdBytes, writeThreadNum, threads);
403+
Thread commitThread = new Thread(commitNodeWriter);
404+
commitThread.start();
405+
for (int i = 0; i < threads.length; ++i) {
406+
threads[i].start();
407+
}
408+
commitThread.join();
409+
queryTestTable(writeThreadNum + 1);
410+
411+
} catch (Exception e) {
412+
throw new RuntimeException(e);
413+
} finally {
414+
if (null != statement) {
415+
statement.close();
416+
}
417+
if (null != connection) {
418+
connection.close();
419+
}
420+
}
421+
System.out.println("P2PModeWriteTest successful");
422+
}
423+
424+
};
425+
426+
private static class SimpleAbortTest {
427+
428+
public static void run() {
429+
System.out.println("SimpleAbortTest start");
430+
ObDirectLoadConnection connection = null;
431+
ObDirectLoadStatement statement = null;
432+
try {
433+
prepareTestTable();
434+
System.out.println("prepareTestTable");
435+
436+
connection = buildConnection(1);
437+
statement = buildStatement(connection, NodeRole.PRIMARY);
438+
439+
statement.begin();
440+
441+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
442+
ObObj[] rowObjs = new ObObj[2];
443+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 1);
444+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 2);
445+
bucket.addRow(rowObjs);
446+
statement.write(bucket);
447+
448+
statement.abort();
449+
450+
queryTestTable(0);
451+
} catch (Exception e) {
452+
throw new RuntimeException(e);
453+
} finally {
454+
if (null != statement) {
455+
statement.close();
456+
}
457+
if (null != connection) {
458+
connection.close();
459+
}
460+
}
461+
System.out.println("SimpleAbortTest successful");
462+
}
463+
464+
};
465+
466+
private static class P2PModeAbortTest {
467+
468+
469+
private static class AbortP2PNode implements Runnable {
470+
471+
private final byte[] executionIdBytes;
472+
private final int id;
473+
474+
AbortP2PNode(byte[] executionIdBytes, int id) {
475+
this.executionIdBytes = executionIdBytes;
476+
this.id = id;
477+
}
478+
479+
@Override
480+
public void run() {
481+
ObDirectLoadConnection connection = null;
482+
ObDirectLoadStatement statement = null;
483+
try {
484+
ObDirectLoadStatementExecutionId executionId = new ObDirectLoadStatementExecutionId();
485+
executionId.decode(executionIdBytes);
486+
487+
connection = buildConnection(1);
488+
statement = buildStatement(connection, executionId, NodeRole.P2P);
489+
490+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
491+
ObObj[] rowObjs = new ObObj[2];
492+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
493+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
494+
bucket.addRow(rowObjs);
495+
statement.write(bucket);
496+
497+
statement.abort();
498+
499+
} catch (Exception e) {
500+
throw new RuntimeException(e);
501+
} finally {
502+
if (null != statement) {
503+
statement.close();
504+
}
505+
if (null != connection) {
506+
connection.close();
507+
}
508+
}
509+
}
510+
511+
};
512+
513+
public static void run() {
514+
System.out.println("P2PModeAbortTest start");
515+
ObDirectLoadConnection connection = null;
516+
ObDirectLoadStatement statement = null;
517+
try {
518+
prepareTestTable();
519+
520+
connection = buildConnection(1);
521+
statement = buildStatement(connection, NodeRole.P2P);
522+
523+
statement.begin();
524+
525+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
526+
ObObj[] rowObjs = new ObObj[2];
527+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 1);
528+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 2);
529+
bucket.addRow(rowObjs);
530+
statement.write(bucket);
531+
532+
ObDirectLoadStatementExecutionId executionId = statement.getExecutionId();
533+
byte[] executionIdBytes = executionId.encode();
534+
535+
AbortP2PNode abortP2PNode = new AbortP2PNode(executionIdBytes, 3);
536+
Thread abortNodeThread = new Thread(abortP2PNode);
537+
abortNodeThread.start();
538+
abortNodeThread.join();
539+
540+
queryTestTable(0);
541+
542+
} catch (Exception e) {
543+
throw new RuntimeException(e);
544+
} finally {
545+
if (null != statement) {
546+
statement.close();
547+
}
548+
if (null != connection) {
549+
connection.close();
550+
}
551+
}
552+
System.out.println("P2PModeAbortTest successful");
553+
}
554+
555+
};
556+
316557
}

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadStatement.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadTimeoutException;
2626
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadUnexpectedException;
2727
import com.alipay.oceanbase.rpc.direct_load.execution.ObDirectLoadStatementExecutionId;
28+
import com.alipay.oceanbase.rpc.direct_load.execution.ObDirectLoadStatementExecutor.NodeRole;
2829
import com.alipay.oceanbase.rpc.direct_load.execution.ObDirectLoadStatementExecutor;
2930
import com.alipay.oceanbase.rpc.direct_load.future.ObDirectLoadStatementFailedFuture;
3031
import com.alipay.oceanbase.rpc.direct_load.future.ObDirectLoadStatementFuture;
@@ -87,7 +88,7 @@ public synchronized void init(Builder builder) throws ObDirectLoadException {
8788
connection.getProtocol().checkIsSupported(this);
8889
obTablePool = new ObDirectLoadConnection.ObTablePool(connection, logger, queryTimeout);
8990
obTablePool.init();
90-
executor = new ObDirectLoadStatementExecutor(this);
91+
executor = new ObDirectLoadStatementExecutor(this, builder.nodeRole);
9192
if (builder.executionId != null) {
9293
executor.resume(builder.executionId);
9394
}
@@ -308,6 +309,10 @@ public void resume(ObDirectLoadStatementExecutionId executionId) throws ObDirect
308309
executor.resume(executionId);
309310
}
310311

312+
public void abort() throws ObDirectLoadException {
313+
executor.abort();
314+
}
315+
311316
public static final class Builder {
312317

313318
private final ObDirectLoadConnection connection;
@@ -327,6 +332,7 @@ public static final class Builder {
327332
private ObDirectLoadStatementExecutionId executionId = null;
328333

329334
private static final long MAX_QUERY_TIMEOUT = Integer.MAX_VALUE;
335+
private NodeRole nodeRole = NodeRole.PRIMARY;
330336

331337
Builder(ObDirectLoadConnection connection) {
332338
this.connection = connection;
@@ -382,6 +388,11 @@ public ObDirectLoadTraceId getTraceId() {
382388
return traceId;
383389
}
384390

391+
public Builder setNodeRole(NodeRole nodeRole) {
392+
this.nodeRole = nodeRole;
393+
return this;
394+
}
395+
385396
public String toString() {
386397
return String
387398
.format(

0 commit comments

Comments
 (0)