Skip to content

Classes

brickflow.engine.project.Project(name: str, debug_execute_workflow: Optional[str] = None, debug_execute_task: Optional[str] = None, git_repo: Optional[str] = None, provider: str = 'github', libraries: Optional[List[TaskLibrary]] = None, git_reference: Optional[str] = None, entry_point_path: Optional[str] = None, mode: Optional[str] = None, batch: bool = True, codegen_mechanism: Optional[Type[CodegenInterface]] = None, codegen_kwargs: Optional[Dict[str, Any]] = None, bundle_obj_name: Optional[str] = None, bundle_base_path: Optional[str] = None, enable_plugins: bool = False) dataclass

Attributes

batch: bool = True class-attribute instance-attribute

bundle_base_path: Optional[str] = None class-attribute instance-attribute

bundle_obj_name: Optional[str] = None class-attribute instance-attribute

codegen_kwargs: Optional[Dict[str, Any]] = None class-attribute instance-attribute

codegen_mechanism: Optional[Type[CodegenInterface]] = None class-attribute instance-attribute

debug_execute_task: Optional[str] = None class-attribute instance-attribute

debug_execute_workflow: Optional[str] = None class-attribute instance-attribute

enable_plugins: bool = False class-attribute instance-attribute

entry_point_path: Optional[str] = None class-attribute instance-attribute

git_reference: Optional[str] = None class-attribute instance-attribute

git_repo: Optional[str] = None class-attribute instance-attribute

libraries: Optional[List[TaskLibrary]] = None class-attribute instance-attribute

mode: Optional[str] = None class-attribute instance-attribute

name: str instance-attribute

provider: str = 'github' class-attribute instance-attribute

Functions

__enter__() -> _Project

Source code in brickflow/engine/project.py
def __enter__(self) -> "_Project":
    """Create the internal ``_Project`` for this context and return it.

    The new instance is built from this object's current settings, kept on
    ``self._project`` so that ``__exit__`` can inspect it later, and handed
    back as the ``with ... as`` target.

    Returns:
        The freshly constructed ``_Project``.
    """
    # Positional arguments are kept positional (and in the same order) since
    # the parameter names of ``_Project`` are not visible from here.
    project = _Project(
        self.name,
        self.git_repo,
        self.provider,
        self.git_reference,
        self.entry_point_path,
        libraries=self.libraries,
        batch=self.batch,
        bundle_obj_name=self.bundle_obj_name,
        bundle_base_path=self.bundle_base_path,
    )
    self._project = project
    return project

__exit__(exc_type, exc_val, exc_tb) -> None

Source code in brickflow/engine/project.py
def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore
    """Finish the project context by deploying or executing, based on mode.

    Behaviour, in order:

    * If the ``with`` body raised, the exception is wrapped in
      ``DeployError`` / ``ExecuteError`` (chained to the original) for the
      deploy / execute modes. For any other mode there is no wrapper type,
      so the original exception is left to propagate untouched.
    * If the project has no workflows, nothing happens.
    * In unittest mode, nothing happens beyond a log line.
    * In deploy mode, the configured codegen mechanism is instantiated and
      ``synth()`` is called on it.
    * In execute mode, the workflow id and task id are resolved through
      ``ctx.get_parameter`` (falling back to the ``debug_execute_*``
      attributes) and the matching task is executed.

    Args:
        exc_type: Exception class raised inside the ``with`` body, or None.
        exc_val: Exception instance raised inside the ``with`` body, or None.
        exc_tb: Traceback for that exception, or None.

    Raises:
        DeployError: The ``with`` body failed while in deploy mode.
        ExecuteError: The ``with`` body failed while in execute mode.
        ValueError: Deploy mode was requested but ``codegen_mechanism`` is
            None.
    """
    if exc_type is not None:
        error_types = {Stage.deploy: DeployError, Stage.execute: ExecuteError}
        error_cls = error_types.get(self._mode)
        if error_cls is None:
            # No wrapper exists for this mode (e.g. Stage.unittest). Indexing
            # the mapping directly would raise a KeyError that hides the real
            # failure; returning None lets the original exception propagate.
            return
        raise error_cls(f"Oops... failed during: {self._mode}") from exc_val

    if len(self._project.workflows) == 0:
        _ilog.info("Doing nothing no workflows...")
        return

    if self._mode == Stage.unittest:
        # Mode is purely for testing purposes for the _Project internal class
        _ilog.info("Running unit tests...")
        return

    if self._mode.value == Stage.deploy.value:
        _ilog.info("Deploying changes... to %s", ctx.env)
        if self.codegen_mechanism is None:
            raise ValueError(
                "codegen_mechanism cannot be None; please raise a github issue for this."
            )
        codegen = self.codegen_mechanism(
            project=self._project,
            id_=f"{ctx.env}_{self.name}",
            env=ctx.env,
            **(self.codegen_kwargs or {}),
        )
        codegen.synth()

    if self._mode.value == Stage.execute.value:
        # The debug_execute_* attributes act as fallbacks when the
        # corresponding parameter is not available from the context.
        wf_id = ctx.get_parameter(
            BrickflowInternalVariables.workflow_id.value,
            self.debug_execute_workflow,
        )
        t_id = ctx.get_parameter(
            BrickflowInternalVariables.task_id.value, self.debug_execute_task
        )

        if wf_id is None or t_id is None:
            _ilog.info(
                "No workflow id or task key was able to found; doing nothing..."
            )
            return

        workflow = self._project.get_workflow(wf_id)
        task = workflow.get_task(t_id)
        task.execute()

__post_init__() -> None

Source code in brickflow/engine/project.py
def __post_init__(self) -> None:
    """Resolve the project's effective settings after dataclass init.

    Steps performed, in order:

    1. Set the internal logger to INFO and read the run mode (``Stage``)
       from the environment, defaulting to execute.
    2. Default ``entry_point_path`` to the caller's location.
    3. Normalise ``libraries`` to a list and, when auto-add is enabled in
       the deployment settings, filter it and append the brickflow
       libraries.
    4. Check that the project name from the deployment settings (if any)
       matches ``name`` and register the current project on ``ctx``.
    5. Resolve bundle, git and codegen settings. For each of these an
       environment variable takes precedence over the value passed to the
       constructor, which in turn takes precedence over the built-in
       default.

    Raises:
        ValueError: The project name from the deployment settings differs
            from ``name``, or no git reference could be determined while
            deploying to a non-local environment.
    """
    # during entry of project enable logging
    _ilog.setLevel(logging.INFO)

    self._mode = Stage[
        config(BrickflowEnvVars.BRICKFLOW_MODE.value, default=Stage.execute.value)
    ]
    self.entry_point_path = self.entry_point_path or get_caller_info()
    deploy_settings = BrickflowProjectDeploymentSettings()
    # setup current_project
    env_project_name = deploy_settings.brickflow_project_name

    self.libraries = self.libraries or []
    if deploy_settings.brickflow_auto_add_libraries is True:
        _ilog.info("Auto adding brickflow libraries...")
        self.libraries = filter_bf_related_libraries(self.libraries)
        # here libraries should not be null anymore if this branch is invoked
        self.libraries += get_brickflow_libraries()

    if (
        env_project_name is not None
        and self.name is not None
        and env_project_name != self.name
    ):
        raise ValueError(
            "Project name in config files and entrypoint must be the same"
        )

    ctx.set_current_project(self.name or env_project_name)  # always setup first

    # populate bundle info via env vars; fall back to the constructor value
    # (previously discarded) before the built-in default, matching how
    # provider and git_repo are resolved below
    self.bundle_obj_name = config(
        BrickflowEnvVars.BRICKFLOW_BUNDLE_OBJ_NAME.value,
        default=self.bundle_obj_name or ".brickflow_bundles",
    )
    self.bundle_base_path = config(
        BrickflowEnvVars.BRICKFLOW_BUNDLE_BASE_PATH.value,
        default=self.bundle_base_path
        or "/Users/${workspace.current_user.userName}",
    )

    # get_git_ref() reads self._mode, so it must run after the mode is set
    self.git_reference = config(
        BrickflowEnvVars.BRICKFLOW_GIT_REF.value, default=self.get_git_ref()
    )

    if (
        self._mode == Stage.deploy
        and ctx.is_local() is False
        and self.git_reference is None
    ):
        raise ValueError(
            "git_reference must be set when deploying to non-local envs"
        )

    self.provider = config(
        BrickflowEnvVars.BRICKFLOW_GIT_PROVIDER.value, default=self.provider
    )
    self.git_repo = config(
        BrickflowEnvVars.BRICKFLOW_GIT_REPO.value, default=self.git_repo
    )

    deployment_mode = config(
        BrickflowEnvVars.BRICKFLOW_DEPLOYMENT_MODE.value, default="bundle"
    )
    if deployment_mode == BrickflowDeployMode.BUNDLE.value:
        self.codegen_mechanism = DatabricksBundleCodegen
    if self.codegen_kwargs is None:
        self.codegen_kwargs = {}

get_git_ref() -> Optional[str]

Source code in brickflow/engine/project.py
def get_git_ref(self) -> Optional[str]:
    """Work out the git reference to use for the current mode.

    Outside deploy mode this is ``git_reference`` itself, or an empty string
    when it is unset. In deploy mode an explicit ``git_reference`` wins;
    otherwise the reference is ``commit/<current commit>``. If the current
    commit cannot be obtained, a stub reference is returned for local
    environments and None for all others.

    Returns:
        The resolved reference, or None when deploying to a non-local
        environment without a determinable commit.
    """
    explicit_ref = self.git_reference

    if self._mode != Stage.deploy:
        return "" if explicit_ref is None else explicit_ref

    if explicit_ref is not None:
        return explicit_ref

    try:
        return f"commit/{get_current_commit()}"
    except Exception:
        # NOTE: the message mentions an empty string, but the value actually
        # returned below is a stub ref (local) or None (non-local).
        _ilog.warning(
            "Unable to get current commit; defaulting to empty string"
        )
        if ctx.is_local():
            return "commit/fake-local-stub"
        return None