Skip to content

Commit

Permalink
data preparation improvements, tekton improvement, initial kubevirt s…
Browse files Browse the repository at this point in the history
…upport
  • Loading branch information
sebt3 committed Apr 18, 2024
1 parent fda2cd0 commit 0f14336
Show file tree
Hide file tree
Showing 47 changed files with 110,676 additions and 20,447 deletions.
20 changes: 14 additions & 6 deletions back/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ApolloServerPluginLandingPageLocalDefault, ApolloServerPluginLandingPag
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { WebSocketServer } from 'ws';
import { parse } from 'url';
import { useServer } from 'graphql-ws/lib/use/ws';
import express from 'express';
import http from 'http';
Expand Down Expand Up @@ -47,28 +48,35 @@ const typeDefs = gqlWrapper(
importGraphQL('min.graphql'),
importGraphQL('operators.graphql'),
importGraphQL('whereabouts.graphql'),
importGraphQL('kubevirt.graphql'),
importGraphQL('networkaddonsoperator.graphql'),
);

interface MyContext {
token?: String;
}
export const app = express();
export const httpServer = http.createServer(app);
/*const wsServer = new WebSocketServer({
server: httpServer,
path: '/subscriptions',
const wsServer = new WebSocketServer({ noServer: true });
httpServer.on('upgrade', function upgrade(request, socket, head) {
const { pathname } = parse(request.url as string);
if (pathname === '/suscrptions') {
wsServer.handleUpgrade(request, socket, head, function done(ws) {
wsServer.emit('connection', ws, request);
});
}
});
const schema = makeExecutableSchema({ typeDefs, resolvers });
const serverCleanup = useServer({ schema }, wsServer);*/
const apolloPlugins = [ApolloServerPluginDrainHttpServer({ httpServer }),/*{
const serverCleanup = useServer({ schema }, wsServer);
const apolloPlugins = [ApolloServerPluginDrainHttpServer({ httpServer }),{
async serverWillStart() {
return {
async drainServer() {
await serverCleanup.dispose();
},
};
},
}*/]
}]
if (gramoConfig.enableGraphQLClient||process.env.NODE_ENV !== 'production')
apolloPlugins.push(ApolloServerPluginLandingPageLocalDefault());
else
Expand Down
63 changes: 63 additions & 0 deletions back/pubsub/logpubsub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import {PubSubEngine} from 'graphql-subscriptions';
import {PubSubAsyncIterator} from './pubsubAsyncIterator.js';
type OnMessage<T> = (message: T) => void;

export interface LogPubSubOptions {

}


export class LogPubSub implements PubSubEngine {
private readonly subscriptionMap: { [subId: number]: [string, OnMessage<any>] };
private readonly subsRefsMap: Map<string, Set<number>>;
private currentSubscriptionId: number;

constructor(options:LogPubSubOptions) {
this.subscriptionMap = {};
this.subsRefsMap = new Map<string, Set<number>>();
this.currentSubscriptionId = 0;
console.log('LogPubSub.constructor', options)
}

public async publish<T>(trigger: string, payload: T): Promise<void> {
//await this.redisPublisher.publish(trigger, this.serializer ? this.serializer(payload) : JSON.stringify(payload));
console.log('TODO', 'publish', trigger, payload)
return Promise.resolve();
}
public asyncIterator<T>(triggers: string | string[], options?: object): AsyncIterator<T> {
return new PubSubAsyncIterator<T>(this, triggers, options);
}

public subscribe<T = any>(triggerName: string, onMessage: OnMessage<T>, options?: Object): Promise<number> {
//const [namespace, pod_name, name] = triggerName.split('|');
const id = this.currentSubscriptionId++;
this.subscriptionMap[id] = [triggerName, onMessage];
if (!this.subsRefsMap.has(triggerName)) {
this.subsRefsMap.set(triggerName, new Set());
}
const refs = this.subsRefsMap.get(triggerName);
console.log('LogPubSub.subscribe', triggerName, options, id, this.subscriptionMap, this.subsRefsMap)
if (refs != undefined && refs.size > 0) {
refs.add(id);
return Promise.resolve(id);
} else {
return new Promise<number>((resolve, reject) => {
console.log('TODO', 'Create the source for', triggerName, reject)
resolve(id);
})
}
}
public unsubscribe(subId: number) {
const [triggerName = null] = this.subscriptionMap[subId] || [];
if (triggerName == null) throw new Error(`There is no subscription of id "${subId}"`);
const refs = this.subsRefsMap.get(triggerName);
if (!refs) throw new Error(`There is no subscription of id "${subId}"`);
if (refs.size === 1) {
console.log('TODO', 'Deleting stream source for', triggerName)
this.subsRefsMap.delete(triggerName);
} else {
refs.delete(subId);
}
delete this.subscriptionMap[subId];
}
}
76 changes: 76 additions & 0 deletions back/pubsub/pubsubAsyncIterator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import {PubSubEngine} from 'graphql-subscriptions';
export class PubSubAsyncIterator<T> implements AsyncIterableIterator<T> {
constructor(pubsub: PubSubEngine, eventNames: string | string[], options?: object) {
this.pubsub = pubsub;
this.options = options;
this.pullQueue = [];
this.pushQueue = [];
this.listening = true;
this.eventsArray = typeof eventNames === 'string' ? [eventNames] : eventNames;
}

public async next() {
await this.subscribeAll();
return this.listening ? this.pullValue() : this.return();
}
public async return(): Promise<{ value: unknown, done: true }> {
await this.emptyQueue();
return { value: undefined, done: true };
}
public async throw(error): Promise<never> {
await this.emptyQueue();
return Promise.reject(error);
}
public [Symbol.asyncIterator]() {
return this;
}
private pullQueue: Array<(data: { value: unknown, done: boolean }) => void>;
private pushQueue: any[];
private eventsArray: string[];
private subscriptionIds: Promise<number[]> | undefined;
private listening: boolean;
private pubsub: PubSubEngine;
private options: object|undefined;
private async pushValue(event) {
await this.subscribeAll();
if (this.pullQueue.length !== 0) {
const current = this.pullQueue.shift();
if (current!=undefined)
current({ value: event, done: false });
} else {
this.pushQueue.push(event);
}
}
private pullValue(): Promise<IteratorResult<any>> {
return new Promise(resolve => {
if (this.pushQueue.length !== 0) {
resolve({ value: this.pushQueue.shift(), done: false });
} else {
this.pullQueue.push(resolve);
}
});
}
private async emptyQueue() {
if (this.listening) {
this.listening = false;
if (this.subscriptionIds) this.unsubscribeAll(await this.subscriptionIds);
this.pullQueue.forEach(resolve => resolve({ value: undefined, done: true }));
this.pullQueue.length = 0;
this.pushQueue.length = 0;
}
}
private subscribeAll() {
if (!this.subscriptionIds) {
this.subscriptionIds = Promise.all(this.eventsArray.map(
eventName => this.pubsub.subscribe(eventName, this.pushValue.bind(this), this.options||{}),
));
}
return this.subscriptionIds;
}

private unsubscribeAll(subscriptionIds: number[]) {
for (const subscriptionId of subscriptionIds) {
this.pubsub.unsubscribe(subscriptionId);
}
}
}
Loading

0 comments on commit 0f14336

Please sign in to comment.