Skip to content

Commit 0193b4c

Browse files
add elearning community example
1 parent 13b0fd1 commit 0193b4c

File tree

13 files changed

+1646
-3
lines changed

13 files changed

+1646
-3
lines changed

cdr-aggregation.md

+104-3
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,18 @@ stateDiagram-v2
117117
Received --> Validating: Webhook triggers function
118118
Validating --> IdempotencyChecking: Validation successful
119119
Validating --> Rejected: Validation failed
120+
120121
IdempotencyChecking --> Journaling: New CDR
121-
IdempotencyChecking --> Skipped: Duplicate CDR
122-
Journaling --> Aggregating: Journal stored
122+
IdempotencyChecking --> Skipped: Exact duplicate CDR
123+
124+
Journaling --> VersionDetection: Journal stored
125+
VersionDetection --> Aggregating: No version change
126+
VersionDetection --> PreviousVersionRetrieval: New version detected
127+
128+
PreviousVersionRetrieval --> DeltaComputation: Get previous version from blob
129+
DeltaComputation --> AggregateRecomputation: Calculate change delta
130+
AggregateRecomputation --> Aggregating: Apply delta to aggregates
131+
123132
Aggregating --> Completed: Aggregate updated
124133
Aggregating --> Failed: Error updating aggregate
125134
Failed --> Retry: After delay
@@ -129,7 +138,99 @@ stateDiagram-v2
129138
Completed --> [*]
130139
```
131140

132-
*This state diagram represents the processing flow of a CDR through the system, showing the validation, idempotency check, journaling, and aggregation steps.*
141+
*This state diagram represents the processing flow of a CDR through the system, showing the validation, idempotency check, journaling, and aggregation steps. The diagram includes an alternative path for version handling when updated CDR versions are received. For clarity, version detection occurs after the journal storage to ensure data is safely stored before any further processing.*
142+
143+
#### Version-Based Delta Computation
144+
145+
For clarity, we've added an alternative flow path to handle cases where an updated version of the same CDR is received. This is an important consideration because TandemDrive may send updated versions of previously submitted CDRs when corrections or adjustments are made to charging sessions.
146+
147+
The system uses Azure Blob Storage's built-in versioning capabilities to manage this scenario efficiently:
148+
149+
1. **Version Detection**: When a CDR with the same ID as an existing record is received, the system compares metadata to determine if it's a new version
150+
2. **Previous Version Retrieval**: If it's a new version, the system retrieves the previous version from blob storage
151+
3. **Delta Computation**: The system calculates the differences between versions (e.g., changes in energy, duration, or cost)
152+
4. **Aggregate Recomputation**: The system applies only the delta changes to the aggregates to avoid double-counting
153+
154+
This approach ensures that aggregates remain accurate when CDR data is updated without having to rebuild the entire aggregate from scratch.
155+
156+
##### Implementation Approach (Pseudocode)
157+
158+
```csharp
159+
// Pseudocode for version-based delta computation
160+
public async Task ProcessCDRWithVersioning(CDR newCdrVersion)
161+
{
162+
// Check if previous version exists
163+
if (await _blobService.ExistsAsync(newCdrVersion.CdrId))
164+
{
165+
// Get metadata to determine version
166+
var metadata = await _blobService.GetMetadataAsync(newCdrVersion.CdrId);
167+
168+
if (IsDifferentVersion(metadata, newCdrVersion))
169+
{
170+
// Get previous version from blob storage using versioning API
171+
var previousVersions = await _blobService.ListBlobVersionsAsync(newCdrVersion.CdrId);
172+
var latestPreviousVersion = previousVersions.OrderByDescending(v => v.VersionId).First();
173+
var previousCdr = await _blobService.GetBlobContentAsync(newCdrVersion.CdrId, latestPreviousVersion.VersionId);
174+
175+
// Deserialize previous version
176+
var previousCdrData = JsonSerializer.Deserialize<CDR>(previousCdr);
177+
178+
// Calculate delta between versions
179+
var energyDelta = newCdrVersion.Energy - previousCdrData.Energy;
180+
var durationDelta = newCdrVersion.Duration - previousCdrData.Duration;
181+
var costDelta = newCdrVersion.Cost - previousCdrData.Cost;
182+
183+
// Get existing aggregate
184+
var dateKey = newCdrVersion.Timestamp.ToString("yyyy-MM-dd");
185+
var monthKey = newCdrVersion.Timestamp.ToString("yyyy-MM");
186+
187+
var dailyAggregate = await _tableStorage.GetAggregateAsync(dateKey, newCdrVersion.EvseId);
188+
var monthlyAggregate = await _tableStorage.GetAggregateAsync(monthKey, newCdrVersion.EvseId);
189+
190+
// Apply deltas to aggregates
191+
dailyAggregate.TotalEnergy += energyDelta;
192+
dailyAggregate.TotalDuration += durationDelta;
193+
dailyAggregate.TotalCost += costDelta;
194+
// SessionCount remains unchanged since it's the same session
195+
196+
monthlyAggregate.TotalEnergy += energyDelta;
197+
monthlyAggregate.TotalDuration += durationDelta;
198+
monthlyAggregate.TotalCost += costDelta;
199+
200+
// Store updated aggregates
201+
await _tableStorage.UpsertAggregateAsync(dailyAggregate);
202+
await _tableStorage.UpsertAggregateAsync(monthlyAggregate);
203+
204+
// Store new version in blob with versioning enabled
205+
await _blobService.StoreWithVersioningAsync(newCdrVersion);
206+
207+
_logger.LogInformation(
208+
"Updated CDR version processed. ID: {CdrId}, Energy delta: {EnergyDelta}, Cost delta: {CostDelta}",
209+
newCdrVersion.CdrId, energyDelta, costDelta);
210+
211+
return;
212+
}
213+
}
214+
215+
// If not a version update, proceed with normal processing flow...
216+
await ProcessNormalCDR(newCdrVersion);
217+
}
218+
219+
private bool IsDifferentVersion(BlobMetadata metadata, CDR newCdr)
220+
{
221+
// Check if this is a different version based on metadata
222+
// For example, compare version numbers or timestamps
223+
return metadata.ContainsKey("version") &&
224+
metadata["version"] != newCdr.Version;
225+
}
226+
```
227+
228+
This versioning approach provides several benefits:
229+
1. **Accuracy**: Ensures aggregates reflect the most current data
230+
2. **Efficiency**: Avoids reprocessing all historical data when a single record changes
231+
3. **Auditability**: Preserves all versions of CDRs for compliance and reconciliation
232+
4. **Cost-Effective**: Leverages built-in Azure Blob Storage versioning rather than custom solutions
233+
5. **Idempotency**: Maintains the system's idempotent processing characteristics
133234

134235
### Data Model
135236
Key entities in the system include:

compare.md

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Overview
2+
NOTE: This process supersedes the ([Greenflux CDRs Import Process](https://dev.azure.com/enecomanagedcloud/eMobility/_wiki/wikis/eMobility.wiki/49305/Greenflux-CDRs-Import-process)).
3+
4+
This system is responsible for importing priced charge sessions (CDRs) from TandemDrive external system. The priced charge sessions support `Dynamic electricity tariffs` - instead of paying a fixed price throughout the annual contract, a customer pays a different rate, which typically varies by the hour. This pricing model helps customers reduce energy costs by charging during cheaper periods.
5+
6+
This import is essential for quick insights and usage analysis (reporting in the Eneco eMobility Portal). It also supports exporting customer-specific data for invoice validation and transparency.
7+
8+
## Data capacity
9+
We expect a similar volume of data and measurements. While the data model differs, the core information, such as energy consumption and charging metrics, remains the same. The goal is to maintain continuity in reporting and analytics - the output should be functionally equivalent.
10+
11+
## Architecture
12+
13+
This system is designed as a `Real-time processing of big data in motion` architecture. While the data volume does not strictly qualify as Big Data, it is still big enough to require a scalable approach.
14+
15+
### Preferred Approach - Azure Storage
16+
This architecture needs to structure blob containers in partitioned way and transform data to Parquet format and it does require CosmosDB for the read part.
17+
18+
![TandemDrive-CDRs-Import-Simplified.drawio.png](/.attachments/TandemDrive-CDRs-Import-Simplified.drawio-2f97d6a1-43f6-499f-83bb-7027d98c6e71.png)
19+
20+
- TandemDrive sends CDRs to Azure API Management via webhook.
21+
- APIM pushes the data directly into Azure Blob Storage without a backend.
22+
- Azure Storage holds the raw data, organized for easy access by date or ID.
23+
- Azure Synapse runs SQL-like queries on the blob data using serverless pools.
24+
- Backend APIs invoke queries using Synapse, and sends results to the Web Portal.
25+
26+
Blob container should be organized as shown below:
27+
```bash
28+
/cdrs/year=2025/month=04/day=22/locationId=LOC123/evseId=EVSE456/file.parquet
29+
```
30+
31+
### Alternate Approach - Event Hubs
32+
The overall design resembles a `Kappa architecture`, focusing on stream processing without the need for separate batch and real-time layers.
33+
34+
![HLA-TandemDrive-CDRs-Import-Process.drawio.png](/.attachments/HLA-TandemDrive-CDRs-Import-Process.drawio-379e5d9d-9f7b-4519-b1d6-2044a18e3844.png)
35+
36+
- The system ingests CDRs via Azure API Management (APIM), which exposes a webhook. APIM sends data directly to Event Hubs, with no backend in between.
37+
- Event Hubs acts as the streaming buffer, supporting replay/reprocessing (allows to re-compute events when fixing bugs or recomputing aggregations)
38+
- Event Hubs Capture stores all incoming data in Azure Storage automatically, so no custom logic is needed. By default, it stores data in Avro format, optimized for cost efficient long-term storage.
39+
- Consumers hosted in the App Service consume the stream events in parallel: one saves transformed lightweight CDRs, another one - computes daily summaries (aggregations).
40+
41+
### How we handle versioning
42+
43+
TandemDrive CDRs support versioning, where each update to a CDR includes a `version` field. When a new version arrives, the pipeline:
44+
- Overwrites the previous version in the CosmosDb by using cdr id and version.
45+
- Re-computes the daily summaries that include the updated CDR.
46+
47+
This design ensures correct results even if sessions are updated retroactively.
48+
49+
### How we handle failures
50+
Failures in ingestion and processing are mitigated through the following strategy:
51+
- Event replay. Event Hubs retains events for 1-7 days (Standard tier, up to 3 months Premium tier). If consumers fail or a bug is discovered, we can reprocess past data without re-ingestion.
52+
- Retry mechanism. All consumers implement exponential backoff retries to handle transient errors.
53+
- Consumers only commit their offset (checkpoint) after the event has been processed successfully. This guarantees that events are retried on restart or crash, and ensures consistency in case of transient failures.
54+
- Poison messages are not retried indefinitely. After the retry limit we forward the message to a custom Dead Letter Queue on Azure Service Bus.
55+
- Metrics and alerting. Application Insights tracks all failures, retries, and latency metrics. Alerts are configured to notify engineers in case of consistent processing failures or ingestion gaps.
56+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
@startuml C4_Component
2+
!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Component.puml
3+
4+
LAYOUT_WITH_LEGEND()
5+
6+
title Component diagram for Learning Service
7+
8+
Container(webApp, "Web Application", "Blazor WASM", "Provides the user interface for all platform functionality")
9+
Container(graphqlApi, "GraphQL API", "Hot Chocolate, .NET 9", "Provides a unified GraphQL schema")
10+
ContainerDb(learningDb, "Learning Database", "SQL Server", "Stores learning content, paths, and modules")
11+
ContainerDb(userDb, "User Database", "SQL Server", "Stores user profiles and progress")
12+
System_Ext(youtubeSystem, "YouTube", "Video hosting platform")
13+
System_Ext(calendarSystem, "Calendar Service", "Manages event scheduling")
14+
Container(eventBus, "Event Bus", "Azure Service Bus", "Handles asynchronous communication between services")
15+
16+
Container_Boundary(learningService, "Learning Service") {
17+
Component(learningPathManager, "Learning Path Manager", ".NET 9 Service", "Manages creation and structure of learning paths")
18+
Component(moduleManager, "Module Manager", ".NET 9 Service", "Handles individual modules within learning paths")
19+
Component(contentManager, "Content Manager", ".NET 9 Service", "Manages educational content and resources")
20+
Component(calendarIntegration, "Calendar Integration", ".NET 9 Service", "Integrates with calendar services")
21+
Component(youtubeIntegration, "YouTube Integration", ".NET 9 Service", "Handles YouTube video embedding")
22+
Component(eventManager, "Event Manager", ".NET 9 Service", "Manages learning events and schedules")
23+
Component(enrollmentManager, "Enrollment Manager", ".NET 9 Service", "Handles user enrollment in learning paths")
24+
Component(progressTracker, "Progress Tracker", ".NET 9 Service", "Tracks user progress through learning paths")
25+
Component(learningRepository, "Learning Repository", "EF Core", "Provides data access for learning content")
26+
}
27+
28+
Rel(graphqlApi, learningPathManager, "Sends learning path operations to")
29+
Rel(graphqlApi, moduleManager, "Sends module operations to")
30+
Rel(graphqlApi, contentManager, "Sends content operations to")
31+
Rel(graphqlApi, eventManager, "Sends event operations to")
32+
Rel(graphqlApi, enrollmentManager, "Sends enrollment operations to")
33+
Rel(graphqlApi, progressTracker, "Sends progress queries to")
34+
35+
Rel(learningPathManager, moduleManager, "Uses")
36+
Rel(moduleManager, contentManager, "Uses")
37+
Rel(eventManager, calendarIntegration, "Uses")
38+
Rel(contentManager, youtubeIntegration, "Uses")
39+
40+
Rel(learningPathManager, learningRepository, "Uses")
41+
Rel(moduleManager, learningRepository, "Uses")
42+
Rel(contentManager, learningRepository, "Uses")
43+
Rel(eventManager, learningRepository, "Uses")
44+
Rel(enrollmentManager, learningRepository, "Uses")
45+
Rel(progressTracker, learningRepository, "Uses")
46+
47+
Rel(learningRepository, learningDb, "Reads from and writes to")
48+
Rel(enrollmentManager, userDb, "Reads from and writes to")
49+
Rel(progressTracker, userDb, "Reads from and writes to")
50+
51+
Rel(youtubeIntegration, youtubeSystem, "Embeds videos from", "HTTPS")
52+
Rel(calendarIntegration, calendarSystem, "Schedules events using", "HTTPS")
53+
54+
Rel(eventManager, eventBus, "Publishes learning events to")
55+
Rel(enrollmentManager, eventBus, "Publishes enrollment events to")
56+
57+
@enduml
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
@startuml C4_Container
2+
!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Container.puml
3+
4+
LAYOUT_WITH_LEGEND()
5+
6+
title Container diagram for E-Learning Community Platform
7+
8+
Person(contentCreator, "Content Creator", "Creates learning paths, modules, and assigns reviewers")
9+
Person(learner, "Learner", "Follows learning paths, completes assignments, earns certifications")
10+
Person(reviewer, "Reviewer", "Reviews and grades assignments, provides feedback")
11+
Person(admin, "Administrator", "Manages platform settings and user access")
12+
13+
System_Ext(youtubeSystem, "YouTube", "Video hosting platform")
14+
System_Ext(calendarSystem, "Calendar Service", "Manages event scheduling")
15+
System_Ext(emailSystem, "Email Service", "Sends notifications and alerts")
16+
System_Ext(authSystem, "Authentication Provider", "Identity management")
17+
18+
System_Boundary(elearningSystem, "E-Learning Community Platform") {
19+
Container(webApp, "Web Application", "Blazor WASM, .NET 9", "Provides the user interface for all platform functionality")
20+
21+
Container(apiGateway, "API Gateway", "Azure API Management", "Routes requests and handles API management")
22+
23+
Container(graphqlApi, "GraphQL API", "Hot Chocolate, .NET 9", "Provides a unified GraphQL schema for the web application")
24+
25+
Container(learningService, "Learning Service", ".NET 9", "Manages learning paths, modules, and content")
26+
Container(assignmentService, "Assignment Service", ".NET 9", "Handles assignment submissions and reviews")
27+
Container(forumService, "Forum Service", ".NET 9", "Provides discussion forum capabilities")
28+
Container(certificationService, "Certification Service", ".NET 9", "Tracks progress and issues certifications")
29+
Container(notificationService, "Notification Service", ".NET 9", "Manages notifications and alerts")
30+
31+
ContainerDb(learningDb, "Learning Database", "SQL Server", "Stores learning content, paths, and modules")
32+
ContainerDb(assignmentDb, "Assignment Database", "SQL Server", "Stores assignments, submissions, and grades")
33+
ContainerDb(forumDb, "Forum Database", "SQL Server", "Stores forum posts and discussions")
34+
ContainerDb(userDb, "User Database", "SQL Server", "Stores user profiles and progress")
35+
36+
Container(fileStorage, "File Storage", "Azure Blob Storage", "Stores assignment files and resources")
37+
Container(eventBus, "Event Bus", "Azure Service Bus", "Handles asynchronous communication between services")
38+
}
39+
40+
Rel(contentCreator, webApp, "Uses", "HTTPS")
41+
Rel(learner, webApp, "Uses", "HTTPS")
42+
Rel(reviewer, webApp, "Uses", "HTTPS")
43+
Rel(admin, webApp, "Uses", "HTTPS")
44+
45+
Rel(webApp, graphqlApi, "Makes API calls to", "HTTPS/GraphQL")
46+
Rel(graphqlApi, apiGateway, "Routes requests through")
47+
48+
Rel(apiGateway, learningService, "Routes learning requests to")
49+
Rel(apiGateway, assignmentService, "Routes assignment requests to")
50+
Rel(apiGateway, forumService, "Routes forum requests to")
51+
Rel(apiGateway, certificationService, "Routes certification requests to")
52+
Rel(apiGateway, notificationService, "Routes notification requests to")
53+
54+
Rel(learningService, learningDb, "Reads from and writes to")
55+
Rel(assignmentService, assignmentDb, "Reads from and writes to")
56+
Rel(assignmentService, fileStorage, "Stores files in")
57+
Rel(forumService, forumDb, "Reads from and writes to")
58+
Rel(certificationService, learningDb, "Reads from")
59+
Rel(certificationService, assignmentDb, "Reads from")
60+
Rel(certificationService, userDb, "Reads from and writes to")
61+
Rel(notificationService, eventBus, "Publishes events to")
62+
63+
Rel(learningService, youtubeSystem, "Embeds videos from", "HTTPS")
64+
Rel(learningService, calendarSystem, "Schedules events using", "HTTPS")
65+
Rel(notificationService, emailSystem, "Sends emails through", "SMTP")
66+
Rel(webApp, authSystem, "Authenticates using", "OAuth 2.0")
67+
68+
Rel(eventBus, notificationService, "Triggers notifications in")
69+
70+
@enduml
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
@startuml C4_Context
2+
!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Context.puml
3+
4+
LAYOUT_WITH_LEGEND()
5+
6+
title System Context diagram for E-Learning Community Platform
7+
8+
Person(contentCreator, "Content Creator", "Creates learning paths, modules, and assigns reviewers")
9+
Person(learner, "Learner", "Follows learning paths, completes assignments, earns certifications")
10+
Person(reviewer, "Reviewer", "Reviews and grades assignments, provides feedback")
11+
Person(admin, "Administrator", "Manages platform settings and user access")
12+
13+
System(elearningSystem, "E-Learning Community Platform", "Enables creation and consumption of educational content, assignments, and certification")
14+
15+
System_Ext(youtubeSystem, "YouTube", "Video hosting platform")
16+
System_Ext(calendarSystem, "Calendar Service", "Manages event scheduling")
17+
System_Ext(emailSystem, "Email Service", "Sends notifications and alerts")
18+
System_Ext(authSystem, "Authentication Provider", "Identity management")
19+
20+
Rel(contentCreator, elearningSystem, "Creates learning content and paths")
21+
Rel(learner, elearningSystem, "Follows learning paths, submits assignments")
22+
Rel(reviewer, elearningSystem, "Reviews and grades assignments")
23+
Rel(admin, elearningSystem, "Manages platform and users")
24+
25+
Rel(elearningSystem, youtubeSystem, "Embeds videos from")
26+
Rel(elearningSystem, calendarSystem, "Schedules events using")
27+
Rel(elearningSystem, emailSystem, "Sends notifications through")
28+
Rel(elearningSystem, authSystem, "Authenticates users via")
29+
30+
Rel(emailSystem, learner, "Sends notifications to")
31+
Rel(emailSystem, contentCreator, "Sends notifications to")
32+
Rel(emailSystem, reviewer, "Sends notifications to")
33+
34+
@enduml

0 commit comments

Comments
 (0)