Add plan command to the CLI #31
Merged
7 commits
80f1694 Add plan command to the CLI (b-per)
b8c6570 Merge branch 'main' of github.com:dbt-labs/dbt-jobs-as-code into feat… (b-per)
737ac78 Move the check of matching env vars outside client (b-per)
e6b9350 New function to check if a local config of env var exists in Cloud (b-per)
63f8663 Create a ChangeSet Class (b-per)
5db033b Build the ChangeSet first and then act on it (b-per)
b9b0a89 Looks better with capwords (b-per)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
from typing import Optional | ||
from pydantic import BaseModel | ||
import string | ||
|
||
|
||
class Change(BaseModel): | ||
"""Describes what a given change is and how to apply it.""" | ||
identifier: str | ||
type: str | ||
action: str | ||
sync_function: object | ||
parameters: dict | ||
|
||
def __str__(self): | ||
return f"{self.action.upper()} {string.capwords(self.type)} {self.identifier}" | ||
|
||
def apply(self): | ||
self.sync_function(**self.parameters) | ||
|
||
|
||
class ChangeSet(BaseModel): | ||
"""Store the set of changes to be displayed or applied.""" | ||
__root__: Optional[list[Change]] = [] | ||
|
||
def __iter__(self): | ||
return iter(self.__root__) | ||
|
||
def append(self, change: Change): | ||
self.__root__.append(change) | ||
|
||
def __str__(self): | ||
list_str = [str(change) for change in self.__root__] | ||
return "\n".join(list_str) | ||
|
||
def __len__(self): | ||
return len(self.__root__) | ||
|
||
def apply(self): | ||
for change in self.__root__: | ||
change.apply() |
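The deferred-callback idea behind `Change` and `ChangeSet` can be illustrated with a minimal, standalone sketch. It swaps pydantic's `BaseModel` for standard-library dataclasses so it runs without dependencies, which is an assumption made for illustration and not what the PR does. `fake_update_job` and the `"nightly"` job are hypothetical stand-ins for the dbt Cloud client and a real job definition.

```python
import string
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Change:
    """One pending change plus the callback that would apply it."""

    identifier: str
    type: str
    action: str
    sync_function: Callable[..., Any]
    parameters: dict

    def __str__(self) -> str:
        return f"{self.action.upper()} {string.capwords(self.type)} {self.identifier}"

    def apply(self) -> None:
        # Nothing touches the remote system until this is called.
        self.sync_function(**self.parameters)


@dataclass
class ChangeSet:
    """Ordered collection of changes that can be printed (plan) or applied (sync)."""

    changes: list = field(default_factory=list)

    def append(self, change: Change) -> None:
        self.changes.append(change)

    def __len__(self) -> int:
        return len(self.changes)

    def __str__(self) -> str:
        return "\n".join(str(change) for change in self.changes)

    def apply(self) -> None:
        for change in self.changes:
            change.apply()


applied = []


def fake_update_job(job):
    """Hypothetical stand-in for a client method such as update_job."""
    applied.append(job)


change_set = ChangeSet()
change_set.append(
    Change(
        identifier="nightly",
        type="job",
        action="update",
        sync_function=fake_update_job,
        parameters={"job": "nightly-definition"},
    )
)

plan_text = str(change_set)           # what a "plan" would display
applied_before_sync = list(applied)   # still empty: building the set has no side effects
change_set.apply()                    # what a "sync" would additionally do
```

Because each change stores its callback and arguments without invoking them, the same object can serve both a read-only preview and the actual update.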
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,17 +7,13 @@ | |
from client import DBTCloud | ||
from loader.load import load_job_configuration | ||
from schemas import check_job_mapping_same | ||
from changeset.change_set import Change, ChangeSet | ||
from schemas import check_env_var_same | ||
|
||
|
||
@click.group() | ||
def cli(): | ||
pass | ||
|
||
|
||
@cli.command() | ||
@click.argument("config", type=click.File("r")) | ||
def sync(config): | ||
"""Synchronize a dbt Cloud job config file against dbt Cloud. | ||
def build_change_set(config): | ||
"""Compares the config of YML files versus dbt Cloud. | ||
Depending on the value of no_update, it will either update the dbt Cloud config or not. | ||
|
||
CONFIG is the path to your jobs.yml config file. | ||
""" | ||
|
@@ -35,6 +31,8 @@ def sync(config): | |
job.identifier: job for job in cloud_jobs if job.identifier is not None | ||
} | ||
|
||
dbt_cloud_change_set = ChangeSet() | ||
|
||
# Use sets to find jobs for different operations | ||
shared_jobs = set(defined_jobs.keys()).intersection(set(tracked_jobs.keys())) | ||
created_jobs = set(defined_jobs.keys()) - set(tracked_jobs.keys()) | ||
|
@@ -47,18 +45,39 @@ def sync(config): | |
if not check_job_mapping_same( | ||
source_job=defined_jobs[identifier], dest_job=tracked_jobs[identifier] | ||
): | ||
dbt_cloud_change = Change( | ||
identifier=identifier, | ||
type="job", | ||
action="update", | ||
sync_function=dbt_cloud.update_job, | ||
This is a really clever approach! I appreciate how configurable this callback function approach will be 🚀
||
parameters={"job": defined_jobs[identifier]}, | ||
) | ||
dbt_cloud_change_set.append(dbt_cloud_change) | ||
defined_jobs[identifier].id = tracked_jobs[identifier].id | ||
dbt_cloud.update_job(job=defined_jobs[identifier]) | ||
|
||
# Create new jobs | ||
logger.info("Detected {count} new jobs.", count=len(created_jobs)) | ||
for identifier in created_jobs: | ||
dbt_cloud.create_job(job=defined_jobs[identifier]) | ||
dbt_cloud_change = Change( | ||
identifier=identifier, | ||
type="job", | ||
action="create", | ||
sync_function=dbt_cloud.create_job, | ||
parameters={"job": defined_jobs[identifier]}, | ||
) | ||
dbt_cloud_change_set.append(dbt_cloud_change) | ||
|
||
# Remove Deleted Jobs | ||
logger.warning("Detected {count} deleted jobs.", count=len(deleted_jobs)) | ||
for identifier in deleted_jobs: | ||
dbt_cloud.delete_job(job=tracked_jobs[identifier]) | ||
dbt_cloud_change = Change( | ||
identifier=identifier, | ||
type="job", | ||
action="delete", | ||
sync_function=dbt_cloud.delete_job, | ||
parameters={"job": tracked_jobs[identifier]}, | ||
) | ||
dbt_cloud_change_set.append(dbt_cloud_change) | ||
|
||
# -- ENV VARS -- | ||
# Now that we have replicated all jobs we can get their IDs for further API calls | ||
|
@@ -67,37 +86,122 @@ def sync(config): | |
|
||
# Replicate the env vars from the YML to dbt Cloud | ||
for job in defined_jobs.values(): | ||
job_id = mapping_job_identifier_job_id[job.identifier] | ||
for env_var_yml in job.custom_environment_variables: | ||
env_var_yml.job_definition_id = job_id | ||
updated_env_vars = dbt_cloud.update_env_var( | ||
project_id=job.project_id, job_id=job_id, custom_env_var=env_var_yml | ||
|
||
if job.identifier in mapping_job_identifier_job_id: # the job already exists | ||
job_id = mapping_job_identifier_job_id[job.identifier] | ||
all_env_vars_for_job = dbt_cloud.get_env_vars( | ||
project_id=job.project_id, job_id=job_id | ||
) | ||
for env_var_yml in job.custom_environment_variables: | ||
env_var_yml.job_definition_id = job_id | ||
same_env_var, env_var_id = check_env_var_same( | ||
source_env_var=env_var_yml, dest_env_vars=all_env_vars_for_job | ||
) | ||
if not same_env_var: | ||
dbt_cloud_change = Change( | ||
identifier=f"{job.identifier}:{env_var_yml.name}", | ||
type="env var overwrite", | ||
action="update", | ||
sync_function=dbt_cloud.update_env_var, | ||
parameters={ | ||
"project_id": job.project_id, | ||
"job_id": job_id, | ||
"custom_env_var": env_var_yml, | ||
"env_var_id": env_var_id, | ||
}, | ||
) | ||
dbt_cloud_change_set.append(dbt_cloud_change) | ||
|
||
else: # the job doesn't exist yet so it doesn't have an ID | ||
for env_var_yml in job.custom_environment_variables: | ||
dbt_cloud_change = Change( | ||
identifier=f"{job.identifier}:{env_var_yml.name}", | ||
type="env var overwrite", | ||
action="create", | ||
sync_function=dbt_cloud.update_env_var, | ||
parameters={ | ||
"project_id": job.project_id, | ||
"job_id": None, | ||
"custom_env_var": env_var_yml, | ||
"env_var_id": None, | ||
"yml_job_identifier": job.identifier, | ||
}, | ||
) | ||
dbt_cloud_change_set.append(dbt_cloud_change) | ||
|
||
# Delete the env vars from dbt Cloud that are not in the yml | ||
for job in defined_jobs.values(): | ||
job_id = mapping_job_identifier_job_id[job.identifier] | ||
|
||
# We get the env vars from dbt Cloud, now that the YML ones have been replicated | ||
env_var_dbt_cloud = dbt_cloud.get_env_vars( | ||
project_id=job.project_id, job_id=job_id | ||
) | ||
# we only delete env var overwrite if the job already exists | ||
if job.identifier in mapping_job_identifier_job_id: | ||
job_id = mapping_job_identifier_job_id[job.identifier] | ||
|
||
# And we get the list of env vars defined for a given job in the YML | ||
env_vars_for_job = [ | ||
env_var.name for env_var in job.custom_environment_variables | ||
] | ||
# We get the env vars from dbt Cloud, now that the YML ones have been replicated | ||
env_var_dbt_cloud = dbt_cloud.get_env_vars( | ||
project_id=job.project_id, job_id=job_id | ||
) | ||
|
||
for env_var, env_var_val in env_var_dbt_cloud.items(): | ||
# If the env var is not in the YML but is defined at the "job" level in dbt Cloud, we delete it | ||
if env_var not in env_vars_for_job and env_var_val.id: | ||
logger.info(f"{env_var} not in the YML file but in the dbt Cloud job") | ||
dbt_cloud.delete_env_var( | ||
project_id=job.project_id, env_var_id=env_var_val.id | ||
) | ||
logger.info( | ||
f"Deleted the env_var {env_var} for the job {job.identifier}" | ||
) | ||
# And we get the list of env vars defined for a given job in the YML | ||
env_vars_for_job = [ | ||
env_var.name for env_var in job.custom_environment_variables | ||
] | ||
|
||
for env_var, env_var_val in env_var_dbt_cloud.items(): | ||
# If the env var is not in the YML but is defined at the "job" level in dbt Cloud, we delete it | ||
if env_var not in env_vars_for_job and env_var_val.id: | ||
logger.info( | ||
f"{env_var} not in the YML file but in the dbt Cloud job" | ||
) | ||
dbt_cloud_change = Change( | ||
identifier=f"{job.identifier}:{env_var}", | ||
type="env var overwrite", | ||
action="delete", | ||
sync_function=dbt_cloud.delete_env_var, | ||
parameters={ | ||
"project_id": job.project_id, | ||
"env_var_id": env_var_val.id, | ||
}, | ||
) | ||
dbt_cloud_change_set.append(dbt_cloud_change) | ||
|
||
return dbt_cloud_change_set | ||
|
||
|
||
@click.group() | ||
def cli(): | ||
pass | ||
|
||
|
||
@cli.command() | ||
@click.argument("config", type=click.File("r")) | ||
def sync(config): | ||
"""Synchronize a dbt Cloud job config file against dbt Cloud. | ||
|
||
CONFIG is the path to your jobs.yml config file. | ||
""" | ||
change_set = build_change_set(config) | ||
if len(change_set) == 0: | ||
logger.success("-- PLAN -- No changes detected.") | ||
else: | ||
logger.warning("-- PLAN -- {count} changes detected.", count=len(change_set)) | ||
print(change_set) | ||
logger.info("-- SYNC --") | ||
change_set.apply() | ||
|
||
|
||
@cli.command() | ||
@click.argument("config", type=click.File("r")) | ||
def plan(config): | ||
"""Check the difference between a local file and dbt Cloud without updating dbt Cloud. | ||
|
||
CONFIG is the path to your jobs.yml config file. | ||
""" | ||
change_set = build_change_set(config) | ||
if len(change_set) == 0: | ||
logger.success("-- PLAN -- No changes detected.") | ||
else: | ||
logger.warning("-- PLAN -- {count} changes detected.", count=len(change_set)) | ||
print(change_set) | ||
|
||
|
||
@cli.command() | ||
|
@@ -121,11 +225,11 @@ def validate(config, online): | |
if not online: | ||
return | ||
|
||
# Retrive the list of Project IDs and Environment IDs from the config file | ||
# Retrieve the list of Project IDs and Environment IDs from the config file | ||
config_project_ids = set([job.project_id for job in defined_jobs]) | ||
config_environment_ids = set([job.environment_id for job in defined_jobs]) | ||
|
||
# Retrieve the list of Project IDs and Environment IDs from dbt Cloudby calling the environment API endpoint | ||
# Retrieve the list of Project IDs and Environment IDs from dbt Cloud by calling the environment API endpoint | ||
dbt_cloud = DBTCloud( | ||
account_id=list(defined_jobs)[0].account_id, | ||
api_key=os.environ.get("API_KEY"), | ||
|
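The set-based comparison that `build_change_set` uses to decide which jobs to update, create, or delete can be sketched in isolation as follows. The helper name `classify_jobs` and the sample dictionaries are hypothetical. The diff shows how `shared_jobs` and `created_jobs` are computed but not `deleted_jobs`, so the expression used for it here (tracked minus defined) is an assumption.

```python
def classify_jobs(defined_jobs: dict, tracked_jobs: dict) -> dict:
    """Split job identifiers into shared, created, and deleted groups.

    defined_jobs: identifier -> job, as loaded from the YML file.
    tracked_jobs: identifier -> job, as currently found in dbt Cloud.
    """
    defined = set(defined_jobs.keys())
    tracked = set(tracked_jobs.keys())
    return {
        "shared": defined.intersection(tracked),  # candidates for an update
        "created": defined - tracked,             # in the YML only
        "deleted": tracked - defined,             # in dbt Cloud only (assumed definition)
    }


# Hypothetical sample data; the values stand in for job definitions.
yml_jobs = {"nightly": "job-a", "hourly": "job-b"}
cloud_jobs = {"nightly": "job-a-old", "weekly": "job-c"}

groups = classify_jobs(yml_jobs, cloud_jobs)
```

Only the identifiers in the shared group need a field-by-field comparison; the other two groups map directly to create and delete changes.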
There's room in the future for this to be a little more detailed (e.g. what was the value before, and what is it now?). This is a great start 👍🏻
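One possible shape for that extra detail, offered only as a sketch and not as something this PR implements: compare the current and desired values field by field and render each difference as a line. The function name `describe_field_changes` and the sample fields are hypothetical, and both sides are assumed to be available as plain dictionaries.

```python
def describe_field_changes(before: dict, after: dict) -> list[str]:
    """Return one 'field: old -> new' line per field whose value differs.

    A field missing on one side is reported as None on that side, so a
    missing field and an explicit None are not distinguished here.
    """
    lines = []
    for key in sorted(set(before) | set(after)):
        old_value = before.get(key)
        new_value = after.get(key)
        if old_value != new_value:
            lines.append(f"{key}: {old_value!r} -> {new_value!r}")
    return lines


# Hypothetical job settings as they exist remotely and as defined locally.
current = {"name": "nightly", "threads": 4}
desired = {"name": "nightly", "threads": 8}

detail_lines = describe_field_changes(current, desired)
```

Lines like these could be appended to the output of a change's `__str__` so the plan shows what would change as well as which object is affected.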