Skip to content

Commit f501d85

Browse files
committed
feat: creating and removing function executors
1 parent b6a7866 commit f501d85

15 files changed

+761 -353 lines changed

Diff for: indexify/tests/cli/test_server_task_distribution.py

+4 -4
Original file line number | Diff line number | Diff line change
@@ -30,9 +30,9 @@ def success_func(sleep_secs: float) -> str:
3030
class TestServerTaskDistribution(unittest.TestCase):
3131
def test_server_distributes_invocations_fairly_between_two_executors(self):
3232
print(
33-
"Waiting for 10 seconds for Server to notice that any previously existing Executors exited."
33+
"Waiting for 30 seconds for Server to notice that any previously existing Executors exited."
3434
)
35-
time.sleep(10)
35+
time.sleep(30)
3636

3737
with ExecutorProcessContextManager(
3838
[
@@ -81,9 +81,9 @@ def test_server_distributes_invocations_fairly_between_two_executors(self):
8181

8282
def test_server_redistributes_invocations_when_new_executor_joins(self):
8383
print(
84-
"Waiting for 10 seconds for Server to notice that any previously existing Executors exited."
84+
"Waiting for 30 seconds for Server to notice that any previously existing Executors exited."
8585
)
86-
time.sleep(10)
86+
time.sleep(30)
8787

8888
graph = Graph(
8989
name=test_graph_name(self),

Diff for: server/data_model/src/lib.rs

+45 -8
Original file line number | Diff line number | Diff line change
@@ -1221,6 +1221,30 @@ pub struct FunctionURI {
12211221
pub version: Option<GraphVersion>,
12221222
}
12231223

1224+
impl FunctionURI {
1225+
pub fn matches(&self, other: &FunctionURI) -> bool {
1226+
self.namespace == other.namespace &&
1227+
self.compute_graph_name == other.compute_graph_name &&
1228+
self.compute_fn_name == other.compute_fn_name &&
1229+
(self.version.is_none() || self.version == other.version)
1230+
}
1231+
}
1232+
1233+
impl Display for FunctionURI {
1234+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1235+
write!(
1236+
f,
1237+
"{}|{}|{}|{}",
1238+
self.namespace,
1239+
self.compute_graph_name,
1240+
self.compute_fn_name,
1241+
self.version
1242+
.as_ref()
1243+
.map_or("None".to_string(), |v| v.to_string())
1244+
)
1245+
}
1246+
}
1247+
12241248
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
12251249
pub struct GpuResources {
12261250
pub count: u32,
@@ -1315,13 +1339,13 @@ impl FunctionExecutor {
13151339
///
13161340
/// A match occurs when:
13171341
/// 1. The namespace, compute_graph_name, and compute_fn_name are equal
1318-
/// 2. Either the FunctionURI has no version specified (None), OR
1319-
/// the versions match
1342+
/// 2. Either the FunctionURI has no version specified (None), OR the
1343+
/// versions match
13201344
pub fn matches(&self, uri: &FunctionURI) -> bool {
13211345
// Check if namespace, compute_graph_name, and compute_fn_name match
1322-
let basic_match = self.namespace == uri.namespace
1323-
&& self.compute_graph_name == uri.compute_graph_name
1324-
&& self.compute_fn_name == uri.compute_fn_name;
1346+
let basic_match = self.namespace == uri.namespace &&
1347+
self.compute_graph_name == uri.compute_graph_name &&
1348+
self.compute_fn_name == uri.compute_fn_name;
13251349

13261350
// If the basic fields match, then check the version
13271351
if basic_match {
@@ -1331,6 +1355,13 @@ impl FunctionExecutor {
13311355
false
13321356
}
13331357
}
1358+
1359+
pub fn matches_fn(&self, other: &FunctionExecutor) -> bool {
1360+
self.namespace == other.namespace &&
1361+
self.compute_graph_name == other.compute_graph_name &&
1362+
self.compute_fn_name == other.compute_fn_name &&
1363+
self.version == other.version
1364+
}
13341365
}
13351366

13361367
impl FunctionExecutorBuilder {
@@ -1617,12 +1648,18 @@ pub struct Namespace {
16171648

16181649
#[cfg(test)]
16191650
mod tests {
1620-
use super::*;
16211651
use std::collections::HashMap;
16221652

1653+
use super::*;
16231654
use crate::{
1624-
test_objects::tests::test_compute_fn, ComputeGraph, ComputeGraphCode, ComputeGraphVersion,
1625-
DynamicEdgeRouter, GraphVersion, Node, RuntimeInformation,
1655+
test_objects::tests::test_compute_fn,
1656+
ComputeGraph,
1657+
ComputeGraphCode,
1658+
ComputeGraphVersion,
1659+
DynamicEdgeRouter,
1660+
GraphVersion,
1661+
Node,
1662+
RuntimeInformation,
16261663
};
16271664

16281665
#[test]

Diff for: server/data_model/src/test_objects.rs

+19 -4
Original file line number | Diff line number | Diff line change
@@ -4,13 +4,28 @@ pub mod tests {
44
use rand::{distributions::Alphanumeric, Rng};
55

66
use super::super::{
7-
ComputeFn, ComputeGraph, ComputeGraphCode, Node, Node::Compute, NodeOutput,
7+
ComputeFn,
8+
ComputeGraph,
9+
ComputeGraphCode,
10+
Node,
11+
Node::Compute,
12+
NodeOutput,
813
RuntimeInformation,
914
};
1015
use crate::{
11-
DataPayload, DynamicEdgeRouter, ExecutorId, ExecutorMetadata, GraphInvocationCtx,
12-
GraphInvocationCtxBuilder, GraphVersion, ImageInformation, InvocationPayload,
13-
InvocationPayloadBuilder, NodeOutputBuilder, Task, TaskBuilder,
16+
DataPayload,
17+
DynamicEdgeRouter,
18+
ExecutorId,
19+
ExecutorMetadata,
20+
GraphInvocationCtx,
21+
GraphInvocationCtxBuilder,
22+
GraphVersion,
23+
ImageInformation,
24+
InvocationPayload,
25+
InvocationPayloadBuilder,
26+
NodeOutputBuilder,
27+
Task,
28+
TaskBuilder,
1429
};
1530

1631
pub const TEST_NAMESPACE: &str = "test_ns";

Diff for: server/processor/src/graph_processor.rs

+23 -43
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,9 @@ use metrics::{low_latency_boundaries, Timer};
77
use opentelemetry::{metrics::Histogram, KeyValue};
88
use state_store::{
99
requests::{
10-
DeleteComputeGraphRequest, DeleteInvocationRequest, RequestPayload,
10+
DeleteComputeGraphRequest,
11+
DeleteInvocationRequest,
12+
RequestPayload,
1113
StateMachineUpdateRequest,
1214
},
1315
IndexifyState,
@@ -207,67 +209,45 @@ impl GraphProcessor {
207209
) -> Result<StateMachineUpdateRequest> {
208210
debug!("processing state change: {}", state_change);
209211
let mut indexes = self.indexify_state.in_memory_state.read().await.clone();
210-
match &state_change.change_type {
212+
let req = match &state_change.change_type {
211213
ChangeType::InvokeComputeGraph(_) | ChangeType::TaskOutputsIngested(_) => {
212-
let scheduler_update = self
214+
let mut scheduler_update = self
213215
.task_creator
214216
.invoke(&state_change.change_type, &mut indexes)
215-
.await;
216-
if let Ok(mut result) = scheduler_update {
217-
let placement_result = self.task_allocator.allocate(&mut indexes)?;
218-
result
219-
.new_allocations
220-
.extend(placement_result.new_allocations);
221-
result.updated_tasks.extend(placement_result.updated_tasks);
222-
Ok(StateMachineUpdateRequest {
223-
payload: RequestPayload::SchedulerUpdate(Box::new(result)),
224-
processed_state_changes: vec![state_change.clone()],
225-
})
226-
} else {
227-
error!("error invoking task creator: {:?}", scheduler_update.err());
228-
Ok(StateMachineUpdateRequest {
229-
payload: RequestPayload::Noop,
230-
processed_state_changes: vec![state_change.clone()],
231-
})
217+
.await?;
218+
scheduler_update.extend(self.task_allocator.allocate(&mut indexes)?);
219+
StateMachineUpdateRequest {
220+
payload: RequestPayload::SchedulerUpdate(Box::new(scheduler_update)),
221+
processed_state_changes: vec![state_change.clone()],
232222
}
233223
}
234-
ChangeType::ExecutorUpserted(_)
235-
| ChangeType::ExecutorRemoved(_)
236-
| ChangeType::TombStoneExecutor(_) => {
224+
ChangeType::ExecutorUpserted(_) |
225+
ChangeType::ExecutorRemoved(_) |
226+
ChangeType::TombStoneExecutor(_) => {
237227
let scheduler_update = self
238228
.task_allocator
239-
.invoke(&state_change.change_type, &mut indexes);
240-
if let Ok(result) = scheduler_update {
241-
Ok(StateMachineUpdateRequest {
242-
payload: RequestPayload::SchedulerUpdate(Box::new(result)),
243-
processed_state_changes: vec![state_change.clone()],
244-
})
245-
} else {
246-
error!(
247-
"error invoking task allocator: {:?}",
248-
scheduler_update.err()
249-
);
250-
Ok(StateMachineUpdateRequest {
251-
payload: RequestPayload::Noop,
252-
processed_state_changes: vec![state_change.clone()],
253-
})
229+
.invoke(&state_change.change_type, &mut indexes)?;
230+
StateMachineUpdateRequest {
231+
payload: RequestPayload::SchedulerUpdate(Box::new(scheduler_update)),
232+
processed_state_changes: vec![state_change.clone()],
254233
}
255234
}
256-
ChangeType::TombstoneComputeGraph(request) => Ok(StateMachineUpdateRequest {
235+
ChangeType::TombstoneComputeGraph(request) => StateMachineUpdateRequest {
257236
payload: RequestPayload::DeleteComputeGraphRequest(DeleteComputeGraphRequest {
258237
namespace: request.namespace.clone(),
259238
name: request.compute_graph.clone(),
260239
}),
261240
processed_state_changes: vec![state_change.clone()],
262-
}),
263-
ChangeType::TombstoneInvocation(request) => Ok(StateMachineUpdateRequest {
241+
},
242+
ChangeType::TombstoneInvocation(request) => StateMachineUpdateRequest {
264243
payload: RequestPayload::DeleteInvocationRequest(DeleteInvocationRequest {
265244
namespace: request.namespace.clone(),
266245
compute_graph: request.compute_graph.clone(),
267246
invocation_id: request.invocation_id.clone(),
268247
}),
269248
processed_state_changes: vec![state_change.clone()],
270-
}),
271-
}
249+
},
250+
};
251+
Ok(req)
272252
}
273253
}

0 commit comments

Comments (0)