Skip to content

Commit

Permalink
Added ability to bulk send
Browse files Browse the repository at this point in the history
  • Loading branch information
mligtenberg committed Feb 2, 2025
1 parent 619a3ba commit 338b152
Show file tree
Hide file tree
Showing 17 changed files with 272 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,20 @@
</div>
}
</p-dialog>

<p-dialog
header="Send messages"
appendTo="body"
[(visible)]="displaySendMessages"
[modal]="true"
[style]="{ width: '50%' }"
>
<p-scroll-panel>
<sbb-tpl-endpoint-selector-tree-input
[(ngModel)]="sendEndpoint"
/>
</p-scroll-panel>
<div class="actions">
<button pButton type="button" [disabled]="!sendEndpoint()" (click)="sendMessages()">Send</button>
</div>
</p-dialog>
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Component, computed, inject, model, signal } from '@angular/core';
import { Component, computed, effect, inject, model, signal } from '@angular/core';
import { CommonModule } from '@angular/common';
import { Store } from '@ngrx/store';
import { MessagesSelectors } from '@service-bus-browser/messages-store';
import { MessagesActions, MessagesSelectors } from '@service-bus-browser/messages-store';
import { ActivatedRoute, Router } from '@angular/router';
import { switchMap } from 'rxjs';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
Expand All @@ -10,12 +10,15 @@ import { Card } from 'primeng/card';
import { TableModule } from 'primeng/table';
import { FormsModule } from '@angular/forms';
import { Dialog } from 'primeng/dialog';
import { Button } from 'primeng/button';
import { Button, ButtonDirective } from 'primeng/button';
import { EditorComponent } from 'ngx-monaco-editor-v2';
import { ColorThemeService } from '@service-bus-browser/services';
import { MenuItem } from 'primeng/api';
import { ContextMenu } from 'primeng/contextmenu';
import { BASE_ROUTE } from '../const';
import { ScrollPanel } from 'primeng/scrollpanel';
import { EndpointSelectorTreeInputComponent } from '@service-bus-browser/topology-components';
import { SendEndpoint } from '@service-bus-browser/service-bus-contracts';

@Component({
selector: 'lib-messages-page',
Expand All @@ -28,19 +31,26 @@ import { BASE_ROUTE } from '../const';
Button,
EditorComponent,
ContextMenu,
ScrollPanel,
EndpointSelectorTreeInputComponent,
ButtonDirective,
],
templateUrl: './messages-page.component.html',
styleUrl: './messages-page.component.scss',
})
export class MessagesPageComponent {
activatedRoute = inject(ActivatedRoute);
store = inject(Store);
displayBodyFullscreen = model<boolean>(false);
router = inject(Router);
baseRoute = inject(BASE_ROUTE);

displayBodyFullscreen = model<boolean>(false);
displaySendMessages = model<boolean>(false);
sendEndpoint = model<SendEndpoint | null>(null);
currentPage = signal<MessagePage | null>(null);
selection = model<ServiceBusReceivedMessage | ServiceBusReceivedMessage[] | undefined>(undefined);
selection = model<
ServiceBusReceivedMessage | ServiceBusReceivedMessage[] | undefined
>(undefined);
selectedMessage = computed(() => {
const selection = this.selection();
if (Array.isArray(selection) && selection.length === 1) {
Expand Down Expand Up @@ -155,17 +165,23 @@ export class MessagesPageComponent {
return [];
}

if (Array.isArray(contextMenuSelection) && contextMenuSelection.length === 0) {
if (
Array.isArray(contextMenuSelection) &&
contextMenuSelection.length === 0
) {
return [];
}

if (Array.isArray(contextMenuSelection) && contextMenuSelection.length > 1) {
if (
Array.isArray(contextMenuSelection) &&
contextMenuSelection.length > 1
) {
return [
{
label: 'Resend messages',
icon: 'pi pi-envelope',
command: () => {
console.log('Resend messages', contextMenuSelection);
this.displaySendMessages.set(true);
},
},
];
Expand All @@ -180,7 +196,12 @@ export class MessagesPageComponent {
label: 'Resend message',
icon: 'pi pi-envelope',
command: () => {
this.router.navigate([this.baseRoute, 'resend', this.currentPage()!.id, selectedMessage!.messageId])
this.router.navigate([
this.baseRoute,
'resend',
this.currentPage()!.id,
selectedMessage!.messageId,
]);
},
},
];
Expand All @@ -200,4 +221,21 @@ export class MessagesPageComponent {
this.currentPage.set(page ?? null);
});
}

sendMessages() {
const endpoint = this.sendEndpoint();
const messages = this.selection();
if (!endpoint || !messages || !Array.isArray(messages) || messages.length === 0) {
console.error('Invalid endpoint or messages');
return;
}

this.displaySendMessages.set(false);
this.store.dispatch(
MessagesActions.sendMessages({
endpoint,
messages,
})
);
}
}
29 changes: 28 additions & 1 deletion libs/messages/store/src/lib/messages-tasks.effects.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { inject, Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { TasksActions } from '@service-bus-browser/tasks-store';
import { map } from 'rxjs';
import { map, mergeMap, tap } from 'rxjs';

import * as actions from './messages.actions';
import * as internalActions from './messages.internal-actions';
Expand Down Expand Up @@ -71,4 +71,31 @@ export class MessagesTasksEffects {
})
}),
));

messagesSending$ = createEffect(() => this.actions.pipe(
ofType(internalActions.messagesSending),
map(({ endpoint, messagesToSend, sendAmount, taskId }) => {
console.log('messagesSending', { endpoint, messagesToSend, sendAmount, taskId });
if (sendAmount > 0) {
return TasksActions.setProgress({
id: taskId,
statusDescription: `${sendAmount}/${messagesToSend.length}`,
progress: sendAmount / (sendAmount + messagesToSend.length) * 100
});
}

return TasksActions.createTask({
id: taskId,
statusDescription: `0/${messagesToSend.length}`,
description: `sending messages to ${'queueName' in endpoint ? endpoint.queueName : endpoint.topicName}`,
hasProgress: true,
initialProgress: 0
});
})
), { dispatch: false });

messagesSendSucceeded$ = createEffect(() => this.actions.pipe(
ofType(internalActions.messagesSendSucceeded),
map(({ taskId }) => TasksActions.completeTask({ id: taskId }))
), { dispatch: false });
}
8 changes: 8 additions & 0 deletions libs/messages/store/src/lib/messages.actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,11 @@ export const sendMessage = createAction(
message: ServiceBusMessage
}>()
)

export const sendMessages = createAction(
'[Messages] send messages',
props<{
endpoint: SendEndpoint,
messages: ServiceBusMessage[]
}>()
)
36 changes: 35 additions & 1 deletion libs/messages/store/src/lib/messages.effects.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { inject, Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { ServiceBusMessagesElectronClient } from '@service-bus-browser/service-bus-electron-client';
import { catchError, from, map, mergeMap } from 'rxjs';
import { catchError, from, map, mergeMap, tap } from 'rxjs';

import * as actions from './messages.actions';
import * as internalActions from './messages.internal-actions';
Expand Down Expand Up @@ -93,4 +93,38 @@ export class MessagesEffects {
catchError(() => [internalActions.messageSendFailed({ endpoint, message })])
))
));

sendMessages$ = createEffect(() => this.actions.pipe(
ofType(actions.sendMessages),
map(({ endpoint, messages }) => {
return internalActions.messagesSending({
taskId: crypto.randomUUID(),
sendAmount: 0,
messagesToSend: messages,
endpoint
})
})
));

continueSendingMessages$ = createEffect(() => this.actions.pipe(
ofType(internalActions.messagesSending),
mergeMap(({ taskId, endpoint, messagesToSend, sendAmount }) => {
const message = messagesToSend[0];
const rest = messagesToSend.slice(1);

return from(this.messagesService.sendMessage(endpoint, message))
.pipe(
map(() => rest.length === 0
? internalActions.messagesSendSucceeded({ taskId })
: internalActions.messagesSending({
taskId,
endpoint,
messagesToSend: rest,
sendAmount: sendAmount + 1
})
),
catchError(() => [internalActions.messageSendFailed({ endpoint, message })])
);
})
));
}
27 changes: 27 additions & 0 deletions libs/messages/store/src/lib/messages.internal-actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,30 @@ export const messageSendFailed = createAction(
message: ServiceBusMessage
}>()
)

export const messagesSendSucceeded = createAction(
'[Messages] message send succeeded',
props<{
taskId: UUID,
}>()
)

export const messagesSending = createAction(
'[Messages] message sending',
props<{
taskId: UUID,
endpoint: SendEndpoint,
messagesToSend: ServiceBusMessage[],
sendAmount: number
}>()
)

export const messagesSendFailed = createAction(
'[Messages] message send failed',
props<{
taskId: UUID,
endpoint: SendEndpoint,
messagesToSend: ServiceBusMessage[],
sendAmount: number
}>()
)
1 change: 1 addition & 0 deletions libs/topology/components/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './lib/topology-tree/topology-tree.component';
export * from './lib/endpoint-selector-input/endpoint-selector-input.component';
export * from './lib/endpoint-string-selector-input/endpoint-string-selector-input.component';
export * from './lib/endpoint-selector-tree-input/endpoint-selector-tree-input.component';
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@
[modal]="true"
>
<p-scroll-panel>
<sbb-tpl-topology-tree
[namespaces]="namespaces()"
[displaySubscriptions]="false"
<sbb-tpl-endpoint-selector-tree-input
[(ngModel)]="value"
(ngModelChange)="dialogVisible.set(false)"
[connectionsFilter]="connectionsFilter()"
(queueSelected)="onQueueSelected($event)"
(topicSelected)="onTopicSelected($event)"
></sbb-tpl-topology-tree>
/>
</p-scroll-panel>
</p-dialog>
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
sbb-tpl-topology-tree {
sbb-tpl-endpoint-selector-tree-input {
display: block;
max-width: 80vw;
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { Component, effect, forwardRef, inject, input, signal } from '@angular/core';
import { Component, effect, forwardRef, inject, input, model, signal } from '@angular/core';
import { CommonModule } from '@angular/common';
import { InputGroup } from 'primeng/inputgroup';
import { Dialog } from 'primeng/dialog';
import { InputText } from 'primeng/inputtext';
import { Button } from 'primeng/button';
import { Store } from '@ngrx/store';
import { TopologySelectors } from '@service-bus-browser/topology-store';
import { TopologyTreeComponent } from '../topology-tree/topology-tree.component';
import { ScrollPanel } from 'primeng/scrollpanel';
import { QueueWithMetaData, TopicWithMetaData } from '@service-bus-browser/topology-contracts';
import { InputGroupAddon } from 'primeng/inputgroupaddon';
import { ControlValueAccessor, NG_VALUE_ACCESSOR } from '@angular/forms';
import { ControlValueAccessor, FormsModule, NG_VALUE_ACCESSOR } from '@angular/forms';
import { SendEndpoint } from '@service-bus-browser/service-bus-contracts';
import {
EndpointSelectorTreeInputComponent
} from '../endpoint-selector-tree-input/endpoint-selector-tree-input.component';

@Component({
selector: 'sbb-tpl-endpoint-selector-input',
Expand All @@ -21,9 +22,10 @@ import { SendEndpoint } from '@service-bus-browser/service-bus-contracts';
Dialog,
InputText,
Button,
TopologyTreeComponent,
ScrollPanel,
InputGroupAddon,
EndpointSelectorTreeInputComponent,
FormsModule,
],
providers: [
{
Expand All @@ -43,7 +45,7 @@ export class EndpointSelectorInputComponent implements ControlValueAccessor {
disabled = signal(false);
dialogVisible = signal(false);
namespaces = this.store.selectSignal(TopologySelectors.selectNamespaces);
value = signal<SendEndpoint | null>(null);
value = model<SendEndpoint | null>(null);

connectionsFilter = input<string[]>();

Expand All @@ -55,25 +57,6 @@ export class EndpointSelectorInputComponent implements ControlValueAccessor {
});
}

onQueueSelected($event: { namespaceId: string; queue: QueueWithMetaData }) {
this.value.set({
endpoint: $event.queue.metadata.endpoint,
queueName: $event.queue.name,
connectionId: $event.queue.namespaceId,

});
this.dialogVisible.set(false);
}

onTopicSelected($event: { namespaceId: string; topic: TopicWithMetaData }) {
this.value.set({
endpoint: $event.topic.metadata.endpoint,
topicName: $event.topic.name,
connectionId: $event.topic.namespaceId,
});
this.dialogVisible.set(false);
}

writeValue(obj: SendEndpoint | null): void {
this.value.set(obj);
}
Expand All @@ -85,6 +68,7 @@ export class EndpointSelectorInputComponent implements ControlValueAccessor {
registerOnTouched(fn: () => void): void {
this.onTouched = fn;
}

setDisabledState?(isDisabled: boolean): void {
this.disabled.set(isDisabled);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<sbb-tpl-topology-tree
[namespaces]="namespaces()"
[displaySubscriptions]="false"
[connectionsFilter]="connectionsFilter()"
(queueSelected)="onQueueSelected($event)"
(topicSelected)="onTopicSelected($event)"
></sbb-tpl-topology-tree>
Loading

0 comments on commit 338b152

Please sign in to comment.