Skip to content

Commit a24a646

Browse files
committed
pvc storage
1 parent e167faa commit a24a646

File tree

9 files changed

+83
-9
lines changed

9 files changed

+83
-9
lines changed

crd.yml

+2
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,5 @@ spec:
3333
type: string
3434
workers:
3535
type: integer
36+
worker_storage:
37+
type: integer

src/bin/cli.rs

+16
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use kube::{Api, Client};
44
use kube::api::PostParams;
55

66
use example_citus_operator::crd::{CitusCluster, CitusClusterSpec};
7+
use example_citus_operator::storage;
78

89
#[derive(Clone, Debug, Parser)]
910
struct Opts {
@@ -28,12 +29,20 @@ struct CreateOpts {
2829
/// Number of workers
2930
#[clap(short, long, default_value = "1")]
3031
workers: usize,
32+
33+
/// Storage volume size for each worker, in GB
34+
#[clap(long, default_value = "1")]
35+
worker_storage: usize,
3136
}
3237

3338
#[derive(Clone, Debug, Parser)]
3439
struct DeleteOpts {
3540
/// Name of the cluster
3641
name: String,
42+
43+
/// Delete associated persistent storage
44+
#[clap(long)]
45+
purge: bool,
3746
}
3847

3948
#[tokio::main]
@@ -55,6 +64,7 @@ async fn main() {
5564
},
5665
spec: CitusClusterSpec {
5766
workers: c.workers as i32,
67+
worker_storage: c.worker_storage,
5868
},
5969
},
6070
)
@@ -66,6 +76,12 @@ async fn main() {
6676
.delete(&c.name, &Default::default())
6777
.await
6878
.expect("delete");
79+
80+
if c.purge {
81+
storage::delete_storage(client.clone(), &c.name, &opts.namespace)
82+
.await
83+
.expect("purge");
84+
}
6985
}
7086
}
7187
}

src/bin/main.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,14 @@ async fn reconcile(cc: Arc<CitusCluster>, context: Arc<ContextData>) -> Result<A
6868
match determine_action(&cc) {
6969
ClusterAction::Create => {
7070
cluster::add_finalizer(client.clone(), &name, &namespace).await?;
71-
cluster::deploy(client, &name, cc.spec.workers, &namespace).await?;
71+
cluster::deploy(
72+
client,
73+
&name,
74+
cc.spec.workers,
75+
cc.spec.worker_storage,
76+
&namespace,
77+
)
78+
.await?;
7279
Ok(Action::requeue(Duration::from_secs(10)))
7380
}
7481
ClusterAction::Delete => {

src/cluster.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ pub async fn deploy(
1212
client: Client,
1313
name: &str,
1414
num_workers: i32,
15+
worker_storage: usize,
1516
namespace: &str,
1617
) -> Result<CitusDeployment, Error> {
1718
let master = master::deploy(client.clone(), name, namespace).await?;
18-
let workers = workers::deploy(client.clone(), name, num_workers, namespace).await?;
19+
let workers =
20+
workers::deploy(client.clone(), name, num_workers, worker_storage, namespace).await?;
1921
jobs::register_workers(client.clone(), name, num_workers, namespace).await?;
2022

2123
master::expose(client.clone(), name, namespace).await?;

src/crd.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ use serde::{Deserialize, Serialize};
44

55
#[derive(CustomResource, Serialize, Deserialize, Debug, PartialEq, Clone, JsonSchema)]
66
#[kube(
7-
group = "jw3.xyz",
8-
version = "v1alpha1",
9-
kind = "CitusCluster",
10-
plural = "citusclusters",
11-
derive = "PartialEq",
12-
namespaced
7+
group = "jw3.xyz",
8+
version = "v1alpha1",
9+
kind = "CitusCluster",
10+
plural = "citusclusters",
11+
derive = "PartialEq",
12+
namespaced
1313
)]
1414
pub struct CitusClusterSpec {
1515
pub workers: i32,
16+
pub worker_storage: usize,
1617
}

src/jobs.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub async fn register_workers(
2222
spec: Some(PodSpec {
2323
restart_policy: Some("OnFailure".to_owned()),
2424
containers: vec![Container {
25-
name: format!("{name}-init-worker-"),
25+
name: format!("{name}-init-worker"),
2626
image: Some("citusdata/citus:12.1".to_owned()),
2727
command: Some(vec![
2828
"bash".to_owned(),

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ pub mod cluster;
22
pub mod crd;
33
pub mod jobs;
44
pub mod master;
5+
pub mod storage;
56
pub mod workers;

src/storage.rs

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use std::collections::BTreeMap;
2+
3+
use futures::TryFutureExt;
4+
use k8s_openapi::api::core::v1::{
5+
PersistentVolumeClaim, PersistentVolumeClaimSpec, ResourceRequirements,
6+
};
7+
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
8+
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
9+
use kube::{Api, Client, Error};
10+
11+
pub async fn delete_storage(client: Client, name: &str, namespace: &str) -> Result<(), Error> {
12+
let api: Api<PersistentVolumeClaim> = Api::namespaced(client.clone(), namespace);
13+
api.delete(name, &Default::default()).map_ok(|_| ()).await
14+
}
15+
16+
pub fn volume_claim_template(name: &str, gi: usize) -> PersistentVolumeClaim {
17+
let mut worker_labels: BTreeMap<String, Quantity> = BTreeMap::new();
18+
worker_labels.insert("storage".to_owned(), Quantity(format!("{gi}Gi")));
19+
20+
PersistentVolumeClaim {
21+
metadata: ObjectMeta {
22+
name: Some(name.to_owned()),
23+
..Default::default()
24+
},
25+
spec: Some(PersistentVolumeClaimSpec {
26+
access_modes: Some(vec!["ReadWriteOnce".to_owned()]),
27+
resources: Some(ResourceRequirements {
28+
requests: Some(worker_labels),
29+
..Default::default()
30+
}),
31+
..Default::default()
32+
}),
33+
..Default::default()
34+
}
35+
}

src/workers.rs

+10
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,20 @@ use std::collections::BTreeMap;
33
use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec};
44
use k8s_openapi::api::core::v1::{
55
Container, ContainerPort, EnvVar, PodSpec, PodTemplateSpec, Service, ServicePort, ServiceSpec,
6+
VolumeMount,
67
};
78
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta};
89
use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
910
use kube::{Api, Client, Error};
1011
use kube::api::{DeleteParams, PostParams};
1112

13+
use crate::storage;
14+
1215
pub async fn deploy(
1316
client: Client,
1417
name: &str,
1518
cnt: i32,
19+
storage: usize,
1620
namespace: &str,
1721
) -> Result<StatefulSet, Error> {
1822
let mut worker_labels: BTreeMap<String, String> = BTreeMap::new();
@@ -51,6 +55,11 @@ pub async fn deploy(
5155
value: Some("yourpassword".to_owned()),
5256
..EnvVar::default()
5357
}]),
58+
volume_mounts: Some(vec![VolumeMount {
59+
mount_path: "/var/lib/postgresql/data".to_owned(),
60+
name: name.to_owned(),
61+
..Default::default()
62+
}]),
5463
..Container::default()
5564
}],
5665
..PodSpec::default()
@@ -60,6 +69,7 @@ pub async fn deploy(
6069
..ObjectMeta::default()
6170
}),
6271
},
72+
volume_claim_templates: Some(vec![storage::volume_claim_template(name, storage)]),
6373
..StatefulSetSpec::default()
6474
}),
6575
..StatefulSet::default()

0 commit comments

Comments
 (0)