
Conversation

@spuckhafte commented Dec 30, 2025

Fixes #1498.

Description


This PR has:

  • been tested to ensure log ingestion and log query work.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features
    • Added a GET endpoint to list resources affected by a log stream (filters, dashboards with affected tiles, alerts, and roles) to aid dependency visibility before changes.
  • Permissions
    • Introduced a new permission action to allow scoped access to the affected-resources endpoint for authorized roles.


coderabbitai bot (Contributor) commented Dec 30, 2025

Walkthrough

Adds a new GET endpoint to report resources (filters, dashboards, alerts, roles) affected by a specific logstream. Implements business logic to verify stream existence and to query metastore/object storage for affected resources, and introduces a new RBAC action for authorizing the endpoint.
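
To make the wiring concrete, here is a small sketch of how such a route can be registered in actix-web. The handler body is a stub standing in for this PR's logstream::get_affected_resources, and the real registration additionally attaches the Action::GetLogstreamAffectedResources authorization guard, whose helper is not shown in this excerpt.

use actix_web::{web, HttpResponse, Resource};

// Stub standing in for logstream::get_affected_resources from this PR.
async fn get_affected_resources(stream_name: web::Path<String>) -> HttpResponse {
    let stream_name = stream_name.into_inner();
    HttpResponse::Ok().body(format!("affected resources for {stream_name}"))
}

// Sketch of the route registration; the real code also applies the new
// RBAC action within the logstream API scope.
fn affected_resources_route() -> Resource {
    web::resource("/logstream/{logstream}/affected-resources")
        .route(web::get().to(get_affected_resources))
}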

Changes

  • Core Business Logic (src/handlers/http/modal/utils/logstream_utils.rs): New LogstreamAffectedResources and LogstreamAffectedDashboard models. Adds async methods to load and fetch affected filters, dashboards (parsing dashboard JSON and identifying tiles that reference the stream via TILE_FIELD_REFERRING_TO_STREAM), alerts, and roles. Introduces LogstreamAffectedResourcesError, Bytes2JSONError, a bytes_to_json helper, and conversions to StreamError. A sketch of this model follows the list.
  • HTTP Handler (src/handlers/http/logstream.rs): Adds a public async handler, get_affected_resources(stream_name), that ensures the stream exists (loading it from storage if necessary) and returns LogstreamAffectedResources as JSON or an error.
  • Routes / Server (src/handlers/http/modal/ingest_server.rs, src/handlers/http/modal/server.rs): Registers GET /logstream/{logstream}/affected-resources, routes it to logstream::get_affected_resources, and requires GetLogstreamAffectedResources authorization within the logstream API scope.
  • RBAC (src/rbac/role.rs): Adds GetLogstreamAffectedResources to the Action enum, maps it as a resource-scoped permission in RoleBuilder::build, and includes it in the default privilege sets for the Admin, Editor, Writer, and Reader roles.
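
A minimal sketch of the model described above, with field and type names inferred from the walkthrough and the review comments below; the actual definitions in logstream_utils.rs may differ in detail.

use serde::Serialize;
use ulid::Ulid;

// Inferred shape: filter/alert/tile IDs as ULIDs and roles as names are
// assumptions based on this review, not the actual source. Serialize on
// Ulid requires the ulid crate's "serde" feature.
#[derive(Debug, Serialize)]
pub struct LogstreamAffectedDashboard {
    pub dashboard_id: Ulid,
    pub affected_tile_ids: Vec<Ulid>,
}

#[derive(Debug, Serialize)]
pub struct LogstreamAffectedResources {
    pub filters: Vec<Ulid>,
    pub dashboards: Vec<LogstreamAffectedDashboard>,
    pub alerts: Vec<Ulid>,
    pub roles: Vec<String>,
}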

Sequence Diagram(s)

sequenceDiagram
    actor Client
    participant API as HTTP Handler
    participant Utils as LogstreamAffectedResources
    participant Meta as Metastore
    participant ObjStore as Object Storage

    Client->>API: GET /logstream/{name}/affected-resources
    API->>Utils: load(stream_name)
    Utils->>Meta: ensure stream exists (PARSEABLE.streams)
    Meta-->>Utils: stream metadata
    par Fetch resources
        Utils->>Meta: fetch_affected_filters(stream_name)
        Meta-->>Utils: filter list
        Utils->>ObjStore: fetch dashboards blobs
        ObjStore-->>Utils: dashboard JSON bytes
        Utils->>Utils: parse dashboards, collect affected tile IDs
        Utils->>Meta: fetch_affected_alerts(stream_name)
        Meta-->>Utils: alert list
        Utils->>Meta: fetch_affected_roles(stream_name)
        Meta-->>Utils: role list
    end
    Utils-->>API: LogstreamAffectedResources JSON
    API-->>Client: HTTP 200 + payload
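
The "ensure stream exists" step above follows the established pattern quoted in the learnings later in this review. Condensed, the check looks roughly like this; PARSEABLE, Mode, StreamNotFound, and create_stream_and_schema_from_storage are existing codebase items, and this is only a sketch of how they compose:

// In Query mode, try to hydrate the stream from storage before
// declaring it missing (pattern quoted verbatim in the learnings below).
if !PARSEABLE.streams.contains(&stream_name)
    && (PARSEABLE.options.mode != Mode::Query
        || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)
{
    return Err(StreamNotFound(stream_name.clone()).into());
}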

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Suggested labels

for next release

Suggested reviewers

  • praveen5959
  • parmesant
  • nikhilsinhaparseable

Poem

🐰 I hopped through code with eager paws tonight,

Found streams and tiles that hid from light,
Filters, alerts, dashboards in a row,
Roles that guard the data flow —
A tidy map by moonbeam's glow.

🚥 Pre-merge checks | ✅ 3 | ❌ 2
❌ Failed checks (2 warnings)

  • Description check (⚠️ Warning): The PR description provides the linked issue reference but lacks the implementation details, testing status, comments, and documentation explanations required by the template checklist. Resolution: fill in the description with details about the API endpoint design and the key implementation changes, and ensure every template checklist item is addressed or explicitly marked N/A.
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 5.56%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (3 passed)

  • Title check (✅ Passed): The title clearly and concisely summarizes the main change: adding a new endpoint to fetch resources affected by a logstream deletion.
  • Linked Issues check (✅ Passed): The PR implements all required objectives from #1498: API endpoints for fetching the filters, dashboards, alerts, and roles affected by a logstream.
  • Out of Scope Changes check (✅ Passed): All changes are within scope; the new endpoint, authorization configuration, and affected-resources utility align directly with the #1498 requirements.



@github-actions (Contributor) commented
CLA Assistant Lite bot:
Thank you for your submission, we really appreciate it. Like many open-source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution. You can sign the CLA by posting a Pull Request comment in the format below.


I have read the CLA Document and I hereby sign the CLA


You can retrigger this bot by commenting recheck in this Pull Request

coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (4)
src/handlers/http/logstream.rs (1)

498-516: Fix typo in variable name.

The variable affecred_resources on line 511 has a typo — should be affected_resources.

🔎 Suggested fix
     match LogstreamAffectedResources::load(&stream_name).await {
-        Ok(affecred_resources) 
-            => Ok((web::Json(affecred_resources), StatusCode::OK)),
+        Ok(affected_resources) 
+            => Ok((web::Json(affected_resources), StatusCode::OK)),
         Err(err) 
             => Err(err.into())
     }
src/handlers/http/modal/utils/logstream_utils.rs (3)

122-131: Stale doc comment.

The doc comment mentions returning "A tuple where: First element: true if no resources are affected..." but the function returns Result<Self, LogstreamAffectedResourcesError>. Update or remove the outdated documentation.

🔎 Suggested fix
     /// Load all resources that will be affected if the given logstream is deleted.
     ///
     /// ### Arguments
     /// - `stream_name` - The name of the logstream to check for dependencies
     ///
     /// ### Returns
-    /// A tuple where:
-    /// - First element: `true` if no resources are affected (empty loaded struct), `false` otherwise
-    /// - Second element: The populated `LogstreamAffectedResources` struct
- 
+    /// A `LogstreamAffectedResources` struct populated with all dependent resources,
+    /// or an error if the stream doesn't exist or fetching fails.
     pub async fn load(stream_name: &str) -> Result<Self, LogstreamAffectedResourcesError> {

141-156: Redundant stream existence checks across fetch methods.

Each fetch_affected_* method independently checks PARSEABLE.streams.contains(stream_name). Since load() calls all four methods sequentially, consider checking stream existence once in load() before calling the fetch methods, or make the individual methods private and trust the caller.

This would reduce redundant lookups and centralize the validation logic.


341-357: Simplify bytes_to_json to avoid double deserialization.

The function first deserializes to serde_json::Value, then deserializes again to T. This is inefficient — use serde_json::from_slice::<T>() directly.

🔎 Suggested fix
 fn bytes_to_json<T: serde::de::DeserializeOwned>(json_bytes: Bytes) -> Result<T, Bytes2JSONError> {
     if json_bytes.is_empty() {
         return Err(Bytes2JSONError::ZeroSizedBytes);
     }
 
-    let json_bytes_value = match serde_json::from_slice::<serde_json::Value>(&json_bytes) {
-        Ok(value) => value,
-        Err(err) => {
-            return Err(Bytes2JSONError::FailedToParse(format!("{:#?}", err)))
-        }
-    };
-
-    return match serde_json::from_value::<T>(json_bytes_value.clone()) {
-        Ok(parsed_object) => Ok(parsed_object),
-        Err(e) => Err(Bytes2JSONError::FailedToParse(format!("deserialization failed: {:#?}", e)))
-    };
+    serde_json::from_slice::<T>(&json_bytes)
+        .map_err(|e| Bytes2JSONError::FailedToParse(format!("{:#?}", e)))
 }
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cab9fc0 and 7def6f7.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (6)
  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/ingest_server.rs
  • src/handlers/http/modal/query_server.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/rbac/role.rs
🧰 Additional context used
🧠 Learnings (11)
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.

Applied to files:

  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-10-20T17:48:53.444Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/handlers/http/cluster/mod.rs:1370-1400
Timestamp: 2025-10-20T17:48:53.444Z
Learning: In src/handlers/http/cluster/mod.rs, the billing metrics processing logic should NOT accumulate counter values from multiple Prometheus samples with the same labels. The intended behavior is to convert each received counter from nodes into individual events for ingestion, using `.insert()` to store the counter value directly.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-09-18T09:59:20.177Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:700-756
Timestamp: 2025-09-18T09:59:20.177Z
Learning: In src/event/mod.rs, the parsed_timestamp used in increment_events_ingested_by_date() is correctly UTC-normalized: for dynamic streams it remains Utc::now(), and for streams with time partition enabled it uses the time partition value. Both cases result in proper UTC date strings for metrics labeling, preventing double-counting issues.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-08-18T19:10:11.941Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1405
File: src/handlers/http/ingest.rs:163-164
Timestamp: 2025-08-18T19:10:11.941Z
Learning: Field statistics calculation in src/storage/field_stats.rs uses None for the time_partition parameter when calling flatten_and_push_logs(), as field stats generation does not require time partition functionality.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-06T04:26:17.191Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1424
File: src/enterprise/utils.rs:65-72
Timestamp: 2025-09-06T04:26:17.191Z
Learning: In Parseable's metastore implementation, MetastoreError::to_detail() returns a MetastoreErrorDetail struct (not a string), which contains structured error information including operation, message, stream_name, and other contextual fields. This struct is designed to be boxed in ObjectStorageError::MetastoreError(Box<MetastoreErrorDetail>).

Applied to files:

  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-03-28T06:17:01.201Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1276
File: src/prism/logstream/mod.rs:0-0
Timestamp: 2025-03-28T06:17:01.201Z
Learning: In the Parseable datasets API, specific stream names don't need to be logged in error cases because the API is called from the Parseable UI where only authorized users can access and the streams in the request are pre-filtered based on user authorization.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
🧬 Code graph analysis (5)
src/handlers/http/modal/server.rs (1)
src/handlers/http/logstream.rs (1)
  • get_affected_resources (498-516)
src/handlers/http/logstream.rs (1)
src/handlers/http/modal/utils/logstream_utils.rs (1)
  • load (132-139)
src/handlers/http/modal/query_server.rs (2)
src/rbac/map.rs (1)
  • get (241-243)
src/handlers/http/logstream.rs (1)
  • get_affected_resources (498-516)
src/handlers/http/modal/ingest_server.rs (1)
src/handlers/http/logstream.rs (1)
  • get_affected_resources (498-516)
src/handlers/http/modal/utils/logstream_utils.rs (1)
src/metadata.rs (1)
  • new (99-136)
🔇 Additional comments (9)
src/handlers/http/modal/query_server.rs (1)

316-323: LGTM!

The new endpoint follows the established routing pattern for logstream-scoped resources and correctly uses resource-based authorization with Action::GetLogstreamAffectedResources.

src/handlers/http/modal/server.rs (1)

491-498: LGTM!

The endpoint is consistently wired with the same handler and authorization as in query_server.rs, maintaining uniformity across server modes.

src/handlers/http/modal/ingest_server.rs (1)

275-282: LGTM!

Consistent endpoint implementation across all server modes. The read-only nature of this endpoint makes it appropriate to expose on the ingest server.

src/rbac/role.rs (3)

35-35: LGTM!

The new GetLogstreamAffectedResources action is correctly added and follows the existing pattern for stream-related read operations.


168-168: LGTM!

Correctly classified as a resource-scoped permission, consistent with similar read actions like GetStats and GetSchema.


242-242: LGTM!

Appropriately granted to Editor, Writer, and Reader roles. This is a read-only operation that provides dependency visibility, which all these roles should have access to.

Also applies to: 279-279, 320-320
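
As a rough, self-contained illustration of what "resource-scoped" means here, the sketch below pairs the new action with a specific stream the way GetStats and GetSchema are granted. Action, Permission, and reader_permissions are simplified stand-ins, not the real types from src/rbac/role.rs.

// Simplified stand-ins for the richer types in src/rbac/role.rs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Action {
    GetStats,
    GetSchema,
    GetLogstreamAffectedResources, // added by this PR
}

// A resource-scoped permission ties an action to one stream.
struct Permission {
    action: Action,
    stream: String,
}

// Read-oriented grants for a single stream, mirroring the default
// privilege sets described above.
fn reader_permissions(stream: &str) -> Vec<Permission> {
    [
        Action::GetStats,
        Action::GetSchema,
        Action::GetLogstreamAffectedResources,
    ]
    .into_iter()
    .map(|action| Permission {
        action,
        stream: stream.to_string(),
    })
    .collect()
}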

src/handlers/http/logstream.rs (1)

23-23: LGTM!

Import correctly added for the new LogstreamAffectedResources utility type.

src/handlers/http/modal/utils/logstream_utils.rs (2)

313-327: LGTM!

The From<LogstreamAffectedResourcesError> for StreamError implementation correctly maps error variants, preserving structured errors where possible and falling back to Anyhow for others.


91-104: LGTM!

Clean data structures with appropriate Serialize derives for JSON responses. The use of Ulid for IDs is consistent with the codebase patterns.

coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
src/handlers/http/modal/utils/logstream_utils.rs (2)

189-193: Fragile stream name matching in dashboard tiles.

Using chart_query.contains(stream_name) for matching can produce false positives. For example, a stream named "log" would match a query referencing "catalog" or "logstream_backup".

Consider using a more precise matching approach, such as word-boundary matching or parsing the query to extract the actual stream reference.
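
For example, a word-boundary check rules out the substring matches above; this is a sketch assuming the regex crate, and query_references_stream is a hypothetical helper rather than code from this PR:

use regex::Regex;

// "log" matches `select * from log` but not "catalog" or
// "logstream_backup"; regex::escape guards stream names that happen
// to contain regex metacharacters.
fn query_references_stream(chart_query: &str, stream_name: &str) -> bool {
    let pattern = format!(r"\b{}\b", regex::escape(stream_name));
    Regex::new(&pattern)
        .map(|re| re.is_match(chart_query))
        .unwrap_or(false)
}

Parsing the query to extract actual table identifiers would be stricter still, but the boundary check already eliminates the false positives named above.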


198-201: Generating new ULID for missing dashboard_id masks data issues.

When dashboard.dashboard_id is None, generating a new Ulid::new() produces a random ID that won't match the actual dashboard. This could confuse API consumers and mask underlying data integrity problems.

Consider returning an error or skipping dashboards with missing IDs instead.

🔎 Suggested alternative
             if !affected_tile_ids.is_empty() {
-                affected_dashboards.push(LogstreamAffectedDashboard { 
-                    dashboard_id: dashboard.dashboard_id.unwrap_or_else(|| {
-                        tracing::warn!("dashboard {}: [id] is missing -- for logstream {}", dash_i, stream_name);
-                        Ulid::new() // default to a new ULID if missing -- what else?
-                    }), 
-                    affected_tile_ids
-                });
+                if let Some(dashboard_id) = dashboard.dashboard_id {
+                    affected_dashboards.push(LogstreamAffectedDashboard { 
+                        dashboard_id, 
+                        affected_tile_ids
+                    });
+                } else {
+                    tracing::warn!("dashboard {}: [id] is missing, skipping -- for logstream {}", dash_i, stream_name);
+                }
             }
🧹 Nitpick comments (3)
src/handlers/http/modal/utils/logstream_utils.rs (3)

131-146: Consider logging when filters lack IDs.

Filters without filter_id are silently dropped (lines 143-144), whereas fetch_affected_dashboards and fetch_affected_alerts log warnings when parsing fails. For consistency and debuggability, consider logging when filters are skipped.

🔎 Suggested enhancement
         Ok(PARSEABLE.metastore.get_filters().await?
             .into_iter()
             .filter_map(|filter| {
                 if filter.stream_name == stream_name && 
                     let Some(f_id) = filter.filter_id { 
                         Some(f_id) 
-                    } else { None }
+                    } else {
+                        if filter.stream_name == stream_name {
+                            tracing::warn!("filter for stream {} has no filter_id, skipping", stream_name);
+                        }
+                        None
+                    }
             }).collect())

242-300: Logic is correct, consider refactoring repetitive pattern matching.

The role fetching logic is sound and handles all privilege types correctly. The pattern matching at lines 262-285 is repetitive but explicit.

🔎 Optional refactoring to reduce repetition

The three privilege arms (Ingestor, Reader, Writer) have identical structure. Consider extracting a helper:

fn extract_stream_from_resource(resource: &ParseableResourceType) -> Option<&str> {
    match resource {
        ParseableResourceType::Stream(stream) => Some(stream.as_str()),
        _ => None
    }
}

Then simplify:

                 let associated_stream = match privilege {
-                    DefaultPrivilege::Ingestor { resource } => {
-                        match resource {
-                            ParseableResourceType::Stream(stream) => stream,
-                            _ => continue
-                        }
-                    },
-
-                    DefaultPrivilege::Reader { resource } => {
-                        match resource {
-                            ParseableResourceType::Stream(stream) => stream,
-                            _ => continue
-                        }
-                    },
-
-                    DefaultPrivilege::Writer { resource } => {
-                        match resource {
-                            ParseableResourceType::Stream(stream) => stream,
-                            _ => continue
-                        }
-                    },
-
+                    DefaultPrivilege::Ingestor { resource } |
+                    DefaultPrivilege::Reader { resource } |
+                    DefaultPrivilege::Writer { resource } => {
+                        match extract_stream_from_resource(resource) {
+                            Some(s) => s,
+                            None => continue
+                        }
+                    },
                     _ => continue
                 };

331-347: Consider simplifying the JSON parsing logic.

The function works correctly but has some style issues:

  1. The return keyword at line 343 is redundant in Rust
  2. The two-step parsing (lines 336-341) is unnecessary overhead
🔎 Suggested simplification
 fn bytes_to_json<T: serde::de::DeserializeOwned>(json_bytes: Bytes) -> Result<T, Bytes2JSONError> {
     if json_bytes.is_empty() {
         return Err(Bytes2JSONError::ZeroSizedBytes);
     }
 
-    let json_bytes_value = match serde_json::from_slice::<serde_json::Value>(&json_bytes) {
-        Ok(value) => value,
-        Err(err) => {
-            return Err(Bytes2JSONError::FailedToParse(format!("{:#?}", err)))
-        }
-    };
-
-    return match serde_json::from_value::<T>(json_bytes_value.clone()) {
-        Ok(parsed_object) => Ok(parsed_object),
-        Err(e) => Err(Bytes2JSONError::FailedToParse(format!("deserialization failed: {:#?}", e)))
-    };
+    serde_json::from_slice::<T>(&json_bytes)
+        .map_err(|e| Bytes2JSONError::FailedToParse(format!("{:#?}", e)))
 }

This is more idiomatic Rust and avoids the unnecessary intermediate Value allocation.

📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7def6f7 and cfb0ef1.

📒 Files selected for processing (2)
  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/handlers/http/logstream.rs
🧰 Additional context used
🧠 Learnings (14)
📚 Learning: 2025-09-18T10:08:05.101Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: resources/formats.json:1469-1484
Timestamp: 2025-09-18T10:08:05.101Z
Learning: The "rust_server_logs" format in resources/formats.json was intentionally renamed to "parseable_server_logs" with no backward compatibility concerns because it was unreleased and had no external users depending on it. This was confirmed by nikhilsinhaparseable on PR 1415.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-09T14:08:45.809Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1427
File: resources/ingest_demo_data.sh:440-440
Timestamp: 2025-09-09T14:08:45.809Z
Learning: In the resources/ingest_demo_data.sh demo script, hardcoded stream names like "demodata" in alert queries should be ignored and not flagged for replacement with $P_STREAM variables.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-05T09:18:44.813Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1425
File: src/query/mod.rs:484-495
Timestamp: 2025-09-05T09:18:44.813Z
Learning: In the Parseable system, stream names and column names cannot contain quotes, which eliminates SQL injection concerns when interpolating these identifiers directly into SQL queries in src/query/mod.rs.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-05-01T10:27:56.858Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1305
File: src/handlers/http/users/dashboards.rs:0-0
Timestamp: 2025-05-01T10:27:56.858Z
Learning: The `add_tile()` function in `src/handlers/http/users/dashboards.rs` should use `get_dashboard_by_user(dashboard_id, &user_id)` instead of `get_dashboard(dashboard_id)` to ensure proper authorization checks when modifying a dashboard.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-09T14:08:38.114Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1427
File: resources/ingest_demo_data.sh:418-418
Timestamp: 2025-09-09T14:08:38.114Z
Learning: In the resources/ingest_demo_data.sh demo script, the hardcoded stream name "demodata" should be kept as-is rather than replaced with the $P_STREAM variable, as this is intentional for demo consistency.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-03-28T06:17:01.201Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1276
File: src/prism/logstream/mod.rs:0-0
Timestamp: 2025-03-28T06:17:01.201Z
Learning: In the Parseable datasets API, specific stream names don't need to be logged in error cases because the API is called from the Parseable UI where only authorized users can access and the streams in the request are pre-filtered based on user authorization.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-05-01T10:33:51.767Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1305
File: src/handlers/http/users/dashboards.rs:125-148
Timestamp: 2025-05-01T10:33:51.767Z
Learning: When adding a tile to a dashboard in `add_tile()` function, the tile ID must be provided by the client and should not be generated by the server. If the tile ID is missing (nil), the API should fail the operation with an appropriate error message.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-06-15T18:18:14.590Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1348
File: src/prism/home/mod.rs:366-368
Timestamp: 2025-06-15T18:18:14.590Z
Learning: In the Parseable dashboard system, dashboard_id is guaranteed to always be present (never None) because IDs are generated at dashboard creation time, and there are no pre-v1 dashboards that could lack IDs.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-06T04:26:17.191Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1424
File: src/enterprise/utils.rs:65-72
Timestamp: 2025-09-06T04:26:17.191Z
Learning: In Parseable's metastore implementation, MetastoreError::to_detail() returns a MetastoreErrorDetail struct (not a string), which contains structured error information including operation, message, stream_name, and other contextual fields. This struct is designed to be boxed in ObjectStorageError::MetastoreError(Box<MetastoreErrorDetail>).

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-03-26T06:44:53.362Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
🧬 Code graph analysis (1)
src/handlers/http/modal/utils/logstream_utils.rs (3)
src/parseable/mod.rs (2)
  • storage (284-286)
  • new (180-194)
src/storage/mod.rs (3)
  • from (182-188)
  • new (207-209)
  • new (220-226)
src/metadata.rs (1)
  • new (99-136)
🔇 Additional comments (4)
src/handlers/http/modal/utils/logstream_utils.rs (4)

19-42: LGTM!

The imports and constant definition are appropriate for the new affected resources functionality.


91-119: LGTM!

The data structures are well-designed with appropriate visibility, serialization support, and comprehensive error handling.


210-239: LGTM!

The alert fetching logic correctly uses exact string matching (s == stream_name) to avoid false positives, and properly handles parsing errors with logging.


303-317: LGTM!

The error conversion properly handles specific error types and falls back to anyhow for others.

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
src/handlers/http/modal/utils/logstream_utils.rs (4)

122-129: Consider parallelizing independent async operations.

The four fetch methods are independent and could run concurrently using tokio::try_join! to reduce latency. This also consolidates the repeated stream existence checks.

🔎 Suggested refactor
 pub async fn load(stream_name: &str) -> Result<Self, LogstreamAffectedResourcesError> {
+    if !PARSEABLE.streams.contains(stream_name) {
+        return Err(StreamNotFound(stream_name.to_string()).into());
+    }
+
+    let (filters, dashboards, alerts, roles) = tokio::try_join!(
+        Self::fetch_affected_filters_unchecked(stream_name),
+        Self::fetch_affected_dashboards_unchecked(stream_name),
+        Self::fetch_affected_alerts_unchecked(stream_name),
+        Self::fetch_affected_roles_unchecked(stream_name),
+    )?;
+
     Ok(Self {
-        filters: Self::fetch_affected_filters(stream_name).await?,
-        dashboards: Self::fetch_affected_dashboards(stream_name).await?,
-        alerts: Self::fetch_affected_alerts(stream_name).await?,
-        roles: Self::fetch_affected_roles(stream_name).await?,
+        filters,
+        dashboards,
+        alerts,
+        roles,
     })
 }

Then rename the internal methods to _unchecked variants without stream existence checks, keeping the public ones with checks for standalone use.


138-145: Consider more idiomatic filter_map pattern.

The if-let chain syntax works but a standard filter().filter_map() chain may be more readable.

🔎 Suggested refactor
     Ok(PARSEABLE.metastore.get_filters().await?
         .into_iter()
-        .filter_map(|filter| {
-            if filter.stream_name == stream_name && 
-                let Some(f_id) = filter.filter_id { 
-                    Some(f_id) 
-                } else { None }
-        }).collect())
+        .filter(|filter| filter.stream_name == stream_name)
+        .filter_map(|filter| filter.filter_id)
+        .collect())
 }

269-292: Repetitive match arms could be consolidated.

The three privilege variants have identical logic for extracting the stream. Consider consolidating to reduce duplication.

🔎 Suggested refactor
-                let associated_stream = match privilege {
-                    DefaultPrivilege::Ingestor { resource } => {
-                        match resource {
-                            ParseableResourceType::Stream(stream) => stream,
-                            _ => continue
-                        }
-                    },
-
-                    DefaultPrivilege::Reader { resource } => {
-                        match resource {
-                            ParseableResourceType::Stream(stream) => stream,
-                            _ => continue
-                        }
-                    },
-
-                    DefaultPrivilege::Writer { resource } => {
-                        match resource {
-                            ParseableResourceType::Stream(stream) => stream,
-                            _ => continue
-                        }
-                    },
-
-                    _ => continue
-                };
+                let associated_stream = match privilege {
+                    DefaultPrivilege::Ingestor { resource }
+                    | DefaultPrivilege::Reader { resource }
+                    | DefaultPrivilege::Writer { resource } => match resource {
+                        ParseableResourceType::Stream(stream) => stream,
+                        _ => continue,
+                    },
+                    _ => continue,
+                };

338-354: Simplify by parsing directly to target type.

The two-step parsing (bytes → Value → T) adds overhead. Use serde_json::from_slice::<T>() directly. Also, the .clone() on line 350 is unnecessary and the explicit return is non-idiomatic.

🔎 Suggested refactor
 fn bytes_to_json<T: serde::de::DeserializeOwned>(json_bytes: Bytes) -> Result<T, Bytes2JSONError> {
     if json_bytes.is_empty() {
         return Err(Bytes2JSONError::ZeroSizedBytes);
     }

-    let json_bytes_value = match serde_json::from_slice::<serde_json::Value>(&json_bytes) {
-        Ok(value) => value,
-        Err(err) => {
-            return Err(Bytes2JSONError::FailedToParse(format!("{:#?}", err)))
-        }
-    };
-
-    return match serde_json::from_value::<T>(json_bytes_value.clone()) {
-        Ok(parsed_object) => Ok(parsed_object),
-        Err(e) => Err(Bytes2JSONError::FailedToParse(format!("deserialization failed: {:#?}", e)))
-    };
+    serde_json::from_slice::<T>(&json_bytes)
+        .map_err(|e| Bytes2JSONError::FailedToParse(e.to_string()))
 }
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cfb0ef1 and e6c993d.

📒 Files selected for processing (1)
  • src/handlers/http/modal/utils/logstream_utils.rs
🧰 Additional context used
🧠 Learnings (15)
📚 Learning: 2025-09-18T10:08:05.101Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: resources/formats.json:1469-1484
Timestamp: 2025-09-18T10:08:05.101Z
Learning: The "rust_server_logs" format in resources/formats.json was intentionally renamed to "parseable_server_logs" with no backward compatibility concerns because it was unreleased and had no external users depending on it. This was confirmed by nikhilsinhaparseable on PR 1415.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-09T14:08:45.809Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1427
File: resources/ingest_demo_data.sh:440-440
Timestamp: 2025-09-09T14:08:45.809Z
Learning: In the resources/ingest_demo_data.sh demo script, hardcoded stream names like "demodata" in alert queries should be ignored and not flagged for replacement with $P_STREAM variables.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-05T09:18:44.813Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1425
File: src/query/mod.rs:484-495
Timestamp: 2025-09-05T09:18:44.813Z
Learning: In the Parseable system, stream names and column names cannot contain quotes, which eliminates SQL injection concerns when interpolating these identifiers directly into SQL queries in src/query/mod.rs.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-05-01T10:27:56.858Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1305
File: src/handlers/http/users/dashboards.rs:0-0
Timestamp: 2025-05-01T10:27:56.858Z
Learning: The `add_tile()` function in `src/handlers/http/users/dashboards.rs` should use `get_dashboard_by_user(dashboard_id, &user_id)` instead of `get_dashboard(dashboard_id)` to ensure proper authorization checks when modifying a dashboard.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-09T14:08:38.114Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1427
File: resources/ingest_demo_data.sh:418-418
Timestamp: 2025-09-09T14:08:38.114Z
Learning: In the resources/ingest_demo_data.sh demo script, the hardcoded stream name "demodata" should be kept as-is rather than replaced with the $P_STREAM variable, as this is intentional for demo consistency.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-03-28T06:17:01.201Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1276
File: src/prism/logstream/mod.rs:0-0
Timestamp: 2025-03-28T06:17:01.201Z
Learning: In the Parseable datasets API, specific stream names don't need to be logged in error cases because the API is called from the Parseable UI where only authorized users can access and the streams in the request are pre-filtered based on user authorization.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-05-01T10:33:51.767Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1305
File: src/handlers/http/users/dashboards.rs:125-148
Timestamp: 2025-05-01T10:33:51.767Z
Learning: When adding a tile to a dashboard in `add_tile()` function, the tile ID must be provided by the client and should not be generated by the server. If the tile ID is missing (nil), the API should fail the operation with an appropriate error message.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-06-15T18:18:14.590Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1348
File: src/prism/home/mod.rs:366-368
Timestamp: 2025-06-15T18:18:14.590Z
Learning: In the Parseable dashboard system, dashboard_id is guaranteed to always be present (never None) because IDs are generated at dashboard creation time, and there are no pre-v1 dashboards that could lack IDs.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-06T04:26:17.191Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1424
File: src/enterprise/utils.rs:65-72
Timestamp: 2025-09-06T04:26:17.191Z
Learning: In Parseable's metastore implementation, MetastoreError::to_detail() returns a MetastoreErrorDetail struct (not a string), which contains structured error information including operation, message, stream_name, and other contextual fields. This struct is designed to be boxed in ObjectStorageError::MetastoreError(Box<MetastoreErrorDetail>).

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-03-26T06:44:53.362Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
🔇 Additional comments (6)
src/handlers/http/modal/utils/logstream_utils.rs (6)

41-42: LGTM!

Good practice documenting the constant's purpose. The field name dbName aligns with the commit message fix for locating stream names in tiles.


91-104: LGTM!

Data structures are well-designed with appropriate derives for serialization and debugging.


106-119: LGTM!

Error type is well-structured with appropriate #[from] conversions for seamless error propagation.


167-169: Deduplication does not cover dashboards with None IDs.

When dashboard_id is None, dashboards with missing IDs bypass the ID-based deduplication entirely. However, this is mitigated by lines 204-211, which skip dashboards with missing IDs anyway.


204-211: Good fix for the missing dashboard_id handling.

The logic now properly skips dashboards with missing IDs instead of generating a random ULID, addressing the previous review concern.


310-324: LGTM!

Error conversion appropriately maps specific errors and wraps others in Anyhow for the catch-all case.

coderabbitai bot previously approved these changes Dec 30, 2025

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
src/handlers/http/modal/utils/logstream_utils.rs (3)

118-126: Consider parallelizing the fetch operations.

The four fetch methods are independent and could execute concurrently using tokio::try_join! to reduce total latency.

♻️ Suggested refactor
     pub async fn load(stream_name: &str) -> Result<Self, LogstreamAffectedResourcesError> {
+        let (filters, dashboards, alerts, roles) = tokio::try_join!(
+            Self::fetch_affected_filters(stream_name),
+            Self::fetch_affected_dashboards(stream_name),
+            Self::fetch_affected_alerts(stream_name),
+            Self::fetch_affected_roles(stream_name),
+        )?;
+
         Ok(Self {
-            filters: Self::fetch_affected_filters(stream_name).await?,
-            dashboards: Self::fetch_affected_dashboards(stream_name).await?,
-            alerts: Self::fetch_affected_alerts(stream_name).await?,
-            roles: Self::fetch_affected_roles(stream_name).await?,
+            filters,
+            dashboards,
+            alerts,
+            roles,
         })
     }

160-187: Use HashSet instead of Vec for deduplication.

Using Vec::contains() for deduplication (lines 184, 212) results in O(n) lookup per check. For dashboards with many tiles, this becomes inefficient. A HashSet provides O(1) amortized lookups.

♻️ Suggested refactor
+use std::collections::HashSet;
 // ...
 
-        let mut parsed_dashboard_ids = Vec::<Ulid>::new();
+        let mut parsed_dashboard_ids = HashSet::<Ulid>::new();
         let mut affected_dashboards: Vec<LogstreamAffectedDashboard> = vec![];
 
         for (dash_i, dashboard_bytes) in all_dashboards.iter().enumerate() {
             // ... parsing code ...
 
             if parsed_dashboard_ids.contains(&dashboard_id) {
                 continue;
             };
-            parsed_dashboard_ids.push(dashboard_id);
+            parsed_dashboard_ids.insert(dashboard_id);
 
             // ...
 
-            let mut affected_tile_ids = Vec::<Ulid>::new();
+            let mut seen_tile_ids = HashSet::<Ulid>::new();
+            let mut affected_tile_ids = Vec::<Ulid>::new();
             for tile in tiles {
                 // ...
-                    if dbs_have_stream && !affected_tile_ids.contains(&tile.tile_id) {
+                    if dbs_have_stream && seen_tile_ids.insert(tile.tile_id) {
                         affected_tile_ids.push(tile.tile_id);
                     }

330-347: Simplify to single-step deserialization.

The two-step parsing (bytes → Value → T) is inefficient. Direct deserialization with from_slice::<T>() is both faster and simpler.

♻️ Suggested refactor
 fn bytes_to_json<T: serde::de::DeserializeOwned>(json_bytes: &Bytes) -> Result<T, Bytes2JSONError> {
     if json_bytes.is_empty() {
         return Err(Bytes2JSONError::ZeroSizedBytes);
     }
 
-    let json_bytes_value = match serde_json::from_slice::<serde_json::Value>(json_bytes) {
-        Ok(value) => value,
-        Err(err) => return Err(Bytes2JSONError::FailedToParse(format!("{:#?}", err))),
-    };
-
-    match serde_json::from_value::<T>(json_bytes_value) {
-        Ok(parsed_object) => Ok(parsed_object),
-        Err(e) => Err(Bytes2JSONError::FailedToParse(format!(
-            "deserialization failed: {:#?}",
-            e
-        ))),
-    }
+    serde_json::from_slice::<T>(json_bytes)
+        .map_err(|e| Bytes2JSONError::FailedToParse(format!("{:#?}", e)))
 }
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e6c993d and ea85f4d.

📒 Files selected for processing (3)
  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/ingest_server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/handlers/http/modal/ingest_server.rs
🧰 Additional context used
🧠 Learnings (18)
📓 Common learnings
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1276
File: src/prism/logstream/mod.rs:0-0
Timestamp: 2025-03-28T06:17:01.201Z
Learning: In the Parseable datasets API, specific stream names don't need to be logged in error cases because the API is called from the Parseable UI where only authorized users can access and the streams in the request are pre-filtered based on user authorization.
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.

Applied to files:

  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-10-20T17:48:53.444Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/handlers/http/cluster/mod.rs:1370-1400
Timestamp: 2025-10-20T17:48:53.444Z
Learning: In src/handlers/http/cluster/mod.rs, the billing metrics processing logic should NOT accumulate counter values from multiple Prometheus samples with the same labels. The intended behavior is to convert each received counter from nodes into individual events for ingestion, using `.insert()` to store the counter value directly.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-09-18T09:59:20.177Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:700-756
Timestamp: 2025-09-18T09:59:20.177Z
Learning: In src/event/mod.rs, the parsed_timestamp used in increment_events_ingested_by_date() is correctly UTC-normalized: for dynamic streams it remains Utc::now(), and for streams with time partition enabled it uses the time partition value. Both cases result in proper UTC date strings for metrics labeling, preventing double-counting issues.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-08-18T19:10:11.941Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1405
File: src/handlers/http/ingest.rs:163-164
Timestamp: 2025-08-18T19:10:11.941Z
Learning: Field statistics calculation in src/storage/field_stats.rs uses None for the time_partition parameter when calling flatten_and_push_logs(), as field stats generation does not require time partition functionality.

Applied to files:

  • src/handlers/http/logstream.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-18T10:08:05.101Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: resources/formats.json:1469-1484
Timestamp: 2025-09-18T10:08:05.101Z
Learning: The "rust_server_logs" format in resources/formats.json was intentionally renamed to "parseable_server_logs" with no backward compatibility concerns because it was unreleased and had no external users depending on it. This was confirmed by nikhilsinhaparseable on PR 1415.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-09T14:08:45.809Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1427
File: resources/ingest_demo_data.sh:440-440
Timestamp: 2025-09-09T14:08:45.809Z
Learning: In the resources/ingest_demo_data.sh demo script, hardcoded stream names like "demodata" in alert queries should be ignored and not flagged for replacement with $P_STREAM variables.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-05-01T10:27:56.858Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1305
File: src/handlers/http/users/dashboards.rs:0-0
Timestamp: 2025-05-01T10:27:56.858Z
Learning: The `add_tile()` function in `src/handlers/http/users/dashboards.rs` should use `get_dashboard_by_user(dashboard_id, &user_id)` instead of `get_dashboard(dashboard_id)` to ensure proper authorization checks when modifying a dashboard.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-05T09:18:44.813Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1425
File: src/query/mod.rs:484-495
Timestamp: 2025-09-05T09:18:44.813Z
Learning: In the Parseable system, stream names and column names cannot contain quotes, which eliminates SQL injection concerns when interpolating these identifiers directly into SQL queries in src/query/mod.rs.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-09T14:08:38.114Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1427
File: resources/ingest_demo_data.sh:418-418
Timestamp: 2025-09-09T14:08:38.114Z
Learning: In the resources/ingest_demo_data.sh demo script, the hardcoded stream name "demodata" should be kept as-is rather than replaced with the $P_STREAM variable, as this is intentional for demo consistency.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-05-01T10:33:51.767Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1305
File: src/handlers/http/users/dashboards.rs:125-148
Timestamp: 2025-05-01T10:33:51.767Z
Learning: When adding a tile to a dashboard in `add_tile()` function, the tile ID must be provided by the client and should not be generated by the server. If the tile ID is missing (nil), the API should fail the operation with an appropriate error message.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-06-15T18:18:14.590Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1348
File: src/prism/home/mod.rs:366-368
Timestamp: 2025-06-15T18:18:14.590Z
Learning: In the Parseable dashboard system, dashboard_id is guaranteed to always be present (never None) because IDs are generated at dashboard creation time, and there are no pre-v1 dashboards that could lack IDs.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-09-06T04:26:17.191Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1424
File: src/enterprise/utils.rs:65-72
Timestamp: 2025-09-06T04:26:17.191Z
Learning: In Parseable's metastore implementation, MetastoreError::to_detail() returns a MetastoreErrorDetail struct (not a string), which contains structured error information including operation, message, stream_name, and other contextual fields. This struct is designed to be boxed in ObjectStorageError::MetastoreError(Box<MetastoreErrorDetail>).

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-03-28T06:17:01.201Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1276
File: src/prism/logstream/mod.rs:0-0
Timestamp: 2025-03-28T06:17:01.201Z
Learning: In the Parseable datasets API, specific stream names don't need to be logged in error cases because the API is called from the Parseable UI where only authorized users can access and the streams in the request are pre-filtered based on user authorization.

Applied to files:

  • src/handlers/http/modal/utils/logstream_utils.rs
🧬 Code graph analysis (1)
src/handlers/http/logstream.rs (1)
src/handlers/http/modal/utils/logstream_utils.rs (1)
  • load (119-126)
🔇 Additional comments (7)
src/handlers/http/modal/utils/logstream_utils.rs (5)

88-101: LGTM!

The struct definitions are well-designed for the API response, with appropriate types for identifiers.


103-116: LGTM!

The error enum comprehensively covers the failure modes for this feature, with clear error messages and automatic conversions.


128-150: LGTM!

The filter extraction logic is clean and idiomatic, correctly matching stream names and extracting filter IDs.


229-257: LGTM!

The alert extraction logic correctly filters alerts by checking if the stream is in the alert's datasets.


259-306: LGTM!

The role extraction correctly identifies roles with Stream-scoped privileges (Ingestor/Reader/Writer). The early break on line 300 is a good optimization.

src/handlers/http/logstream.rs (2)

23-23: LGTM!

Import correctly brings in the new utility type.


498-514: LGTM!

The endpoint correctly follows the established pattern: checking stream existence with check_or_load_stream (ensuring proper behavior in both query and standalone modes per the retrieved learning), then delegating to the business logic. Error handling is clean.

coderabbitai[bot]
coderabbitai bot previously approved these changes Jan 10, 2026

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In @src/handlers/http/modal/utils/logstream_utils.rs:
- Around line 268-275: The call to PARSEABLE.metastore.get_parseable_metadata()
inside fetch_affected_roles is wrapping the original Metastore error into
ObjectStorageError::MetastoreError (via .map_err(...)) which causes loss of the
original error variant downstream; remove that .map_err(...) and let the
original MetastoreError propagate (use ? directly) so
StreamError::MetastoreError is preserved. Also update the conversion/From
impl(s) that collapse ObjectStorageError/Bytes2JSONError into Anyhow —
explicitly match all ObjectStorageError variants and map MetastoreError to
StreamError::MetastoreError while mapping genuine storage variants to
StreamError::Storage(ObjectStorageError::...), so error structure is preserved
(adjust the conversion at the impl that currently covers lines ~311-319).
- Around line 130-136: Replace the direct
PARSEABLE.streams.contains(stream_name) check in fetch_affected_filters,
fetch_affected_dashboards, fetch_affected_alerts, and fetch_affected_roles with
a call to PARSEABLE.check_or_load_stream(stream_name) so streams present in
storage but not yet loaded are handled correctly; call check_or_load_stream and
propagate or map its error to the same
LogstreamAffectedResourcesError/StreamNotFound behavior the current code
expects. Update each fetch_affected_* function to use
check_or_load_stream(stream_name) before proceeding, and keep these methods
private only if they are truly internal and you prefer that alternative (ensure
load() still calls them).
🧹 Nitpick comments (3)
src/handlers/http/modal/utils/logstream_utils.rs (3)

120-128: Consider parallelizing load() to reduce endpoint latency.
These are independent metastore calls; sequential awaits add avoidable tail latency.

Possible change
 pub async fn load(stream_name: &str) -> Result<Self, LogstreamAffectedResourcesError> {
-    Ok(Self {
-        filters: Self::fetch_affected_filters(stream_name).await?,
-        dashboards: Self::fetch_affected_dashboards(stream_name).await?,
-        alerts: Self::fetch_affected_alerts(stream_name).await?,
-        roles: Self::fetch_affected_roles(stream_name).await?,
-    })
+    let (filters, dashboards, alerts, roles) = tokio::try_join!(
+        Self::fetch_affected_filters(stream_name),
+        Self::fetch_affected_dashboards(stream_name),
+        Self::fetch_affected_alerts(stream_name),
+        Self::fetch_affected_roles(stream_name),
+    )?;
+    Ok(Self { filters, dashboards, alerts, roles })
 }
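
Worth noting for reviewers weighing this: tokio::try_join! polls all four fetches concurrently on the same task and short-circuits on the first Err, dropping the still-pending futures, so error behavior stays equivalent to the sequential version while the happy path overlaps the metastore round-trips.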

99-103: API response determinism: HashSet<Ulid> serializes in non-stable order.
If clients/tests diff JSON, this can create noise.

Options
-pub struct LogstreamAffectedDashboard {
+pub struct LogstreamAffectedDashboard {
     pub dashboard_id: Ulid,
-    pub affected_tile_ids: HashSet<Ulid>,
+    // Prefer deterministic ordering for API responses:
+    // pub affected_tile_ids: BTreeSet<Ulid>,
+    pub affected_tile_ids: HashSet<Ulid>,
 }

Or: keep HashSet internally, but convert to a sorted Vec at the boundary.
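
For the second option, a minimal sketch of sorting at the serialization boundary with a serde serialize_with adapter; this assumes the ulid crate's serde feature is enabled and is illustrative, not code from the PR:

use std::collections::HashSet;

use serde::{Serialize, Serializer};
use ulid::Ulid;

// Emit the set as a sorted Vec so the JSON array order is stable across
// runs, while keeping O(1) membership checks internally.
fn sorted_ids<S: Serializer>(ids: &HashSet<Ulid>, ser: S) -> Result<S::Ok, S::Error> {
    let mut sorted: Vec<&Ulid> = ids.iter().collect();
    sorted.sort();
    sorted.serialize(ser)
}

#[derive(Serialize)]
pub struct LogstreamAffectedDashboard {
    pub dashboard_id: Ulid,
    #[serde(serialize_with = "sorted_ids")]
    pub affected_tile_ids: HashSet<Ulid>,
}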


332-349: bytes_to_json double-parses JSON; parse directly into T unless you need the intermediate Value.
This adds allocations and CPU on potentially large dashboard payloads.

Suggested change
 fn bytes_to_json<T: serde::de::DeserializeOwned>(json_bytes: &Bytes) -> Result<T, Bytes2JSONError> {
     if json_bytes.is_empty() {
         return Err(Bytes2JSONError::ZeroSizedBytes);
     }
-
-    let json_bytes_value = match serde_json::from_slice::<serde_json::Value>(json_bytes) {
-        Ok(value) => value,
-        Err(err) => return Err(Bytes2JSONError::FailedToParse(format!("{:#?}", err))),
-    };
-
-    match serde_json::from_value::<T>(json_bytes_value) {
-        Ok(parsed_object) => Ok(parsed_object),
-        Err(e) => Err(Bytes2JSONError::FailedToParse(format!(
-            "deserialization failed: {:#?}",
-            e
-        ))),
-    }
+    serde_json::from_slice::<T>(json_bytes).map_err(|e| {
+        Bytes2JSONError::FailedToParse(format!("{:#?}", e))
+    })
 }
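
As background: serde_json::from_slice::<T> drives the deserializer directly over the byte buffer, so no intermediate Value tree is allocated; the two-step form only earns its keep when the same payload must be inspected generically before (or instead of) typed deserialization.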
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ea85f4d and 7262021.

📒 Files selected for processing (1)
  • src/handlers/http/modal/utils/logstream_utils.rs

Comment on lines +130 to +136
pub async fn fetch_affected_filters(
    stream_name: &str,
) -> Result<Vec<String>, LogstreamAffectedResourcesError> {
    if !PARSEABLE.streams.contains(stream_name) {
        return Err(StreamNotFound(stream_name.to_string()).into());
    }


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Check the actual code at specified lines in the target file
cat -n src/handlers/http/modal/utils/logstream_utils.rs | sed -n '125,270p'

Repository: parseablehq/parseable

Length of output: 5888


🏁 Script executed:

# Search for Mode enum definition and Mode::Query usage
rg "enum\s+Mode\b" -A 10 -S
rg "Mode::Query" -B 2 -A 2 -S | head -50

Repository: parseablehq/parseable

Length of output: 3046


🏁 Script executed:

# Find the canonical stream-existence pattern usage in other handlers
rg "create_stream_and_schema_from_storage" -B 3 -A 3 -S

Repository: parseablehq/parseable

Length of output: 4533


🏁 Script executed:

# Check if there's a logstream module or if this is part of a larger handler
fd "logstream_utils.rs" -x head -50 {}

Repository: parseablehq/parseable

Length of output: 1870


🏁 Script executed:

# Verify the specific pattern used in querier_logstream.rs to compare
rg -A 5 "!PARSEABLE.streams.contains" src/handlers/http/modal/query/querier_logstream.rs

Repository: parseablehq/parseable

Length of output: 449


🏁 Script executed:

# Check if there's a Modal API or Prism context where this code runs (Query mode specific?)
rg -B 10 "fetch_affected_filters|fetch_affected_dashboards" src/handlers/http/modal/utils/logstream_utils.rs | head -30

Repository: parseablehq/parseable

Length of output: 1051


🏁 Script executed:

# Check where LogstreamAffectedResources::load is called from (API endpoint)
rg "LogstreamAffectedResources::load" -B 5 -A 5

Repository: parseablehq/parseable

Length of output: 780


🏁 Script executed:

# Check if logstream_utils.rs is Query-mode specific or used across modes
rg "logstream_utils" -B 2 -A 2

Repository: parseablehq/parseable

Length of output: 1726


🏁 Script executed:

# Check what check_or_load_stream does - is it the right pattern?
rg "fn check_or_load_stream" -A 15

Repository: parseablehq/parseable

Length of output: 1140


🏁 Script executed:

# Find the handler function that calls LogstreamAffectedResources::load
rg -B 15 "LogstreamAffectedResources::load" src/handlers/http/logstream.rs

Repository: parseablehq/parseable

Length of output: 579


🏁 Script executed:

# Confirm that these fetch_* methods are public and could be called directly
rg "pub async fn fetch_affected" src/handlers/http/modal/utils/logstream_utils.rs

Repository: parseablehq/parseable

Length of output: 228


🏁 Script executed:

# Check if fetch_affected_* methods are called anywhere else besides load()
rg "fetch_affected_filters|fetch_affected_dashboards|fetch_affected_alerts|fetch_affected_roles" -v "pub async fn"

Repository: parseablehq/parseable

Length of output: 117


🏁 Script executed:

# Check if fetch_affected_* methods are called anywhere besides the load() method
rg "fetch_affected_(filters|dashboards|alerts|roles)" --no-filename -B 1 -A 1 | grep -v "pub async fn"

Repository: parseablehq/parseable

Length of output: 501


🏁 Script executed:

# Check if check_or_load_stream is the canonical helper for this pattern
rg "check_or_load_stream" -B 2 -A 2

Repository: parseablehq/parseable

Length of output: 5973


Use PARSEABLE.check_or_load_stream() in each fetch_affected_* method for consistent stream existence checks.

The fetch_affected_filters, fetch_affected_dashboards, fetch_affected_alerts, and fetch_affected_roles methods currently check stream existence using only PARSEABLE.streams.contains(stream_name). In Query/Prism modes, this fails for streams in storage but not yet loaded into memory. The codebase already provides check_or_load_stream() (in src/parseable/mod.rs) which handles this fallback; use it consistently here instead of a custom check. Alternatively, make these methods private since they're only called internally by load().

 pub async fn fetch_affected_filters(
     stream_name: &str,
 ) -> Result<Vec<String>, LogstreamAffectedResourcesError> {
-    if !PARSEABLE.streams.contains(stream_name) {
+    if !PARSEABLE.check_or_load_stream(stream_name).await {
         return Err(StreamNotFound(stream_name.to_string()).into());
     }
🤖 Prompt for AI Agents
In @src/handlers/http/modal/utils/logstream_utils.rs around lines 130 - 136,
Replace the direct PARSEABLE.streams.contains(stream_name) check in
fetch_affected_filters, fetch_affected_dashboards, fetch_affected_alerts, and
fetch_affected_roles with a call to PARSEABLE.check_or_load_stream(stream_name)
so streams present in storage but not yet loaded are handled correctly; call
check_or_load_stream and propagate or map its error to the same
LogstreamAffectedResourcesError/StreamNotFound behavior the current code
expects. Update each fetch_affected_* function to use
check_or_load_stream(stream_name) before proceeding, and keep these methods
private only if they are truly internal and you prefer that alternative (ensure
load() still calls them).
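
For readers outside the codebase, a rough sketch of the shape such a helper takes, inferred from the existence-check pattern quoted in the learnings above; the real check_or_load_stream in src/parseable/mod.rs is authoritative and may differ in which modes attempt the fallback and how load errors surface:

// Sketch only -- inferred from the canonical existence-check pattern, not
// copied from src/parseable/mod.rs; the mode handling here is an assumption.
impl Parseable {
    pub async fn check_or_load_stream(&self, stream_name: &str) -> bool {
        // Fast path: the stream is already resident in memory.
        if self.streams.contains(stream_name) {
            return true;
        }
        // Query nodes may not have loaded every stream yet; fall back to
        // reconstructing the stream and its schema from object storage.
        self.options.mode == Mode::Query
            && self
                .create_stream_and_schema_from_storage(stream_name)
                .await
                .unwrap_or(false)
    }
}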

Comment on lines +161 to +229
    let all_dashboards = PARSEABLE.metastore.get_dashboards().await?;
    let mut parsed_dashboard_ids = Vec::<Ulid>::new();
    let mut affected_dashboards: Vec<LogstreamAffectedDashboard> = vec![];

    for (dash_i, dashboard_bytes) in all_dashboards.iter().enumerate() {
        let dashboard = match self::bytes_to_json::<Dashboard>(dashboard_bytes) {
            Ok(d) => d,
            Err(e) => {
                tracing::warn!("{}", e.to_string());
                continue;
            }
        };

        let dashboard_id = match dashboard.dashboard_id {
            Some(id) => id,
            None => {
                tracing::warn!(
                    "dashboard {}: missing [id] field, skipping -- for logstream {}",
                    dash_i,
                    stream_name
                );
                continue;
            }
        };

        if parsed_dashboard_ids.contains(&dashboard_id) {
            continue;
        };
        parsed_dashboard_ids.push(dashboard_id);

        let Some(tiles) = dashboard.tiles.as_ref() else {
            continue;
        };

        let mut affected_tile_ids = HashSet::<Ulid>::new();
        for tile in tiles {
            let Some(tile_fields) = tile.other_fields.as_ref() else {
                continue;
            };

            let Some(tile_value) = tile_fields.get(TILE_FIELD_REFERRING_TO_STREAM) else {
                continue;
            };

            if let Some(db_names) = tile_value.as_array() {
                let dbs_have_stream = db_names.iter().any(|db| {
                    if let Some(db_str) = db.as_str() {
                        db_str == stream_name
                    } else {
                        false
                    }
                });

                if dbs_have_stream {
                    affected_tile_ids.insert(tile.tile_id);
                }
            }
        }

        if !affected_tile_ids.is_empty() {
            affected_dashboards.push(LogstreamAffectedDashboard {
                dashboard_id,
                affected_tile_ids,
            });
        }
    }

    Ok(affected_dashboards)
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

git ls-files | head -20

Repository: parseablehq/parseable

Length of output: 550


🏁 Script executed:

fd -type f -name "logstream_utils.rs"

Repository: parseablehq/parseable

Length of output: 235


🏁 Script executed:

fd -type f -name "dashboards.rs" | head -5

Repository: parseablehq/parseable

Length of output: 235


🏁 Script executed:

fd -type f -path "*handlers/http*" -name "*.rs" | head -10

Repository: parseablehq/parseable

Length of output: 235


🏁 Script executed:

fd logstream_utils.rs

Repository: parseablehq/parseable

Length of output: 113


🏁 Script executed:

fd dashboards.rs

Repository: parseablehq/parseable

Length of output: 126


🏁 Script executed:

find . -path "*/handlers/http*" -name "*.rs" | head -20

Repository: parseablehq/parseable

Length of output: 949


🏁 Script executed:

wc -l src/handlers/http/modal/utils/logstream_utils.rs

Repository: parseablehq/parseable

Length of output: 117


🏁 Script executed:

sed -n '150,240p' src/handlers/http/modal/utils/logstream_utils.rs

Repository: parseablehq/parseable

Length of output: 3129


🏁 Script executed:

rg "struct Dashboard|struct Tile|struct.*Tile" -A 10

Repository: parseablehq/parseable

Length of output: 1868


🏁 Script executed:

rg "TILE_FIELD_REFERRING_TO_STREAM" -n

Repository: parseablehq/parseable

Length of output: 318


🏁 Script executed:

rg "dbName" -n

Repository: parseablehq/parseable

Length of output: 447


🏁 Script executed:

rg "dbName" -B 2 -A 2 -C 3

Repository: parseablehq/parseable

Length of output: 2201


🏁 Script executed:

fd "dashboard" | xargs grep -l "dbName" 2>/dev/null | head -10

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg "other_fields.*get\(|tile_value.*as_" -B 2 -A 5

Repository: parseablehq/parseable

Length of output: 786


🏁 Script executed:

fd "*.json" | xargs grep -h "dbName" 2>/dev/null | head -20

Repository: parseablehq/parseable

Length of output: 350


🏁 Script executed:

rg "dbName.*\[|dbName.*\]" -B 1 -A 1

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

cat -n src/handlers/http/modal/utils/logstream_utils.rs | sed -n '160,230p'

Repository: parseablehq/parseable

Length of output: 2936


🏁 Script executed:

rg "fetch_affected_dashboards" -B 2 -A 2

Repository: parseablehq/parseable

Length of output: 1021


🏁 Script executed:

rg "as_array\(\)" src/handlers/http/modal/utils/logstream_utils.rs -B 2 -A 3

Repository: parseablehq/parseable

Length of output: 327


🏁 Script executed:

rg "Vec.*contains|HashSet" src/handlers/http/modal/utils/logstream_utils.rs

Repository: parseablehq/parseable

Length of output: 201


🏁 Script executed:

rg "dbName" resources/ -B 2 -A 2

Repository: parseablehq/parseable

Length of output: 1226


🏁 Script executed:

rg "\"dbName\"" --json | head -50

Repository: parseablehq/parseable

Length of output: 2304


🏁 Script executed:

python3 << 'EOF'
import json
# Parse the demo data script to understand dbName format
with open('resources/ingest_demo_data.sh', 'r') as f:
    content = f.read()
    # Find sections with dbName
    import re
    matches = re.findall(r'"dbName":\s*"[^"]*"', content)
    print(f"Found {len(matches)} dbName occurrences in demo data")
    for i, match in enumerate(set(matches)):
        print(f"  Example {i+1}: {match}")
EOF

Repository: parseablehq/parseable

Length of output: 151


🏁 Script executed:

rg "contains.*dashboard|dedup" src/handlers/http/modal/utils/logstream_utils.rs

Repository: parseablehq/parseable

Length of output: 126


🏁 Script executed:

rg "tile.*other_fields|other_fields.*tile" -B 5 -A 10 | head -80

Repository: parseablehq/parseable

Length of output: 1447


🏁 Script executed:

find . -name "*.md" -o -name "*.json" | xargs grep -l "dbName" 2>/dev/null | head -5

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg "as_str|as_array" src/handlers/http/modal/utils/logstream_utils.rs -B 1 -A 1

Repository: parseablehq/parseable

Length of output: 308


Handle both string and array formats for dbName, and use HashSet for dashboard deduplication.
The code misses tiles where dbName is stored as a string (per demo data), checking only .as_array(). Additionally, parsed_dashboard_ids.contains(...) on a Vec is quadratic.

Targeted fixes
-        let mut parsed_dashboard_ids = Vec::<Ulid>::new();
+        let mut parsed_dashboard_ids = HashSet::<Ulid>::new();

 ...
             if parsed_dashboard_ids.contains(&dashboard_id) {
                 continue;
             };
-            parsed_dashboard_ids.push(dashboard_id);
+            parsed_dashboard_ids.insert(dashboard_id);

 ...
-                if let Some(db_names) = tile_value.as_array() {
-                    let dbs_have_stream = db_names.iter().any(|db| {
-                        if let Some(db_str) = db.as_str() {
-                            db_str == stream_name
-                        } else {
-                            false
-                        }
-                    });
-
-                    if dbs_have_stream {
-                        affected_tile_ids.insert(tile.tile_id);
-                    }
-                }
+                let matches_stream = match tile_value {
+                    v if v.as_str().is_some() => v.as_str() == Some(stream_name),
+                    v if v.as_array().is_some() => v
+                        .as_array()
+                        .into_iter()
+                        .flatten()
+                        .any(|db| db.as_str() == Some(stream_name)),
+                    _ => false,
+                };
+                if matches_stream {
+                    affected_tile_ids.insert(tile.tile_id);
+                }
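
A quick way to pin the suggested matcher against both payload shapes is a small unit test; tile_refers_to_stream below is a hypothetical extraction of the match arm, and the literals mirror the demo data's string form plus the array form the original code handled:

use serde_json::{json, Value};

// Hypothetical helper mirroring the suggested matcher: a tile references the
// stream if dbName is the stream name itself or an array containing it.
fn tile_refers_to_stream(tile_value: &Value, stream_name: &str) -> bool {
    match tile_value {
        Value::String(s) => s == stream_name,
        Value::Array(dbs) => dbs.iter().any(|db| db.as_str() == Some(stream_name)),
        _ => false,
    }
}

#[test]
fn matches_both_db_name_shapes() {
    assert!(tile_refers_to_stream(&json!("demodata"), "demodata"));
    assert!(tile_refers_to_stream(&json!(["demodata", "other"]), "demodata"));
    assert!(!tile_refers_to_stream(&json!("otherstream"), "demodata"));
    assert!(!tile_refers_to_stream(&json!(42), "demodata"));
}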

Comment on lines +268 to +275
let metadata_bytes = PARSEABLE
    .metastore
    .get_parseable_metadata()
    .await
    .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?
    .ok_or_else(|| {
        ObjectStorageError::Custom("parseable metadata not initialized".into())
    })?;

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find and inspect StreamError enum definition
rg -n "pub\s+(enum|struct)\s+StreamError\b" -A 20 -S

Repository: parseablehq/parseable

Length of output: 1714


🏁 Script executed:

#!/bin/bash
# Find LogstreamAffectedResourcesError enum
rg -n "pub\s+(enum|struct)\s+LogstreamAffectedResourcesError\b" -A 20 -S

Repository: parseablehq/parseable

Length of output: 2104


🏁 Script executed:

#!/bin/bash
# Find the conversion implementation from LogstreamAffectedResourcesError to StreamError
rg -n "impl From<LogstreamAffectedResourcesError> for StreamError" -A 15 -S

Repository: parseablehq/parseable

Length of output: 1474


🏁 Script executed:

#!/bin/bash
# Check the actual fetch_affected_roles function
rg -n "pub async fn fetch_affected_roles" -A 15 -S

Repository: parseablehq/parseable

Length of output: 1520


🏁 Script executed:

#!/bin/bash
# Verify MetastoreError::to_detail() method exists
rg -n "fn to_detail|pub fn to_detail" -B 2 -A 5 --type rust | head -50

Repository: parseablehq/parseable

Length of output: 466


🏁 Script executed:

#!/bin/bash
# Check the file mentioned in the review
cat -n src/handlers/http/modal/utils/logstream_utils.rs | sed -n '260,280p'
cat -n src/handlers/http/modal/utils/logstream_utils.rs | sed -n '100,125p'
cat -n src/handlers/http/modal/utils/logstream_utils.rs | sed -n '305,325p'

Repository: parseablehq/parseable

Length of output: 2958


🏁 Script executed:

#!/bin/bash
# Get full StreamError enum definition
rg -n "pub enum StreamError" -A 50 src/handlers/http/logstream.rs

Repository: parseablehq/parseable

Length of output: 2411


🏁 Script executed:

#!/bin/bash
# Check if StreamError has MetastoreError variant specifically
rg -n "MetastoreError" src/handlers/http/logstream.rs | head -20

Repository: parseablehq/parseable

Length of output: 296


🏁 Script executed:

#!/bin/bash
# Also check the other functions mentioned at lines 105-118 and 311-319 for similar issues
cat -n src/handlers/http/modal/utils/logstream_utils.rs | sed -n '150,230p'

Repository: parseablehq/parseable

Length of output: 3343


Remove unnecessary error wrapping at line 272; preserve error structure through proper conversion.

The code wraps MetastoreError into ObjectStorageError::MetastoreError, which forces the error through LogstreamAffectedResourcesError::ObjectStorageError instead of LogstreamAffectedResourcesError::MetastoreError. The conversion impl then collapses ObjectStorageError and Bytes2JSONError to Anyhow(to_string()), losing all error structure.

Since StreamError has explicit variants for both MetastoreError and Storage(ObjectStorageError), the error chain should preserve these types:

Changes needed
  1. At lines 268-275 (fetch_affected_roles), remove the .map_err() wrapping:
         let metadata_bytes = PARSEABLE
             .metastore
             .get_parseable_metadata()
-            .await
-            .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?
+            .await?
             .ok_or_else(|| {
                 ObjectStorageError::Custom("parseable metadata not initialized".into())
             })?;
  2. At lines 311-319, fully specify all error variants in the conversion:
 impl From<LogstreamAffectedResourcesError> for StreamError {
     fn from(err: LogstreamAffectedResourcesError) -> Self {
         match err {
             LogstreamAffectedResourcesError::StreamNotFound(e) => StreamError::StreamNotFound(e),
             LogstreamAffectedResourcesError::MetastoreError(e) => StreamError::MetastoreError(e),
-            other => StreamError::Anyhow(anyhow::anyhow!(other.to_string())),
+            LogstreamAffectedResourcesError::ObjectStorageError(e) => StreamError::Storage(e),
+            LogstreamAffectedResourcesError::Bytes2JSONError(e) => StreamError::Anyhow(anyhow::anyhow!(e)),
         }
     }
 }
🤖 Prompt for AI Agents
In @src/handlers/http/modal/utils/logstream_utils.rs around lines 268 - 275, The
call to PARSEABLE.metastore.get_parseable_metadata() inside fetch_affected_roles
is wrapping the original Metastore error into ObjectStorageError::MetastoreError
(via .map_err(...)) which causes loss of the original error variant downstream;
remove that .map_err(...) and let the original MetastoreError propagate (use ?
directly) so StreamError::MetastoreError is preserved. Also update the
conversion/From impl(s) that collapse ObjectStorageError/Bytes2JSONError into
Anyhow — explicitly match all ObjectStorageError variants and map MetastoreError
to StreamError::MetastoreError while mapping genuine storage variants to
StreamError::Storage(ObjectStorageError::...), so error structure is preserved
(adjust the conversion at the impl that currently covers lines ~311-319).
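
A side benefit of the exhaustive match: if LogstreamAffectedResourcesError later grows a new variant, the missing arm becomes a compile error instead of silently falling into the Anyhow catch-all.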


Development

Successfully merging this pull request may close these issues.

feat: API for dependency graph
