3 changes: 3 additions & 0 deletions 2025/solid/.gitignore
@@ -0,0 +1,3 @@
output/
logs/
sales_report.json
97 changes: 97 additions & 0 deletions 2025/solid/README.md
@@ -0,0 +1,97 @@
# Sales Report — Multi-Paradigm Code Showcase

**Purpose:** Demonstrates how the same business logic (a simple sales performance report)
can be implemented using multiple programming paradigms and architectural styles in Python.

---

## Project Overview

All implementations compute the same core metrics from `sales_data.csv`:

- Number of customers
- Average order value (pre-tax)
- Percentage of returns
- Total sales in period (pre-tax)

Each version differs only in *structure* and *paradigm emphasis*.
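For reference, the shared metric logic boils down to the sketch below. It is illustrative only, not one of the shipped implementations, and assumes the `date`/`name`/`price` columns used by the implementations in this directory, with returns recorded as negative prices.

```python
import csv

def compute_metrics(rows: list[dict[str, str]]) -> dict[str, float]:
    """Compute the four shared metrics from parsed CSV rows."""
    customers = {row["name"] for row in rows}  # unique customer names
    prices = [float(row["price"]) for row in rows]
    orders = [p for p in prices if p > 0]      # regular orders
    returns = [p for p in prices if p < 0]     # returns carry negative prices
    return {
        "number_of_customers": len(customers),
        "average_order_value (pre-tax)": round(sum(orders) / len(orders), 2) if orders else 0.0,
        "percentage_of_returns": round(len(returns) / len(prices) * 100, 2) if prices else 0.0,
        "total_sales_in_period (pre-tax)": round(sum(prices), 2),
    }

with open("sales_data.csv", newline="", encoding="utf-8") as f:
    print(compute_metrics(list(csv.DictReader(f))))
```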

| Paradigm | File | Highlights |
|-----------|------|------------|
| Procedural / Baseline | `messy_report.py` | Imperative style, easy to follow but not modular |
| Object-Oriented (SOLID) | Legacy: `class_based_report.py`<br>Enhanced: `class_based_report_v2.py` | Classes and interfaces; SRP and OCP applied with improved logging/robustness |
| Functional | Legacy: `functional_report.py`<br>Enhanced: `functional_report_v2.py` | Pure transformations with v2 adding structured logging and stronger error handling |
| Declarative Pipeline | `declarative_report.py` | Type-checked pipelines using Pandera (see the sketch below) |
| Config-Driven | `config_report.py` | YAML configuration defines logic dynamically |
| Asynchronous | `async_report.py` | Concurrent metric computation and async I/O |
| Async Streaming (No Pandas) | `async_no_pandas_report.py` | True non-blocking CSV streaming with aiofiles |
| Dataflow / DAG | `report_dataflow.py` | Declarative dependency graph with explicit dataflow |
| Actor Model | `report_actor_model.py` | Cooperative message-passing actors with isolated state |
| Reactive | `reactive_report.py` | RxPY stream-based reporting |
| Logic / Relational | `logic_report.py` | Relational facts and symbolic reasoning via Kanren |

> The legacy functional and class-based versions match the walkthrough in the video. The `_v2` editions layer in richer logging, error handling, and filesystem conventions while preserving the same outputs.
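
As a flavor of the declarative row above, a minimal Pandera schema might look like this. The snippet is hypothetical; `declarative_report.py` may structure its pipeline differently.

```python
import pandas as pd
import pandera as pa

# Hypothetical schema for sales_data.csv; the real pipeline may differ.
schema = pa.DataFrameSchema({
    "date": pa.Column(str),
    "name": pa.Column(str),
    "price": pa.Column(float, coerce=True),
})

df = schema.validate(pd.read_csv("sales_data.csv"))  # raises SchemaError on bad data
print(df["name"].nunique(), "customers")
```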

---

## Usage

### 1. Setup Environment

This project uses [uv](https://docs.astral.sh/uv/) for dependency management. Install it if you have not already, then run:

```bash
uv sync
```

### 2. Run Examples

#### Option A — Helper Script

From this directory you can execute one or more implementations with the helper script:

```bash
./run_reports.sh --list # view available keys
./run_reports.sh --run functional --run logic
./run_reports.sh --run-all
```

The script uses `uv run` automatically when `uv` is installed (recommended). Use `--dry-run` to preview the commands without executing them.

#### Option B — Manual Commands

Use `uv run` to execute any of the implementations inside the managed environment:

```bash
uv run python logic_report.py
uv run python functional_report.py
uv run python functional_report_v2.py
uv run python async_report.py
uv run python async_no_pandas_report.py
uv run python report_dataflow.py
uv run python report_actor_model.py
uv run python class_based_report.py
uv run python class_based_report_v2.py
uv run python reactive_report.py
# etc.
```

### 3. Validate Outputs

After running one or more implementations, verify the generated JSON payloads agree:

```bash
uv run python verify_reports.py # compare every file to the first baseline
uv run python verify_reports.py --verbose
uv run python verify_reports.py --baseline sales_report.json
```

The script checks for consistent keys and values (within a configurable tolerance) across every report implementation. A non-zero exit code indicates a mismatch.
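
For intuition, the comparison rule amounts to something like the following sketch. Names and the default tolerance here are hypothetical and do not mirror the actual internals of `verify_reports.py`.

```python
import json
import math
import sys
from pathlib import Path

TOLERANCE = 0.01  # assumed default; the real script makes this configurable

def reports_match(baseline: Path, candidate: Path) -> bool:
    """True when both reports share keys and values agree within tolerance."""
    a = json.loads(baseline.read_text())
    b = json.loads(candidate.read_text())
    if a.keys() != b.keys():  # every report must expose the same keys
        return False
    for key, x in a.items():
        y = b[key]
        if isinstance(x, (int, float)) and isinstance(y, (int, float)):
            if not math.isclose(x, y, abs_tol=TOLERANCE):
                return False
        elif x != y:          # non-numeric fields must match exactly
            return False
    return True

if not reports_match(Path("sales_report.json"), Path("output/sales_report_async_true.json")):
    sys.exit(1)  # non-zero exit signals a mismatch
```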

## Educational Goal

This project illustrates how:

- The same logic can be expressed through multiple models of thought (OOP, FP, Async, Logic).
- Paradigm choice affects extensibility, readability, and reasoning complexity.
- Abstraction boundaries (metrics, config, I/O) remain constant across paradigms; the sketch below illustrates the idea.
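
One way to picture that constant boundary is as a contract every implementation effectively satisfies. This `Protocol` is purely illustrative and does not appear in the repo:

```python
from pathlib import Path
from typing import Any, Protocol

class ReportPipeline(Protocol):
    """Hypothetical contract shared by every paradigm's implementation."""

    def read(self, source: Path) -> Any: ...                        # I/O boundary
    def compute(self, records: Any) -> dict[str, Any]: ...          # metrics boundary
    def write(self, report: dict[str, Any], target: Path) -> None: ...  # I/O boundary
```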
157 changes: 157 additions & 0 deletions 2025/solid/async_no_pandas_report.py
@@ -0,0 +1,157 @@
"""
===============================================================================
Sales Report - True Async Streaming Implementation
===============================================================================
Fully asynchronous, non-blocking reporting pipeline:
* Reads CSV line by line using aiofiles (no pandas blocking)
* Parses each record asynchronously using csv.DictReader
* Aggregates metrics incrementally (no in-memory dataset)
* Writes JSON output asynchronously

This version is a true example of async I/O — no fake async via thread executors.
Perfect for large files or integration into larger async systems.
===============================================================================
"""

import asyncio
import csv
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any

import aiofiles

# ------------------------------------------------------------------------------
# Logging Configuration
# ------------------------------------------------------------------------------

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] [async-true] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# ------------------------------------------------------------------------------
# Constants
# ------------------------------------------------------------------------------

BASE_DIR = Path(__file__).parent
INPUT_FILE = BASE_DIR / "sales_data.csv"
OUTPUT_DIR = BASE_DIR / "output"
OUTPUT_DIR.mkdir(exist_ok=True)
OUTPUT_FILE = OUTPUT_DIR / "sales_report_async_true.json"
REPORT_START = datetime(2024, 1, 1)
REPORT_END = datetime(2024, 12, 31)


# ------------------------------------------------------------------------------
# Async Data Processing
# ------------------------------------------------------------------------------


async def parse_and_aggregate_csv(
    file_path: Path,
    start: datetime | None = None,
    end: datetime | None = None,
) -> dict[str, Any]:
    """
    Asynchronously reads and aggregates sales data.
    Computes metrics incrementally to avoid loading the entire file into memory.
    """

    customers: set[str] = set()
    total_sales = 0.0
    positive_sales: list[float] = []
    total_records = 0
    total_returns = 0

    logger.info("Reading asynchronously from %s", file_path)

    async with aiofiles.open(file_path, mode="r", encoding="utf-8") as afp:
        # The first line is the header; keep the field names for zipping.
        header_line = await afp.readline()
        fieldnames = [h.strip().strip('"') for h in header_line.split(",")]

        # Process the remaining lines one at a time.
        async for line in afp:
            if not line.strip():
                continue

            # csv.reader handles quoted fields and embedded commas.
            values = next(csv.reader([line]))
            record = dict(zip(fieldnames, values))

            try:
                sale_date = datetime.fromisoformat(record["date"].strip('"'))
                price = float(record["price"])
                name = record["name"].strip('"')
            except (KeyError, ValueError):
                # Skip badly formed lines safely
                continue

            if (start and sale_date < start) or (end and sale_date > end):
                continue

            total_records += 1
            customers.add(name)
            total_sales += price

            if price > 0:
                positive_sales.append(price)
            elif price < 0:
                total_returns += 1

    avg_order_value = (
        sum(positive_sales) / len(positive_sales) if positive_sales else 0.0
    )
    percentage_returns = (
        (total_returns / total_records) * 100 if total_records else 0.0
    )

    logger.info(
        "Processed %d records (%d customers, %.2f avg order)",
        total_records,
        len(customers),
        avg_order_value,
    )

    return {
        "number_of_customers": len(customers),
        "average_order_value (pre-tax)": round(avg_order_value, 2),
        "percentage_of_returns": round(percentage_returns, 2),
        "total_sales_in_period (pre-tax)": round(total_sales, 2),
        "report_start": start.strftime("%Y-%m-%d") if start else "N/A",
        "report_end": end.strftime("%Y-%m-%d") if end else "N/A",
    }


async def write_report(report: dict[str, Any], file_path: Path) -> None:
    """Writes the computed report asynchronously to JSON."""

    async with aiofiles.open(file_path, mode="w", encoding="utf-8") as afp:
        await afp.write(json.dumps(report, indent=2))
    logger.info("Async streaming report written to %s", file_path)


# ------------------------------------------------------------------------------
# Entrypoint
# ------------------------------------------------------------------------------


async def main() -> None:
    logger.info("Starting true asynchronous sales report pipeline...")

    try:
        report_data = await parse_and_aggregate_csv(
            INPUT_FILE, REPORT_START, REPORT_END
        )
        await write_report(report_data, OUTPUT_FILE)
        logger.info("Report generation completed successfully.")
    except Exception as exc:
        logger.exception("Async streaming report failed: %s", exc)
        raise SystemExit(1) from exc


if __name__ == "__main__":
    asyncio.run(main())