Detect when a deployment finishes (hack)
film42 committed Mar 2, 2022
1 parent 155f642 commit e47e53e
Showing 2 changed files with 61 additions and 127 deletions.
2 changes: 1 addition & 1 deletion deployment.test.yaml
@@ -17,7 +17,7 @@ metadata:
app: nginx
apps.mx.com/deploymenthook: finished
spec:
replicas: 4
replicas: 2
selector:
matchLabels:
app: nginx
186 changes: 60 additions & 126 deletions src/main.rs
@@ -17,23 +17,6 @@ mod cache;
mod crd;
mod job;

/// Context injected with each `reconcile` and `on_error` method invocation.
struct ContextData {
/// Kubernetes client to make Kubernetes API requests with. Required for K8S resource management.
client: Client,
}

impl ContextData {
/// Constructs a new instance of ContextData.
///
/// # Arguments:
/// - `client`: A Kubernetes client to make Kubernetes REST API requests with. Resources
/// will be created and deleted with this client.
pub fn new(client: Client) -> Self {
ContextData { client }
}
}

async fn create_job_for_deployment_hook(
client: Client,
hook: &DeploymentHook,
@@ -72,25 +55,54 @@ async fn watch_for_new_deployments(
.resource_version
.expect("invalid call");

println!(
"Current Deployment API ResourceVersion: {}, Subscribing...",
&resource_version
);

let mut stream = deployment_api
.watch(&params, &resource_version)
.await?
.boxed();

println!("Current Deployment API ResrouceVersion: {resource_version}",);
drop(stream);

let deployments = deployment_api.list(&params).await?.items;

for deployment in deployments.iter() {
println!("DEPLOYMENT: {:?}", deployment.metadata.labels);

for hook in cache.find_by_matching_deployment(deployment).iter() {
create_job_for_deployment_hook(client.clone(), hook).await?;
while let Some(event) = stream.try_next().await? {
match event {
WatchEvent::Added(deployment) | WatchEvent::Modified(deployment) => {
// Check to see if the deployment has finished
if let (Some(status), Some(spec)) =
(deployment.status.as_ref(), deployment.spec.as_ref())
{
if let (Some(ready_replicas), Some(replicas), Some(deployment_replicas)) =
(status.ready_replicas, status.replicas, spec.replicas)
{
// println!("DEPLOYMENT STATUS: {:?}", deployment);
// I think we just need to make sure these two values match in order for
// this to be considered a completed deployment.
if ready_replicas == replicas && replicas == deployment_replicas {
for hook in cache.find_by_matching_deployment(&deployment).iter() {
create_job_for_deployment_hook(client.clone(), hook).await?;
}
}
}
}
}
_ => { /* ignore */ }
}
}

Ok(())

// let deployments = deployment_api.list(&params).await?.items;
//
// for deployment in deployments.iter() {
// println!("DEPLOYMENT: {:?}", deployment.metadata.labels);
//
// for hook in cache.find_by_matching_deployment(deployment).iter() {
// create_job_for_deployment_hook(client.clone(), hook).await?;
// }
// }
//
// Ok(())
}
// async fn refresh_deployment_hook_cache(hook_cache: Arc<Mutex<DeploymentHookCache>>) {}

@@ -100,108 +112,30 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await
.expect("Expected a valid KUBECONFIG environment variable.");

let context: Context<ContextData> = Context::new(ContextData::new(client.clone()));

// Prime the deployhook cache
let cache = cache::DeploymentHookCache::default();
cache.refresh(client.clone()).await?;

println!("HOOKS: {:?}", cache);
// Refresh the cache every minute
tokio::spawn({
let cache = cache.clone();
let client = client.clone();

watch_for_new_deployments(client.clone(), cache.clone()).await?;
async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(60)).await;

// // TODO: Is there a way to get a list of all deployment hooks across all namespaces, begin
// // the reconciliation of those, and then start consuming new bits? We will need to create a
// // watch on all the deployments we care about.
//
// // The controller comes from the `kube_runtime` crate and manages the reconciliation process.
// Controller::new(crd_api.clone(), ListParams::default())
// .run(reconcile, on_error, context)
// .for_each(|reconciliation_result| async move {
// match reconciliation_result {
// Ok(echo_resource) => {
// println!("Reconciliation successful. Resource: {:?}", echo_resource);
// }
// Err(reconciliation_err) => {
// eprintln!("Reconciliation error: {:?}", reconciliation_err)
// }
// }
// })
// .await;
//
Ok(())
}
if let Err(err) = cache.refresh(client.clone()).await {
println!("Failed to refresh the deployment hooks cache: {:?}", err);
}
}
}
});

// async fn reconcile(
// deployment_hook: DeploymentHook,
// context: Context<ContextData>,
// ) -> Result<ReconcilerAction, Error> {
// let client: Client = context.get_ref().client.clone();
//
// // Get the deployment hook namespace.
// let namespace: String = match deployment_hook.namespace() {
// None => {
// // If there is no namespace to deploy to defined, reconciliation ends with an error immediately.
// return Err(Error::UserInputError(
// "Expected DeploymentHook resource to be namespaced. Can't deploy to an unknown namespace."
// .to_owned(),
// ));
// }
// // If namespace is known, proceed. In a more advanced version of the operator, perhaps
// // the namespace could be checked for existence first.
// Some(namespace) => namespace,
// };
//
// // TODO: Get the deployment object.
//
// // Determine if a deployment was successful, if so, create a job patterned after the pod spec
// // as defined according to the deployment.
//
// // Performs action as decided by the `determine_action` function.
// return match determine_action(client.clone(), &deployment_hook).await? {
// Action::Create => {
// // Creates a deployment with `n` Echo service pods, but applies a finalizer first.
// // Finalizer is applied first, as the operator might be shut down and restarted
// // at any time, leaving subresources in intermediate state. This prevents leaks on
// // the `Echo` resource deletion.
// let name = echo.name(); // Name of the Echo resource is used to name the subresources as well.
//
// // Apply the finalizer first. If that fails, the `?` operator invokes automatic conversion
// // of `kube::Error` to the `Error` defined in this crate.
// finalizer::add(client.clone(), &name, &namespace).await?;
// // Invoke creation of a Kubernetes built-in resource named deployment with `n` echo service pods.
// echo::deploy(client, &echo.name(), echo.spec.replicas, &namespace).await?;
// Ok(ReconcilerAction {
// // Finalizer is added, deployment is deployed, re-check in 10 seconds.
// requeue_after: Some(Duration::from_secs(10)),
// })
// }
// Action::Delete => {
// // Deletes any subresources related to this `Echo` resource. If and only if all subresources
// // are deleted, the finalizer is removed and Kubernetes is free to remove the `Echo` resource.
//
// //First, delete the deployment. If there is any error deleting the deployment, it is
// // automatically converted into `Error` defined in this crate and the reconciliation is ended
// // with that error.
// Note: A more advanced implementation would check for the Deployment's existence.
// echo::delete(client.clone(), &echo.name(), &namespace).await?;
//
// // Once the deployment is successfully removed, remove the finalizer to make it possible
// // for Kubernetes to delete the `Echo` resource.
// finalizer::delete(client, &echo.name(), &namespace).await?;
// Ok(ReconcilerAction {
// requeue_after: None, // Makes no sense to delete after a successful delete, as the resource is gone
// })
// }
// Action::NoOp => Ok(ReconcilerAction {
// // The resource is already in desired state, do nothing and re-check after 10 seconds
// requeue_after: Some(Duration::from_secs(10)),
// }),
// };
// }
//
// fn on_error(error: &Error, _context: Context<ContextData>) -> ReconcilerAction {
// eprintln!("Reconciliation error:\n{:?}", error);
// ReconcilerAction {
// requeue_after: Some(Duration::from_secs(5)),
// }
// }
// Watch for deployment changes
loop {
watch_for_new_deployments(client.clone(), cache.clone()).await?;

println!("Deployment watcher finished or expired, restarting...");
}
}
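The heart of this change is the completion check inside watch_for_new_deployments: a deployment is treated as finished once status.ready_replicas, status.replicas, and spec.replicas all agree. Below is a minimal standalone sketch of that predicate, assuming the k8s-openapi Deployment type that the kube watch API returns; the helper name deployment_is_finished is ours, not part of the commit.

use k8s_openapi::api::apps::v1::Deployment;

/// True once every desired replica is ready: the hack's definition of a
/// finished deployment (status.ready_replicas == status.replicas == spec.replicas).
fn deployment_is_finished(deployment: &Deployment) -> bool {
    let desired = deployment.spec.as_ref().and_then(|spec| spec.replicas);
    let status = match deployment.status.as_ref() {
        Some(status) => status,
        None => return false,
    };

    match (status.ready_replicas, status.replicas, desired) {
        // All three counts are present and agree: the rollout has settled.
        (Some(ready), Some(total), Some(desired)) => ready == total && total == desired,
        // Any missing count means the rollout is still in progress.
        _ => false,
    }
}

In the commit itself the same comparison runs inline on every Added/Modified watch event before the hook jobs are created.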
