Skip to content

Commit

Permalink
feat: add dbt executable path to config (#49)
Browse files Browse the repository at this point in the history
Features:
- specify custom dbt executable path in the main config for all dbt
tasks (#46)

Fixes:
- make less strict dependencies constrains
(#45)
  • Loading branch information
NikitaYurasov authored Dec 23, 2024
1 parent 08d2472 commit 3a63cf1
Show file tree
Hide file tree
Showing 6 changed files with 1,799 additions and 1,664 deletions.
4 changes: 4 additions & 0 deletions dbt_af/conf/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,12 @@ class Config:
:param dbt_project: all dbt related params
:param dbt_default_targets: default dbt targets for different operators
:param dbt_executable_path: path to dbt executable to run tasks
:param model_dependencies: section to parametrize how model dependencies are handled
:param include_single_model_manual_dag: whether to include single model manual dag; it will create airflow dag
without schedule, only with manual trigger and preset trigger form, where model name and date interval can be
specified
:param debug_mode_enabled: whether to run dbt commands with flag --debug
:param retries_config: config with retries policies for different DAG component types
:param max_active_dag_runs: max active dag runs for each airflow dag
:param af_dag_description: description for airflow dags
Expand All @@ -310,8 +312,10 @@ class Config:

# dbt-af specific params
dbt_default_targets: DbtDefaultTargetsConfig = attrs.field()
dbt_executable_path: str = attrs.field(default='dbt')
model_dependencies: ModelDependenciesSection = attrs.field(factory=ModelDependenciesSection)
include_single_model_manual_dag: bool = attrs.field(default=True)
debug_mode_enabled: bool = attrs.field(default=True)

# airflow-specific params
retries_config: RetriesConfig = attrs.field(factory=RetriesConfig)
Expand Down
11 changes: 5 additions & 6 deletions dbt_af/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ def _patch_path_to_dbt_bash(self, **kwargs) -> str:
def generate_bash(self, **kwargs) -> str:
return (
self._patch_path_to_dbt_bash(**kwargs)
+ ' cd $PATH_TO_DBT && dbt {debug} {cli} '
+ ' cd $PATH_TO_DBT && {dbt_executable_path} {debug} {cli} '
'--profiles-dir $DBT_PROFILES_DIR '
'--project-dir $PATH_TO_DBT '
'--target {target_environment}'.format(**kwargs)
'--target {target_environment}'.format(dbt_executable_path=self.dbt_af_config.dbt_executable_path, **kwargs)
)

def __init__(
Expand All @@ -48,20 +48,19 @@ def __init__(
schedule_tag: BaseScheduleTag,
target_environment: str,
max_active_tis_per_dag: int = 1, # only one parallel tasks in the universe
debug_flg: bool = True,
pool: Optional[str] = None,
retry_policy: Optional[RetryPolicy] = None,
env: dict[str, str] | None = None,
**kwargs,
) -> None:
self.debug = '--debug' if debug_flg else ''
self.dbt_af_config = dbt_af_config

self.debug = '--debug' if self.dbt_af_config.debug_mode_enabled else ''
self.cli = self.cli_command

self.target_environment = target_environment or dbt_af_config.dbt_default_targets.default_target
assert self.target_environment, 'Target environment must be specified'

self.dbt_af_config = dbt_af_config

kwargs.update(get_delay_by_schedule(schedule_tag))
af_pool = pool or f'dbt_{self.target_environment}' if dbt_af_config.use_dbt_target_specific_pools else None

Expand Down
7 changes: 6 additions & 1 deletion dbt_af/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@

import kubernetes.client.models as k8s
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# this is a workaround to be compatible with old versions of Airflow and cncf-kubernetes provider
try:
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
except (ModuleNotFoundError, ImportError):
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from dbt_af.common.constants import AZ_MI_BINDING_LABEL_NAME
from dbt_af.conf import Config
Expand Down
3 changes: 2 additions & 1 deletion dbt_af/operators/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ def _check_freshness(self):
[
'cd $DBT_PROJECT_DIR',
'cp -R ./target/* $DBT_TARGET_PATH',
f'dbt source freshness {"-h" if self.dbt_af_config.is_dev else ""} '
f'{self.dbt_af_config.dbt_executable_path} source freshness '
f'{"-h" if self.dbt_af_config.is_dev else ""} '
f'--profiles-dir $DBT_PROFILES_DIR --project-dir $DBT_PROJECT_DIR --target {self.target_environment} '
f'--select source:{self.source_name}.{self.source_identifier}',
]
Expand Down
Loading

0 comments on commit 3a63cf1

Please sign in to comment.