Skip to content

Commit 756627a

Browse files
committed
feat: creating and removing function executors
1 parent b6a7866 commit 756627a

14 files changed

+752
-347
lines changed

Diff for: server/data_model/src/lib.rs

+30-8
Original file line numberDiff line numberDiff line change
@@ -1221,6 +1221,15 @@ pub struct FunctionURI {
12211221
pub version: Option<GraphVersion>,
12221222
}
12231223

1224+
impl FunctionURI {
1225+
pub fn matches(&self, other: &FunctionURI) -> bool {
1226+
self.namespace == other.namespace &&
1227+
self.compute_graph_name == other.compute_graph_name &&
1228+
self.compute_fn_name == other.compute_fn_name &&
1229+
(self.version.is_none() || self.version == other.version)
1230+
}
1231+
}
1232+
12241233
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
12251234
pub struct GpuResources {
12261235
pub count: u32,
@@ -1315,13 +1324,13 @@ impl FunctionExecutor {
13151324
///
13161325
/// A match occurs when:
13171326
/// 1. The namespace, compute_graph_name, and compute_fn_name are equal
1318-
/// 2. Either the FunctionURI has no version specified (None), OR
1319-
/// the versions match
1327+
/// 2. Either the FunctionURI has no version specified (None), OR the
1328+
/// versions match
13201329
pub fn matches(&self, uri: &FunctionURI) -> bool {
13211330
// Check if namespace, compute_graph_name, and compute_fn_name match
1322-
let basic_match = self.namespace == uri.namespace
1323-
&& self.compute_graph_name == uri.compute_graph_name
1324-
&& self.compute_fn_name == uri.compute_fn_name;
1331+
let basic_match = self.namespace == uri.namespace &&
1332+
self.compute_graph_name == uri.compute_graph_name &&
1333+
self.compute_fn_name == uri.compute_fn_name;
13251334

13261335
// If the basic fields match, then check the version
13271336
if basic_match {
@@ -1331,6 +1340,13 @@ impl FunctionExecutor {
13311340
false
13321341
}
13331342
}
1343+
1344+
pub fn matches_fn(&self, other: &FunctionExecutor) -> bool {
1345+
self.namespace == other.namespace &&
1346+
self.compute_graph_name == other.compute_graph_name &&
1347+
self.compute_fn_name == other.compute_fn_name &&
1348+
self.version == other.version
1349+
}
13341350
}
13351351

13361352
impl FunctionExecutorBuilder {
@@ -1617,12 +1633,18 @@ pub struct Namespace {
16171633

16181634
#[cfg(test)]
16191635
mod tests {
1620-
use super::*;
16211636
use std::collections::HashMap;
16221637

1638+
use super::*;
16231639
use crate::{
1624-
test_objects::tests::test_compute_fn, ComputeGraph, ComputeGraphCode, ComputeGraphVersion,
1625-
DynamicEdgeRouter, GraphVersion, Node, RuntimeInformation,
1640+
test_objects::tests::test_compute_fn,
1641+
ComputeGraph,
1642+
ComputeGraphCode,
1643+
ComputeGraphVersion,
1644+
DynamicEdgeRouter,
1645+
GraphVersion,
1646+
Node,
1647+
RuntimeInformation,
16261648
};
16271649

16281650
#[test]

Diff for: server/data_model/src/test_objects.rs

+19-4
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,28 @@ pub mod tests {
44
use rand::{distributions::Alphanumeric, Rng};
55

66
use super::super::{
7-
ComputeFn, ComputeGraph, ComputeGraphCode, Node, Node::Compute, NodeOutput,
7+
ComputeFn,
8+
ComputeGraph,
9+
ComputeGraphCode,
10+
Node,
11+
Node::Compute,
12+
NodeOutput,
813
RuntimeInformation,
914
};
1015
use crate::{
11-
DataPayload, DynamicEdgeRouter, ExecutorId, ExecutorMetadata, GraphInvocationCtx,
12-
GraphInvocationCtxBuilder, GraphVersion, ImageInformation, InvocationPayload,
13-
InvocationPayloadBuilder, NodeOutputBuilder, Task, TaskBuilder,
16+
DataPayload,
17+
DynamicEdgeRouter,
18+
ExecutorId,
19+
ExecutorMetadata,
20+
GraphInvocationCtx,
21+
GraphInvocationCtxBuilder,
22+
GraphVersion,
23+
ImageInformation,
24+
InvocationPayload,
25+
InvocationPayloadBuilder,
26+
NodeOutputBuilder,
27+
Task,
28+
TaskBuilder,
1429
};
1530

1631
pub const TEST_NAMESPACE: &str = "test_ns";

Diff for: server/processor/src/graph_processor.rs

+23-43
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ use metrics::{low_latency_boundaries, Timer};
77
use opentelemetry::{metrics::Histogram, KeyValue};
88
use state_store::{
99
requests::{
10-
DeleteComputeGraphRequest, DeleteInvocationRequest, RequestPayload,
10+
DeleteComputeGraphRequest,
11+
DeleteInvocationRequest,
12+
RequestPayload,
1113
StateMachineUpdateRequest,
1214
},
1315
IndexifyState,
@@ -207,67 +209,45 @@ impl GraphProcessor {
207209
) -> Result<StateMachineUpdateRequest> {
208210
debug!("processing state change: {}", state_change);
209211
let mut indexes = self.indexify_state.in_memory_state.read().await.clone();
210-
match &state_change.change_type {
212+
let req = match &state_change.change_type {
211213
ChangeType::InvokeComputeGraph(_) | ChangeType::TaskOutputsIngested(_) => {
212-
let scheduler_update = self
214+
let mut scheduler_update = self
213215
.task_creator
214216
.invoke(&state_change.change_type, &mut indexes)
215-
.await;
216-
if let Ok(mut result) = scheduler_update {
217-
let placement_result = self.task_allocator.allocate(&mut indexes)?;
218-
result
219-
.new_allocations
220-
.extend(placement_result.new_allocations);
221-
result.updated_tasks.extend(placement_result.updated_tasks);
222-
Ok(StateMachineUpdateRequest {
223-
payload: RequestPayload::SchedulerUpdate(Box::new(result)),
224-
processed_state_changes: vec![state_change.clone()],
225-
})
226-
} else {
227-
error!("error invoking task creator: {:?}", scheduler_update.err());
228-
Ok(StateMachineUpdateRequest {
229-
payload: RequestPayload::Noop,
230-
processed_state_changes: vec![state_change.clone()],
231-
})
217+
.await?;
218+
scheduler_update.extend(self.task_allocator.allocate(&mut indexes)?);
219+
StateMachineUpdateRequest {
220+
payload: RequestPayload::SchedulerUpdate(Box::new(scheduler_update)),
221+
processed_state_changes: vec![state_change.clone()],
232222
}
233223
}
234-
ChangeType::ExecutorUpserted(_)
235-
| ChangeType::ExecutorRemoved(_)
236-
| ChangeType::TombStoneExecutor(_) => {
224+
ChangeType::ExecutorUpserted(_) |
225+
ChangeType::ExecutorRemoved(_) |
226+
ChangeType::TombStoneExecutor(_) => {
237227
let scheduler_update = self
238228
.task_allocator
239-
.invoke(&state_change.change_type, &mut indexes);
240-
if let Ok(result) = scheduler_update {
241-
Ok(StateMachineUpdateRequest {
242-
payload: RequestPayload::SchedulerUpdate(Box::new(result)),
243-
processed_state_changes: vec![state_change.clone()],
244-
})
245-
} else {
246-
error!(
247-
"error invoking task allocator: {:?}",
248-
scheduler_update.err()
249-
);
250-
Ok(StateMachineUpdateRequest {
251-
payload: RequestPayload::Noop,
252-
processed_state_changes: vec![state_change.clone()],
253-
})
229+
.invoke(&state_change.change_type, &mut indexes)?;
230+
StateMachineUpdateRequest {
231+
payload: RequestPayload::SchedulerUpdate(Box::new(scheduler_update)),
232+
processed_state_changes: vec![state_change.clone()],
254233
}
255234
}
256-
ChangeType::TombstoneComputeGraph(request) => Ok(StateMachineUpdateRequest {
235+
ChangeType::TombstoneComputeGraph(request) => StateMachineUpdateRequest {
257236
payload: RequestPayload::DeleteComputeGraphRequest(DeleteComputeGraphRequest {
258237
namespace: request.namespace.clone(),
259238
name: request.compute_graph.clone(),
260239
}),
261240
processed_state_changes: vec![state_change.clone()],
262-
}),
263-
ChangeType::TombstoneInvocation(request) => Ok(StateMachineUpdateRequest {
241+
},
242+
ChangeType::TombstoneInvocation(request) => StateMachineUpdateRequest {
264243
payload: RequestPayload::DeleteInvocationRequest(DeleteInvocationRequest {
265244
namespace: request.namespace.clone(),
266245
compute_graph: request.compute_graph.clone(),
267246
invocation_id: request.invocation_id.clone(),
268247
}),
269248
processed_state_changes: vec![state_change.clone()],
270-
}),
271-
}
249+
},
250+
};
251+
Ok(req)
272252
}
273253
}

0 commit comments

Comments
 (0)