Skip to content

Add BookDetailsProvider and enable CI for testing #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# CI workflow: brings up Postgres + Kafka via docker compose (ci profile),
# runs sqlx migrations, then builds and tests the Rust workspace.
name: Rust Tests

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

env:
  CARGO_TERM_COLOR: always

jobs:
  test:
    name: Test
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Setup Docker Compose
        run: |
          docker compose --version

      - name: Check Docker Compose file
        run: |
          echo "Using Docker Compose with profiles for CI"
          grep -q "profiles:" docker-compose.yaml && echo "Profiles are configured in docker-compose.yaml" || echo "Warning: No profiles found in docker-compose.yaml"

      - name: Start DB and Kafka
        run: |
          # --wait blocks until containers report healthy (or the timeout expires).
          docker compose --profile ci up -d --wait --timeout 120 \
            || (echo "Docker compose up --wait failed. Dumping Kafka logs:" && docker compose logs kafka && exit 1)

          docker compose ps

      - name: Wait for services to be healthy
        run: |
          # Belt-and-braces check on top of `up --wait`: verify both health states explicitly.
          DB_HEALTH=$(docker inspect --format "{{.State.Health.Status}}" $(docker compose ps -q db))
          KAFKA_HEALTH=$(docker inspect --format "{{.State.Health.Status}}" $(docker compose ps -q kafka))

          if [ "$DB_HEALTH" != "healthy" ] || [ "$KAFKA_HEALTH" != "healthy" ]; then
            echo "Services did not become healthy in time."
            echo "DB Status: $DB_HEALTH"
            echo "Kafka Status: $KAFKA_HEALTH"
            docker compose logs db
            docker compose logs kafka
            exit 1
          else
            echo "DB and Kafka are healthy."
          fi

      - name: Setup environment variables
        run: |
          echo "DATABASE_URL=postgres://postgres:password@localhost:5432/bookapp" > .env
          echo "KAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env

      # actions-rs/toolchain is archived and unmaintained; dtolnay/rust-toolchain
      # is the maintained replacement and installs requested components directly.
      - name: Install stable toolchain
        uses: dtolnay/rust-toolchain@stable
        with:
          components: rustfmt, clippy

      - name: Install SQLx CLI
        run: cargo install sqlx-cli --no-default-features --features native-tls,postgres

      - name: Run database migrations
        run: |
          cd bookapp
          sqlx database create
          sqlx migrate run
        env:
          DATABASE_URL: postgres://postgres:password@localhost:5432/bookapp

      - name: Build
        run: cargo build --verbose

      - name: Run tests
        run: cargo test --verbose
        env:
          DATABASE_URL: postgres://postgres:password@localhost:5432/bookapp
          KAFKA_BOOTSTRAP_SERVERS: localhost:9092

      - name: Stop Docker Compose
        if: always() # Run even if the tests fail
        run: docker compose --profile ci down
58 changes: 58 additions & 0 deletions bookapp/src/book_details.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::sync::Arc;
use async_trait::async_trait;
use client::Client;
use tracing::instrument;
use crate::db::Book;

/// A trait for providing detailed book information from external sources.
///
/// The `Send + Sync` bound allows a trait object to be shared across async
/// handler tasks (e.g. injected as an `Arc<dyn BookDetailsProvider>`).
#[async_trait]
pub trait BookDetailsProvider: Send + Sync {
    /// Enriches a collection of books with additional details from external sources.
    ///
    /// Best-effort by design: the method returns `()` rather than a `Result`,
    /// so implementations are expected not to fail the caller.
    async fn enrich_book_details(&self, books: &[Book]);
}

/// Real implementation of `BookDetailsProvider` that fetches data from the backend
/// service via the progenitor-generated client.
#[derive(Debug)]
pub struct RealBookDetailsProvider;

#[async_trait]
impl BookDetailsProvider for RealBookDetailsProvider {
    /// Fetches details for each book in turn, sequentially.
    ///
    /// Failures are logged and skipped so one bad lookup does not abort the
    /// batch (the trait contract is best-effort: no `Result` is returned).
    #[instrument(skip(self, books), fields(num_books = books.len()))]
    async fn enrich_book_details(&self, books: &[Book]) {
        tracing::info!("Enriching book details for {} books", books.len());

        for book in books {
            // Call the progenitor client to get additional details.
            // Previously an `if let Ok(details)` here silently discarded errors
            // (and left `details` unused); log failures explicitly instead.
            match self.get_book_details(book.id).await {
                Ok(_details) => {
                    tracing::debug!(
                        book_id = book.id,
                        "Successfully enriched book details"
                    );
                }
                Err(e) => {
                    tracing::warn!(
                        book_id = book.id,
                        error = %e,
                        "Failed to fetch book details; continuing with next book"
                    );
                }
            }
        }
    }
}

impl RealBookDetailsProvider {
    /// Fetches the details for a single book from the backend service using
    /// the progenitor-generated client.
    ///
    /// The span is tagged `otel.kind = "Client"` so it is reported as an
    /// outgoing client call in traces.
    #[instrument(fields(book_id, otel.kind = "Client"))]
    async fn get_book_details(
        &self,
        book_id: i32,
    ) -> Result<client::ResponseValue<client::types::Book>, client::Error> {
        // Fetch a single book detail using the progenitor generated client.
        // NOTE(review): a fresh Client is constructed on every call and the
        // backend URL is hard-coded — consider reusing one client instance and
        // making the base URL configurable.
        let progenitor_client = Client::new("http://backend:8000", client::ClientState::default());
        progenitor_client.get_book().id(book_id).send().await
    }
}

/// Stub implementation of `BookDetailsProvider` for testing.
///
/// Performs no external calls; `enrich_book_details` only emits a log line.
// Derive Debug for consistency with RealBookDetailsProvider; Default makes
// construction in tests trivial. Both are backward-compatible additions.
#[derive(Debug, Default)]
pub struct StubBookDetailsProvider;

#[async_trait]
impl BookDetailsProvider for StubBookDetailsProvider {
    #[instrument(skip(self, books), fields(num_books = books.len()))]
    async fn enrich_book_details(&self, books: &[Book]) {
        tracing::info!("Using stub book details provider for {} books", books.len());
        // No-op implementation for testing
    }
}
30 changes: 24 additions & 6 deletions bookapp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ mod reqwest_traced_client;
mod rest;
mod topic_management;
mod tracing_config;
mod rest_tests;
mod book_details;

use opentelemetry::global;
use std::sync::Arc;
use crate::book_details::{BookDetailsProvider, RealBookDetailsProvider};

use tracing_subscriber;

Expand All @@ -33,6 +37,8 @@ fn router(connection_pool: PgPool, producer: FutureProducer) -> Router {

Router::new()
.nest_service("/books", rest::book_service())
// inject the real provider here
.layer(Extension(Arc::new(RealBookDetailsProvider) as Arc<dyn BookDetailsProvider>))
.layer(Extension(producer))
// Our custom error injection layer can inject errors
// This layer itself can be traced - so needs to be added before our OtelAxumLayer
Expand Down Expand Up @@ -114,7 +120,7 @@ async fn main() -> Result<()> {
let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await?;

info!("Starting webserver");
axum::serve(listener, app)
let server = axum::serve(listener, app)
.with_graceful_shutdown(async {
let mut signal_terminate = signal(SignalKind::terminate()).unwrap();
let mut signal_interrupt = signal(SignalKind::interrupt()).unwrap();
Expand All @@ -123,15 +129,27 @@ async fn main() -> Result<()> {
_ = signal_terminate.recv() => tracing::debug!("Received SIGTERM."),
_ = signal_interrupt.recv() => tracing::debug!("Received SIGINT."),
}
})
.await?;
});

tokio::select! {
_ = server => tracing::info!("Server has shut down gracefully."),
else => tracing::error!("Server encountered an error."),
}
}

info!("Shutting down OpenTelemetry");

trace_provider.shutdown()?;
meter_provider.shutdown()?;
log_provider.shutdown()?;
if let Err(e) = trace_provider.shutdown() {
tracing::error!("Error shutting down trace provider: {:?}", e);
}
if let Err(e) = meter_provider.shutdown() {
tracing::error!("Error shutting down meter provider: {:?}", e);
}
if let Err(e) = log_provider.shutdown() {
tracing::error!("Error shutting down log provider: {:?}", e);
}

info!("Shutdown complete");

Ok(())
}
58 changes: 14 additions & 44 deletions bookapp/src/rest.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::sync::Arc;
use crate::db::{Book, BookCreateIn, BookStatus};
use crate::{db, reqwest_traced_client};
use crate::book_details::BookDetailsProvider;
use axum::extract::{Path, Query};
use axum::http::StatusCode;
use axum::routing::{delete, get, patch, post};
Expand All @@ -11,42 +13,18 @@ use sqlx::PgPool;
use tracing::{Instrument, Level};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use client::Client;

#[tracing::instrument(skip(con), fields(num_books))]
async fn get_all_books(Extension(con): Extension<PgPool>) -> Result<Json<Vec<Book>>, StatusCode> {
#[tracing::instrument(skip(con, details), fields(num_books))]
async fn get_all_books(
Extension(con): Extension<PgPool>,
Extension(details): Extension<Arc<dyn BookDetailsProvider>>,
) -> Result<Json<Vec<Book>>, StatusCode> {
tracing::info!("Getting all books");

match db::get_all_books(&con).await {
Ok(books) => {
// Now let's add an attribute to the tracing span with the number of books
tracing::Span::current().record("num_books", books.len() as i64);

// Fetch 5 book details from the backend service using reqwest-tracing client
let _book_details = crate::reqwest_traced_client::fetch_bulk_book_details(&books).await;

let _book_detail_res =
get_book_details_with_progenitor_client(books.first().unwrap().id).await;
let span = tracing::info_span!("tokio_spawned_requests");

let book_details_futures = books
.iter()
.take(5)
.map(|b: &Book| b.id)
.map(|id| {
tokio::spawn(
get_book_details_with_progenitor_client(id).instrument(span.clone()),
)
})
.collect::<Vec<_>>();

let all_book_details = futures::future::join_all(book_details_futures).await;

tracing::info!(
num_books = all_book_details.len(),
"Got all book details using progenitor"
);

// delegate to injected provider
details.enrich_book_details(&books).await;
Ok(Json(books))
}
Err(e) => {
Expand All @@ -56,15 +34,6 @@ async fn get_all_books(Extension(con): Extension<PgPool>) -> Result<Json<Vec<Boo
}
}

#[tracing::instrument(fields(book_id, otel.kind = "Client"))]
async fn get_book_details_with_progenitor_client(
book_id: i32,
) -> Result<client::ResponseValue<client::types::Book>, client::Error> {
// Fetch a single book detail using the progenitor generated client
let progenitor_client = Client::new("http://backend:8000", client::ClientState::default());

progenitor_client.get_book().id(book_id).send().await
}

#[tracing::instrument(skip(con), ret(level = Level::TRACE))]
async fn get_book(
Expand Down Expand Up @@ -136,11 +105,11 @@ async fn create_book(
Extension(con): Extension<PgPool>,
Extension(producer): Extension<FutureProducer>,
Json(book): Json<BookCreateIn>,
) -> Result<Json<i32>, StatusCode> {
) -> Result<(StatusCode, Json<i32>), StatusCode> {
let status = book.status.unwrap_or(BookStatus::Available);
if let Ok(new_id) = db::create_book(&con, book.author, book.title, status).await {
queue_background_ingestion_task(&producer, new_id).await;
Ok(Json(new_id))
Ok((StatusCode::CREATED, Json(new_id)))
} else {
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
Expand All @@ -150,12 +119,12 @@ async fn create_book(
async fn bulk_create_books(
Extension(con): Extension<PgPool>,
Json(payload): Json<Vec<BookCreateIn>>,
) -> Result<Json<Vec<i32>>, StatusCode> {
) -> Result<(StatusCode, Json<Vec<i32>>), StatusCode> {
let num = payload.len() as i64;
tracing::Span::current().record("num_books", num);

match db::bulk_insert_books(&con, &payload).await {
Ok(ids) => Ok(Json(ids)),
Ok(ids) => Ok((StatusCode::CREATED, Json(ids))),
Err(e) => {
tracing::error!(error=%e, "bulk insert failed");
Err(StatusCode::INTERNAL_SERVER_ERROR)
Expand Down Expand Up @@ -197,3 +166,4 @@ pub fn book_service() -> Router {
.route("/bulk_add", post(bulk_create_books))
.route("/{id}", delete(delete_book))
}

Loading
Loading