Skip to content

Commit 67358b8

Browse files
committed
working sketch
1 parent 2847a66 commit 67358b8

File tree

8 files changed

+2275
-0
lines changed

8 files changed

+2275
-0
lines changed

Cargo.lock

Lines changed: 1805 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,13 @@ version = "0.1.0"
44
edition = "2021"
55

66
[dependencies]
7+
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
8+
kube = { version = "0.88", default-features = true, features = ["derive", "runtime"] }
9+
k8s-openapi = { version = "0.21", default-features = false, features = ["v1_28"] }
10+
futures = "0.3"
11+
serde = "1"
12+
serde_json = "1"
13+
schemars = "0.8"
14+
anyhow = "1"
15+
thiserror = "1"
16+
log = "0.4"

crd.yml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
---
2+
apiVersion: apiextensions.k8s.io/v1
3+
kind: CustomResourceDefinition
4+
metadata:
5+
name: citusclusters.jw3.xyz
6+
spec:
7+
scope: Namespaced
8+
names:
9+
kind: CitusCluster
10+
plural: citusclusters
11+
singular: cituscluster
12+
shortNames:
13+
- cc
14+
group: jw3.xyz
15+
versions:
16+
- name: v1alpha1
17+
served: true
18+
storage: true
19+
schema:
20+
openAPIV3Schema:
21+
type: object
22+
properties:
23+
apiVersion:
24+
type: string
25+
pattern: ^jw3.xyz/v1alpha1$
26+
kind:
27+
type: string
28+
pattern: ^CitusCluster$
29+
spec:
30+
type: object
31+
properties:
32+
name:
33+
type: string
34+
workers:
35+
type: integer

deploy.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
apiVersion: jw3.xyz/v1alpha1
3+
kind: CitusCluster
4+
metadata:
5+
name: my-citus-cluster
6+
spec:
7+
workers: 2

readme.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,15 @@ example citus operator
22
===
33

44
Using the Kube crate to manage a Citus Postgres cluster
5+
6+
## usage
7+
8+
1. install the crd `k apply -f crd.yml`
9+
2. deploy a cluster `k apply -f deploy.yml`
10+
11+
## reference
12+
13+
- https://github.com/Pscheidl/rust-kubernetes-operator-example
14+
- https://docs.citusdata.com/en/v6.0/index.html
15+
- https://github.com/citusdata/membership-manager/blob/master/manager.py
16+
- https://github.com/citusdata/docker

src/cluster.rs

Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
use std::collections::BTreeMap;
2+
3+
use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec, StatefulSet, StatefulSetSpec};
4+
use k8s_openapi::api::batch::v1::{Job, JobSpec, JobTemplateSpec};
5+
use k8s_openapi::api::core::v1::{
6+
Container, ContainerPort, EnvVar, PodSpec, PodTemplateSpec, Service, ServicePort, ServiceSpec,
7+
};
8+
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
9+
use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
10+
use kube::{Api, Client, Error};
11+
use kube::api::{DeleteParams, ObjectMeta, Patch, PatchParams, PostParams};
12+
use serde_json::{json, Value};
13+
14+
use crate::crd::CitusCluster;
15+
16+
pub type CitusDeployment = (Deployment, StatefulSet);
17+
18+
pub async fn deploy(
19+
client: Client,
20+
name: &str,
21+
workers: i32,
22+
namespace: &str,
23+
) -> Result<CitusDeployment, Error> {
24+
let mut worker_labels: BTreeMap<String, String> = BTreeMap::new();
25+
worker_labels.insert("app".to_owned(), name.to_owned());
26+
27+
let ss: StatefulSet = StatefulSet {
28+
metadata: ObjectMeta {
29+
name: Some(format!("{name}-workers")),
30+
namespace: Some(namespace.to_owned()),
31+
labels: Some(worker_labels.clone()),
32+
..ObjectMeta::default()
33+
},
34+
spec: Some(StatefulSetSpec {
35+
service_name: format!("{name}-worker"),
36+
replicas: Some(workers),
37+
selector: LabelSelector {
38+
match_expressions: None,
39+
match_labels: Some(worker_labels.clone()),
40+
},
41+
template: PodTemplateSpec {
42+
spec: Some(PodSpec {
43+
containers: vec![Container {
44+
name: "worker".to_owned(),
45+
image: Some("citusdata/citus:12.1".to_owned()),
46+
image_pull_policy: Some("IfNotPresent".to_owned()),
47+
ports: Some(vec![ContainerPort {
48+
container_port: 5432,
49+
..ContainerPort::default()
50+
}]),
51+
env: Some(vec![EnvVar {
52+
name: "POSTGRES_PASSWORD".to_owned(),
53+
value: Some("yourpassword".to_owned()),
54+
..EnvVar::default()
55+
}]),
56+
..Container::default()
57+
}],
58+
..PodSpec::default()
59+
}),
60+
metadata: Some(ObjectMeta {
61+
labels: Some(worker_labels.clone()),
62+
..ObjectMeta::default()
63+
}),
64+
},
65+
..StatefulSetSpec::default()
66+
}),
67+
..StatefulSet::default()
68+
};
69+
70+
let mut master_labels: BTreeMap<String, String> = BTreeMap::new();
71+
master_labels.insert("app".to_owned(), name.to_owned());
72+
master_labels.insert("node".to_owned(), "master".to_owned());
73+
74+
let ss_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
75+
let ss = ss_api.create(&PostParams::default(), &ss).await?;
76+
77+
let deployment: Deployment = Deployment {
78+
metadata: ObjectMeta {
79+
name: Some(format!("{name}-master")),
80+
namespace: Some(namespace.to_owned()),
81+
labels: Some(master_labels.clone()),
82+
..ObjectMeta::default()
83+
},
84+
spec: Some(DeploymentSpec {
85+
replicas: Some(1),
86+
selector: LabelSelector {
87+
match_expressions: None,
88+
match_labels: Some(master_labels.clone()),
89+
},
90+
template: PodTemplateSpec {
91+
spec: Some(PodSpec {
92+
containers: vec![Container {
93+
name: format!("{name}-master"),
94+
image: Some("citusdata/citus:12.1".to_owned()),
95+
image_pull_policy: Some("IfNotPresent".to_owned()),
96+
ports: Some(vec![ContainerPort {
97+
container_port: 5432,
98+
..ContainerPort::default()
99+
}]),
100+
env: Some(vec![EnvVar {
101+
name: "POSTGRES_PASSWORD".to_owned(),
102+
value: Some("yourpassword".to_owned()),
103+
..EnvVar::default()
104+
}]),
105+
..Container::default()
106+
}],
107+
..PodSpec::default()
108+
}),
109+
metadata: Some(ObjectMeta {
110+
labels: Some(master_labels.clone()),
111+
..ObjectMeta::default()
112+
}),
113+
},
114+
..DeploymentSpec::default()
115+
}),
116+
..Deployment::default()
117+
};
118+
119+
// Create the deployment defined above
120+
let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
121+
let m = deployment_api
122+
.create(&PostParams::default(), &deployment)
123+
.await?;
124+
125+
let init_job = Job {
126+
metadata: ObjectMeta {
127+
generate_name: Some("working-init-".to_owned()),
128+
..ObjectMeta::default()
129+
},
130+
spec: Some(JobSpec {
131+
template: PodTemplateSpec {
132+
spec: Some(PodSpec {
133+
restart_policy: Some("OnFailure".to_owned()),
134+
containers: vec![Container {
135+
name: format!("{name}-init-worker"),
136+
image: Some("citusdata/citus:12.1".to_owned()),
137+
command: Some(vec![
138+
"bash".to_owned(),
139+
"-c".to_owned(),
140+
format!("psql -c \"{}\"", (0..workers)
141+
.map(|i| {
142+
format!(
143+
r#"SELECT * from master_add_node('my-citus-cluster-workers-{i}.my-citus-cluster-worker', 5432)"#
144+
)
145+
})
146+
.collect::<Vec<_>>()
147+
.join(";")),
148+
]),
149+
image_pull_policy: Some("IfNotPresent".to_owned()),
150+
env: Some(vec![
151+
EnvVar {
152+
name: "PGHOST".to_owned(),
153+
value: Some(format!("{name}")),
154+
..EnvVar::default()
155+
},
156+
EnvVar {
157+
name: "PGUSER".to_owned(),
158+
value: Some("postgres".to_owned()),
159+
..EnvVar::default()
160+
},
161+
EnvVar {
162+
name: "PGPASSWORD".to_owned(),
163+
value: Some("yourpassword".to_owned()),
164+
..EnvVar::default()
165+
},
166+
]),
167+
..Container::default()
168+
}],
169+
..PodSpec::default()
170+
}),
171+
..PodTemplateSpec::default()
172+
},
173+
..JobSpec::default()
174+
}),
175+
..Job::default()
176+
};
177+
178+
let jobs_api: Api<Job> = Api::namespaced(client.clone(), namespace);
179+
let job = jobs_api.create(&PostParams::default(), &init_job).await?;
180+
181+
let mut master_selector_labels: BTreeMap<String, String> = BTreeMap::new();
182+
master_selector_labels.insert("node".to_owned(), "master".to_owned());
183+
184+
let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
185+
186+
let headless_svc = Service {
187+
metadata: ObjectMeta {
188+
name: Some(format!("{name}-worker")),
189+
namespace: Some(namespace.to_owned()),
190+
labels: Some(worker_labels.clone()),
191+
..ObjectMeta::default()
192+
},
193+
spec: Some(ServiceSpec {
194+
ports: Some(vec![ServicePort {
195+
name: Some("pg".to_owned()),
196+
port: 5432,
197+
target_port: Some(IntOrString::Int(5432)),
198+
..ServicePort::default()
199+
}]),
200+
selector: Some(worker_labels),
201+
cluster_ip: None,
202+
..ServiceSpec::default()
203+
}),
204+
..Service::default()
205+
};
206+
207+
service_api
208+
.create(&PostParams::default(), &headless_svc)
209+
.await?;
210+
211+
let svc = Service {
212+
metadata: ObjectMeta {
213+
name: Some(format!("{name}")),
214+
namespace: Some(namespace.to_owned()),
215+
labels: Some(master_labels.clone()),
216+
..ObjectMeta::default()
217+
},
218+
spec: Some(ServiceSpec {
219+
ports: Some(vec![ServicePort {
220+
name: Some("pg".to_owned()),
221+
port: 5432,
222+
target_port: Some(IntOrString::Int(5432)),
223+
..ServicePort::default()
224+
}]),
225+
selector: Some(master_selector_labels),
226+
..ServiceSpec::default()
227+
}),
228+
..Service::default()
229+
};
230+
service_api.create(&PostParams::default(), &svc).await?;
231+
232+
Ok((m, ss))
233+
}
234+
235+
pub async fn delete(client: Client, name: &str, namespace: &str) -> Result<(), Error> {
236+
let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
237+
deployment_api
238+
.delete(&format!("{name}-master"), &DeleteParams::default())
239+
.await?;
240+
241+
let api: Api<StatefulSet> = Api::namespaced(client, namespace);
242+
api.delete(&format!("{name}-workers"), &DeleteParams::default())
243+
.await?;
244+
245+
Ok(())
246+
}
247+
248+
pub async fn add_finalizer(
249+
client: Client,
250+
name: &str,
251+
namespace: &str,
252+
) -> Result<CitusCluster, Error> {
253+
let api: Api<CitusCluster> = Api::namespaced(client, namespace);
254+
let finalizer: Value = json!({
255+
"metadata": {
256+
"finalizers": ["citusclusters.jw3.xyz/finalizer"]
257+
}
258+
});
259+
260+
let patch: Patch<&Value> = Patch::Merge(&finalizer);
261+
api.patch(name, &PatchParams::default(), &patch).await
262+
}
263+
264+
pub async fn delete_finalizer(
265+
client: Client,
266+
name: &str,
267+
namespace: &str,
268+
) -> Result<CitusCluster, Error> {
269+
let api: Api<CitusCluster> = Api::namespaced(client, namespace);
270+
let finalizer: Value = json!({
271+
"metadata": {
272+
"finalizers": null
273+
}
274+
});
275+
let patch: Patch<&Value> = Patch::Merge(&finalizer);
276+
api.patch(name, &PatchParams::default(), &patch).await
277+
}

src/crd.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
use kube::CustomResource;
2+
use schemars::JsonSchema;
3+
use serde::{Deserialize, Serialize};
4+
5+
#[derive(CustomResource, Serialize, Deserialize, Debug, PartialEq, Clone, JsonSchema)]
6+
#[kube(
7+
group = "jw3.xyz",
8+
version = "v1alpha1",
9+
kind = "CitusCluster",
10+
plural = "citusclusters",
11+
derive = "PartialEq",
12+
namespaced
13+
)]
14+
pub struct CitusClusterSpec {
15+
pub workers: i32,
16+
}

0 commit comments

Comments
 (0)