Skip to content

Commit 3eacf70

Browse files
authored
handle errors on local particle call (#25)
1 parent c10cd19 commit 3eacf70

File tree

6 files changed

+65
-9
lines changed

6 files changed

+65
-9
lines changed

src/__test__/integration/builtins.spec.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ describe('Builtins usage suite', () => {
7171

7272
let base64 = 'MjNy';
7373

74-
await uploadModule(client, 'test_broken_module', base64, config);
74+
await uploadModule(client, 'test_broken_module', base64, config, 10000);
7575
});
7676

7777
it('add_blueprint', async function () {
@@ -101,9 +101,9 @@ describe('Builtins usage suite', () => {
101101
let buf = Buffer.from(key);
102102

103103
let r = Math.random().toString(36).substring(7);
104-
await addProvider(client, buf, dev2peerId, r);
104+
await addProvider(client, buf, dev2peerId, r, undefined, 10000);
105105

106-
let pr = await getProviders(client, buf);
106+
let pr = await getProviders(client, buf, undefined, 10000);
107107
console.log(pr);
108108
console.log(r);
109109
expect(r).toEqual(pr[0][0].service_id);

src/__test__/unit/air.spec.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,30 @@ describe('== AIR suite', () => {
4444
expect(res).toEqual(arg);
4545
});
4646

47+
it('call broken script', async function () {
48+
const client = await createLocalClient();
49+
50+
const script = `(htyth)`;
51+
52+
await expect(client.sendScript(script)).rejects.toContain("aqua script can't be parsed");
53+
});
54+
55+
it('call script without ttl', async function () {
56+
const client = await createLocalClient();
57+
58+
const script = `(call %init_peer_id% ("" "") [""])`;
59+
60+
await expect(client.sendScript(script, undefined, 1)).rejects.toContain("Particle expired");
61+
});
62+
63+
it.skip('call broken script by fetch', async function () {
64+
const client = await createLocalClient();
65+
66+
const script = `(htyth)`;
67+
68+
await expect(client.fetch(script, ['result'])).rejects.toContain("aqua script can't be parsed");
69+
});
70+
4771
it('check particle arguments', async function () {
4872
// arrange
4973
const serviceId = 'test_service';

src/internal/FluenceClientBase.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ export abstract class FluenceClientBase {
8989

9090
async sendScript(script: string, data?: Map<string, any>, ttl?: number): Promise<string> {
9191
const particle = await build(this.selfPeerIdFull, script, data, ttl);
92-
this.processor.executeLocalParticle(particle);
92+
await this.processor.executeLocalParticle(particle);
9393
return particle.id;
9494
}
9595
}

src/internal/FluenceClientImpl.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,11 @@ export class FluenceClientImpl extends FluenceClientBase implements FluenceClien
7373
script = wrapFetchCall(script, callBackId, resultArgNames);
7474
const particle = await build(this.selfPeerIdFull, script, data, ttl, callBackId);
7575

76-
return new Promise<T>((resolve, reject) => {
76+
const prFetch = new Promise<T>(async (resolve, reject) => {
7777
this.fetchParticles.set(callBackId, { resolve, reject });
78-
this.processor.executeLocalParticle(particle);
7978
});
79+
const prExec = this.processor.executeLocalParticle(particle);
80+
return prExec.then(() => prFetch);
8081
}
8182

8283
// TODO:: better naming probably?

src/internal/ParticleProcessor.ts

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,15 @@ export class ParticleProcessor {
7979

8080
async executeLocalParticle(particle: ParticleDto) {
8181
this.strategy?.onLocalParticleRecieved(particle);
82-
await this.handleParticle(particle).catch((err) => {
83-
log.error('particle processing failed: ' + err);
82+
return new Promise((resolve, reject) => {
83+
const resolveCallback = function () {
84+
resolve()
85+
}
86+
const rejectCallback = function (err: any) {
87+
reject(err)
88+
}
89+
// we check by callbacks that the script passed through the interpreter without errors
90+
this.handleParticle(particle, resolveCallback, rejectCallback)
8491
});
8592
}
8693

@@ -143,8 +150,10 @@ export class ParticleProcessor {
143150

144151
/**
145152
* Pass a particle to a interpreter and send a result to other services.
153+
* `resolve` will be completed if ret_code equals 0
154+
* `reject` will be completed if ret_code not equals 0
146155
*/
147-
private async handleParticle(particle: ParticleDto): Promise<void> {
156+
private async handleParticle(particle: ParticleDto, resolve?: () => void, reject?: (r: any) => any): Promise<void> {
148157
// if a current particle is processing, add new particle to the queue
149158
if (this.getCurrentParticleId() !== undefined && this.getCurrentParticleId() !== particle.id) {
150159
this.enqueueParticle(particle);
@@ -160,6 +169,7 @@ export class ParticleProcessor {
160169
let actualTtl = particle.timestamp + particle.ttl - now;
161170
if (actualTtl <= 0) {
162171
this.strategy?.onParticleTimeout(particle, now);
172+
if (reject) reject(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`)
163173
} else {
164174
// if there is no subscription yet, previous data is empty
165175
let prevData: Uint8Array = Buffer.from([]);
@@ -191,6 +201,26 @@ export class ParticleProcessor {
191201
if (stepperOutcome.next_peer_pks.length > 0) {
192202
this.strategy.sendParticleFurther(newParticle);
193203
}
204+
205+
if (stepperOutcome.ret_code == 0) {
206+
if (resolve) {
207+
resolve()
208+
}
209+
} else {
210+
const error = stepperOutcome.error_message;
211+
if (reject) {
212+
reject(error);
213+
} else {
214+
log.error("Unhandled error: ", error);
215+
}
216+
}
217+
}
218+
} catch (e) {
219+
if (reject) {
220+
reject(e);
221+
} else {
222+
log.error("Unhandled error: ", e)
223+
throw e;
194224
}
195225
} finally {
196226
// get last particle from the queue

src/internal/commonTypes.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export interface StepperOutcome {
3030
ret_code: number;
3131
data: Uint8Array;
3232
next_peer_pks: string[];
33+
error_message: string;
3334
}
3435

3536
export interface ResolvedTriplet {

0 commit comments

Comments
 (0)