Skip to content

[hud][ch][drci] add api to cache ch queries + cache issues query #6578

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion tools/scripts/fetch_latest_green_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ def get_commit_results(

@lru_cache
def fetch_unstable_issues() -> List[str]:
issues = query_clickhouse_saved("issue_query", {"label": "unstable"})
issues = query_clickhouse_saved(
"issue_query", {"label": "unstable"}, useChQueryCache=True
)
return [
issue["title"][len("UNSTABLE") :].strip()
for issue in issues
Expand Down
39 changes: 30 additions & 9 deletions tools/torchci/clickhouse.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import json
import os
from functools import lru_cache
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

import clickhouse_connect
from clickhouse_connect.driver import Client
from torchci.utils import cache_json, REPO_ROOT


@lru_cache(maxsize=1)
def get_clickhouse_client() -> Any:
def get_clickhouse_client() -> Client:
endpoint = os.environ["CLICKHOUSE_ENDPOINT"]
# I cannot figure out why these values aren't being handled automatically
# when it is fine in the lambda
Expand All @@ -26,23 +27,37 @@ def get_clickhouse_client() -> Any:
)


def query_clickhouse_saved(
    queryName: str, inputParams: Dict[str, Any], useChQueryCache: bool = False
) -> Any:
    """
    Queries ClickHouse using a saved query file and parameters.

    :param queryName: name of the folder under torchci/clickhouse_queries that
        contains the saved query.sql and params.json files.
    :param inputParams: candidate parameter values; only the names declared in
        params.json are forwarded to the query.
    :param useChQueryCache: If True, caches the query result on ClickHouse side
        (1 minute TTL).
    :return: query results as a list of row dicts (see query_clickhouse).
    """
    path = REPO_ROOT / "torchci" / "clickhouse_queries" / queryName
    with open(path / "query.sql") as f:
        queryText = f.read()
    with open(path / "params.json") as f:
        paramsText = json.load(f).get("params", {})

    # Forward only the parameters the saved query declares; a missing required
    # name raises KeyError here, surfacing the caller's mistake early.
    queryParams = {name: inputParams[name] for name in paramsText}
    return query_clickhouse(queryText, queryParams, use_ch_query_cache=useChQueryCache)


def query_clickhouse(
query: str, params: Dict[str, Any], use_cache: bool = False
query: str,
params: Dict[str, Any],
use_cache: bool = False,
use_ch_query_cache=False,
) -> Any:
"""
Queries ClickHouse. Returns datetime in YYYY-MM-DD HH:MM:SS format.
:param use_ch_query_cache: If True, uses ClickHouse's query cache (1 minute TTL).
"""
settings = None
if use_ch_query_cache:
settings = {"use_query_cache": 1}

def convert_to_json_list(res: str) -> List[Dict[str, Any]]:
rows = []
Expand All @@ -52,13 +67,19 @@ def convert_to_json_list(res: str) -> List[Dict[str, Any]]:
return rows

if not use_cache:
res = get_clickhouse_client().raw_query(query, params, fmt="JSONEachRow")
res = get_clickhouse_client().raw_query(
query, params, settings=settings, fmt="JSONEachRow"
)
return convert_to_json_list(res)
else:

@cache_json
def cache_query_clickhouse(query, params):
res = get_clickhouse_client().raw_query(query, params, fmt="JSONEachRow")
def cache_query_clickhouse(
query, params, settings: Optional[Dict[str, Any]] = None
) -> Any:
res = get_clickhouse_client().raw_query(
query, params, settings=settings, fmt="JSONEachRow"
)
return convert_to_json_list(res)

return cache_query_clickhouse(query, params)
return cache_query_clickhouse(query, params, settings)
194 changes: 194 additions & 0 deletions tools/torchci/tests/test_ch_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
import os
import time
import unittest

from dotenv import load_dotenv
from torchci.clickhouse import (
get_clickhouse_client,
query_clickhouse,
query_clickhouse_saved,
)


# This test is intended to run locally against a real ClickHouse instance
# Provide the necessary environment variables (e.g., in a .env file)
class TestClickhouseQueries(unittest.TestCase):
    """Integration tests for query_clickhouse / query_clickhouse_saved.

    These tests talk to a live ClickHouse instance and are skipped unless the
    CLICKHOUSE_* environment variables are set (e.g. via a .env file) and the
    connection check succeeds.
    """

    @classmethod
    def setUpClass(cls):
        # Load credentials from a local .env file, if present.
        load_dotenv()
        # Check if ClickHouse credentials are available
        cls.can_run = all(
            env_var in os.environ
            for env_var in [
                "CLICKHOUSE_ENDPOINT",
                "CLICKHOUSE_USERNAME",
                "CLICKHOUSE_PASSWORD",
            ]
        )
        if not cls.can_run:
            print("Skipping ClickHouse tests: required environment variables not set")
        else:
            # Test connection before running tests
            try:
                client = get_clickhouse_client()
                # Simple query to check connection
                client.query("SELECT 1")
                cls.can_run = True
            except Exception as e:
                print(f"ClickHouse connection failed: {e}")
                cls.can_run = False

    def setUp(self):
        """Skip tests if ClickHouse is not available"""
        if not self.can_run:
            self.skipTest(
                "ClickHouse environment variables not set or connection failed"
            )

    def test_simple_query_no_cache(self):
        """Test a simple SELECT 1 query without cache"""
        query = "SELECT 1 AS value"
        results = query_clickhouse(query, {}, use_cache=False)

        self.assertIsInstance(results, list)
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]["value"], 1)

    def test_simple_query_with_cache(self):
        """Test a simple SELECT 1 query with cache"""
        query = "SELECT 1 AS value"

        # First call should hit database
        start_time = time.time()
        results1 = query_clickhouse(query, {}, use_cache=True)
        first_call_time = time.time() - start_time

        # Second call should use cache
        start_time = time.time()
        results2 = query_clickhouse(query, {}, use_cache=True)
        second_call_time = time.time() - start_time

        # Both should return same result
        self.assertEqual(results1, results2)
        self.assertEqual(results1[0]["value"], 1)

        # Second call should be faster or similar (allowing for measurement noise)
        # We don't assert on exact timing as it depends on many factors
        print(
            f"First call: {first_call_time:.6f}s, Second call: {second_call_time:.6f}s"
        )

    def test_simple_query_with_clickhouse_cache(self):
        """Test a simple query with ClickHouse's query cache"""
        query = "SELECT 1 AS value"

        # First call
        results1 = query_clickhouse(query, {}, use_ch_query_cache=True)

        # Second call
        results2 = query_clickhouse(query, {}, use_ch_query_cache=True)

        # Both should return same result
        self.assertEqual(results1, results2)
        self.assertEqual(results1[0]["value"], 1)

    def test_parameterized_query(self):
        """Test a query with parameters"""
        query = "SELECT {value:UInt8} AS value"
        params = {"value": 42}

        results = query_clickhouse(query, params)
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]["value"], 42)

    def test_saved_query(self):
        """Test using a saved query (issue_query)"""
        try:
            results = query_clickhouse_saved("issue_query", {"label": "flaky"})
            self.assertIsInstance(results, list)

            # Check structure of results based on query.sql
            if results:
                expected_columns = [
                    "number",
                    "title",
                    "html_url",
                    "state",
                    "body",
                    "updated_at",
                    "author_association",
                    "labels",
                ]
                for col in expected_columns:
                    self.assertIn(col, results[0], f"Missing expected column: {col}")
        except Exception as e:
            self.fail(f"Saved query test failed with: {e}")

    def test_saved_query_with_cache(self):
        """Test saved query with cache"""
        params = {"label": "bug"}

        # First call with timing
        start_time = time.time()
        results1 = query_clickhouse_saved("issue_query", params, useChQueryCache=True)
        first_call_time = time.time() - start_time

        # Second call with timing
        start_time = time.time()
        results2 = query_clickhouse_saved("issue_query", params, useChQueryCache=True)
        second_call_time = time.time() - start_time

        # Print timing information
        print(
            f"Saved query - First call: {first_call_time:.6f}s, Second call: {second_call_time:.6f}s"
        )
        # BUGFIX: use float("inf") rather than the string 'inf' for the
        # zero-duration case -- applying the ":.2f" format spec to a str
        # raises ValueError, while float("inf") formats cleanly as "inf".
        speedup = (
            first_call_time / second_call_time
            if second_call_time > 0
            else float("inf")
        )
        print(f"Speedup ratio: {speedup:.2f}x")

        # Both should return same data structure
        self.assertEqual(type(results1), type(results2))

        # Verify the results are identical (same number of rows)
        self.assertEqual(
            len(results1),
            len(results2),
            "Cached query returned different number of results",
        )

        # If we got results, check they match expected structure based on query.sql
        if results1:
            expected_columns = [
                "number",
                "title",
                "html_url",
                "state",
                "body",
                "updated_at",
                "author_association",
                "labels",
            ]
            for col in expected_columns:
                self.assertIn(col, results1[0], f"Missing expected column: {col}")

            # Verify the labels array contains the search parameter.
            # NOTE(review): this only inspects the first row's labels to decide
            # whether label data is present at all -- confirm that is intended.
            if results1[0]["labels"]:
                # At least one issue should have the label we searched for
                found_label = False
                for issue in results1:
                    if any(label == params["label"] for label in issue["labels"]):
                        found_label = True
                        break
                self.assertTrue(
                    found_label,
                    f"Couldn't find any issue with label '{params['label']}'",
                )
            else:
                # If there are no labels, the test will pass but we'll print a warning
                print(
                    "Warning: No labels found in results, can't verify label filtering"
                )


# Allow running this integration-test module directly:
#   python test_ch_query.py
if __name__ == "__main__":
    unittest.main()
12 changes: 9 additions & 3 deletions torchci/lib/clickhouse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ export function getClickhouseClientWritable() {
export async function queryClickhouse(
query: string,
params: Record<string, unknown>,
query_id?: string
query_id?: string,
useQueryCache?: boolean
): Promise<any[]> {
if (query_id === undefined) {
query_id = "adhoc";
Expand All @@ -41,6 +42,7 @@ export async function queryClickhouse(
* queryClickhouse
* @param query: string, the sql query
* @param params: Record<string, unknown>, the parameters to the query ex { sha: "abcd" }
* @param useQueryCache: boolean, if true, cache the query result on Ch side (1 minute TTL)
*/
const clickhouseClient = getClickhouseClient();

Expand All @@ -51,6 +53,7 @@ export async function queryClickhouse(
clickhouse_settings: {
output_format_json_quote_64bit_integers: 0,
date_time_output_format: "iso",
use_query_cache: useQueryCache ? 1 : 0,
},
query_id,
});
Expand All @@ -60,12 +63,14 @@ export async function queryClickhouse(

export async function queryClickhouseSaved(
queryName: string,
inputParams: Record<string, unknown>
inputParams: Record<string, unknown>,
useQueryCache?: boolean
) {
/**
* queryClickhouseSaved
* @param queryName: string, the name of the query, which is the name of the folder in clickhouse_queries
* @param inputParams: Record<string, unknown>, the parameters to the query, an object where keys are the parameter names
* @param useQueryCache: boolean, if true, cache the query result on Ch side (1 minute TTL)
*
* This function will filter the inputParams to only include the parameters
* that are in the query params json file.
Expand All @@ -90,6 +95,7 @@ export async function queryClickhouseSaved(
return await thisModule.queryClickhouse(
query,
Object.fromEntries(queryParams),
queryName
queryName,
useQueryCache
);
}
4 changes: 3 additions & 1 deletion torchci/lib/drciUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@ export async function upsertDrCiComment(
);
const existingDrciID = existingDrciData.id;
const existingDrciComment = existingDrciData.body;
const sev = getActiveSEVs(await fetchIssuesByLabel("ci: sev"));
const sev = getActiveSEVs(
await fetchIssuesByLabel("ci: sev", /*cache*/ true)
);
const drciComment = formDrciComment(
prNum,
owner,
Expand Down
2 changes: 1 addition & 1 deletion torchci/lib/fetchHud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export default async function fetchHud(
results = results?.filter((job: JobData) => !isRerunDisabledTestsJob(job));
}
if (params.filter_unstable) {
const unstableIssues = await fetchIssuesByLabel("unstable");
const unstableIssues = await fetchIssuesByLabel("unstable", /*cache*/ true);
results = results?.filter(
(job: JobData) => !isUnstableJob(job, unstableIssues ?? [])
);
Expand Down
13 changes: 9 additions & 4 deletions torchci/lib/fetchIssuesByLabel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ import { queryClickhouseSaved } from "./clickhouse";
import { IssueData } from "./types";

export default async function fetchIssuesByLabel(
label: string
label: string,
useChCache?: boolean
): Promise<IssueData[]> {
return await queryClickhouseSaved("issue_query", {
label,
});
return await queryClickhouseSaved(
"issue_query",
{
label,
},
useChCache
);
}
2 changes: 1 addition & 1 deletion torchci/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"@aws-sdk/client-dynamodb": "^3.347.1",
"@aws-sdk/client-s3": "^3.347.1",
"@aws-sdk/lib-dynamodb": "^3.72.0",
"@clickhouse/client": "^0.1.0",
"@clickhouse/client": "^1.11.1",
"@codemirror/basic-setup": "^0.20.0",
"@codemirror/state": "^0.20.0",
"@codemirror/theme-one-dark": "^0.20.0",
Expand Down
Loading