Skip to content

Attributes

Classes

brickflow_plugins.airflow.operators.external_tasks_tableau.TableauRefreshABCOperator(server: str, username: str, password: str, site: str, project: str = None, parent_project: str = None, version: str = '3.14', max_async_workers: int = 5, polling_required: bool = True, polling_interval: int = 30, polling_timeout: int = 600, fail_operator: bool = True, *args, **kwargs)

Bases: BaseOperator

Abstract class that implements generic functionality for TableauRefresh operators.

Parameters

- server : str — Tableau server address, e.g. https://tableau-server.com
- username : str — Log in username
- password : str — Log in password
- site : str — Tableau site
- project : str — Tableau project
- parent_project : str — Name of the parent Tableau project
- version : str — Tableau server API version
- max_async_workers : int — Maximum number of asynchronous tasks that will trigger jobs and wait for completion
- polling_required : bool — Wait for job completion to proceed, otherwise just trigger the job and proceed
- polling_interval : int — Polling interval for the job status updates (seconds)
- polling_timeout : int — Stop polling if the job was not completed within the specified interval (seconds)
- fail_operator : bool — Check Tableau refresh status and fail operator if any status is 'Failure' or 'Cancelled'

Source code in brickflow_plugins/airflow/operators/external_tasks_tableau.py
def __init__(
    self,
    server: str,
    username: str,
    password: str,
    site: str,
    project: str = None,
    parent_project: str = None,
    version: str = "3.14",
    max_async_workers: int = 5,
    polling_required: bool = True,
    polling_interval: int = 30,
    polling_timeout: int = 600,
    fail_operator: bool = True,
    *args,
    **kwargs,
):
    """
    Abstract class that implements generic functionality for TableauRefresh operators.

    The connection and polling settings are collected into ``wrapper_options``;
    the wrapper itself is not created here (``tableau_wrapper`` starts as None).

    Parameters
    ----------
    server : str
        Tableau server address, e.g. https://tableau-server.com
    username : str
        Log in username
    password : str
        Log in password
    site : str
        Tableau site
    project : str
        Tableau project
    parent_project : str
        Name of the parent Tableau project
    version : str
        Tableau server API version
    max_async_workers : int
        Maximum number of asynchronous tasks that will trigger jobs and wait for completion
    polling_required : bool
        Wait for job completion to proceed, otherwise just trigger the job and proceed
    polling_interval : int
        Polling interval for the job status updates (seconds)
    polling_timeout : int
        Stop polling if the job was not completed within the specified interval (seconds)
    fail_operator : bool
        Check Tableau refresh status and fail operator if any status is 'Failure' or 'Cancelled'
    *args, **kwargs
        Forwarded unchanged to the parent class constructor.
    """
    self.__logger = log

    super().__init__(*args, **kwargs)

    self.wrapper_options = {
        "server": server,
        "username": username,
        "password": password,
        "site": site,
        "project": project,
        "parent_project": parent_project,
        "version": version,
        "max_async_workers": max_async_workers,
        "polling_required": polling_required,
        "polling_interval": polling_interval,
        "polling_timeout": polling_timeout,
    }

    # Never write the credential to the log: report a copy of the options with
    # the password masked, while `wrapper_options` itself keeps the real value.
    loggable_options = {**self.wrapper_options, "password": "***"}
    self.__logger.info(f"Tableau wrapper options:{loggable_options}")

    self.tableau_wrapper = None
    self.fail_operator = fail_operator

Attributes

fail_operator = fail_operator instance-attribute

tableau_wrapper = None instance-attribute

wrapper_options = {'server': server, 'username': username, 'password': password, 'site': site, 'project': project, 'parent_project': parent_project, 'version': version, 'max_async_workers': max_async_workers, 'polling_required': polling_required, 'polling_interval': polling_interval, 'polling_timeout': polling_timeout} instance-attribute

Functions

execute(context) abstractmethod

Source code in brickflow_plugins/airflow/operators/external_tasks_tableau.py
@abstractmethod
def execute(self, context):
    """
    Run the refresh; concrete operators must override this method.

    Raises
    ------
    NotImplementedError
        Always, when the base implementation is invoked directly.
    """
    raise NotImplementedError()

brickflow_plugins.airflow.operators.external_tasks_tableau.TableauRefreshDataSourceOperator(data_sources: list, skip: bool = False, task_id: str = 'tableau_refresh_datasource', *args, **kwargs)

Bases: TableauRefreshABCOperator

Airflow operator that handles refresh of Tableau data sources.

Parameters

- data_sources : list — List of data source names that will be refreshed
- skip : bool — Skip execution
- task_id : str — ID for Airflow task

Source code in brickflow_plugins/airflow/operators/external_tasks_tableau.py
def __init__(
    self,
    data_sources: list,
    skip: bool = False,
    task_id: str = "tableau_refresh_datasource",
    *args,
    **kwargs,
):
    """
    Create an Airflow operator that refreshes Tableau data sources.

    Parameters
    ----------
    data_sources : list
        Names of the data sources to refresh
    skip : bool
        When true, the task does not perform the refresh
    task_id : str
        ID for Airflow task
    *args, **kwargs
        Forwarded unchanged to the parent operator's constructor.
    """
    # Hand the task id and every remaining argument to the parent constructor.
    super().__init__(*args, task_id=task_id, **kwargs)
    self.__skip = skip
    self.data_sources = data_sources

Attributes

data_sources = data_sources instance-attribute

Functions

execute(context)

Refresh data source in Tableau.

Source code in brickflow_plugins/airflow/operators/external_tasks_tableau.py
def execute(self, context):
    """
    Refresh data source in Tableau.

    When the operator was created with ``skip=True`` nothing is refreshed and
    only an informational message is logged. Otherwise a ``TableauWrapper`` is
    built from ``wrapper_options``, the configured data sources are refreshed,
    and the results are passed to ``_analyze_refresh_result``.

    Parameters
    ----------
    context
        Task context supplied by the caller; not used by this method.
    """
    if self.__skip:
        # Use the module-level logger here. `self.__logger` is name-mangled per
        # class, so the attribute assigned in the parent's __init__ is not
        # reachable under this class's mangled name and would raise
        # AttributeError instead of logging.
        log.info("Skipping task execution...")
        return

    self.tableau_wrapper = TableauWrapper(**self.wrapper_options)
    results = self.tableau_wrapper.refresh_datasources(
        data_sources=self.data_sources
    )
    self._analyze_refresh_result(results)

brickflow_plugins.airflow.operators.external_tasks_tableau.TableauRefreshEmptyException

Bases: Exception

brickflow_plugins.airflow.operators.external_tasks_tableau.TableauRefreshException

Bases: Exception

brickflow_plugins.airflow.operators.external_tasks_tableau.TableauRefreshWorkBookOperator(workbooks: list, skip: bool = False, task_id: str = 'tableau_refresh_workbooks', *args, **kwargs)

Bases: TableauRefreshABCOperator

Airflow operator that handles refresh of Tableau workbooks.

Parameters

- workbooks : list — List of workbook names that will be refreshed
- skip : bool — Skip execution
- task_id : str — ID for Airflow task

Source code in brickflow_plugins/airflow/operators/external_tasks_tableau.py
def __init__(
    self,
    workbooks: list,
    skip: bool = False,
    task_id: str = "tableau_refresh_workbooks",
    *args,
    **kwargs,
):
    """
    Create an Airflow operator that refreshes Tableau workbooks.

    Parameters
    ----------
    workbooks : list
        Names of the workbooks to refresh
    skip : bool
        When true, the task does not perform the refresh
    task_id : str
        ID for Airflow task
    *args, **kwargs
        Forwarded unchanged to the parent operator's constructor.
    """
    # Hand the task id and every remaining argument to the parent constructor.
    super().__init__(*args, task_id=task_id, **kwargs)
    self.__skip = skip
    self.workbooks = workbooks

Attributes

workbooks = workbooks instance-attribute

Functions

execute(context)

Refresh workbooks in Tableau

Source code in brickflow_plugins/airflow/operators/external_tasks_tableau.py
def execute(self, context):
    """
    Refresh workbooks in Tableau.

    When the operator was created with ``skip=True`` nothing is refreshed and
    only an informational message is logged. Otherwise a ``TableauWrapper`` is
    built from ``wrapper_options``, the configured workbooks are refreshed,
    and the results are passed to ``_analyze_refresh_result``.

    Parameters
    ----------
    context
        Task context supplied by the caller; not used by this method.
    """
    if self.__skip:
        # Use the module-level logger here. `self.__logger` is name-mangled per
        # class, so the attribute assigned in the parent's __init__ is not
        # reachable under this class's mangled name and would raise
        # AttributeError instead of logging.
        log.info("Skipping task execution...")
        return

    self.tableau_wrapper = TableauWrapper(**self.wrapper_options)
    results = self.tableau_wrapper.refresh_workbooks(work_books=self.workbooks)
    self._analyze_refresh_result(results)

brickflow_plugins.airflow.operators.external_tasks_tableau.TableauWrapper(server: str, username: str, password: str, site: str, project: str = None, parent_project: str = None, version: str = '3.14', max_async_workers: int = 5, polling_required: bool = True, polling_interval: int = 30, polling_timeout: int = 600)

Class facilitates interaction with Tableau server for the purpose of refreshing data sources or work books. Refresh is triggered asynchronously, and Tableau server is polled for results for each job, until the job is finished or timeout is reached (default: 10 minutes).

Initialize TableauWrapper object with the specified parameters.

Parameters

- server : str — Tableau server address, e.g. https://tableau-server.com
- username : str — Log in username
- password : str — Log in password
- site : str — Tableau site
- project : str — Tableau project
- parent_project : str — Name of the parent Tableau project
- version : str — Tableau server API version
- max_async_workers : int — Maximum number of asynchronous tasks that will trigger jobs and wait for completion
- polling_required : bool — Wait for job completion to proceed, otherwise just trigger the job and proceed
- polling_interval : int — Polling interval for the job status updates (seconds)
- polling_timeout : int — Stop polling if the job was not completed within the specified interval (seconds)

Source code in brickflow_plugins/airflow/operators/external_tasks_tableau.py
def __init__(
    self,
    server: str,
    username: str,
    password: str,
    site: str,
    project: str = None,
    parent_project: str = None,
    version: str = "3.14",
    max_async_workers: int = 5,
    polling_required: bool = True,
    polling_interval: int = 30,
    polling_timeout: int = 600,
):
    """
    Store the connection, project and polling settings on the wrapper.

    Nothing is contacted here; the arguments are only kept as attributes.

    Parameters
    ----------
    server : str
        Tableau server address, e.g. https://tableau-server.com
    username : str
        Log in username
    password : str
        Log in password
    site : str
        Tableau site
    project : str
        Tableau project
    parent_project : str
        Name of the parent Tableau project
    version : str
        Tableau server API version
    max_async_workers : int
        Maximum number of asynchronous tasks that will trigger jobs and wait for completion
    polling_required : bool
        Wait for job completion to proceed, otherwise just trigger the job and proceed
    polling_interval : int
        Polling interval for the job status updates (seconds)
    polling_timeout : int
        Stop polling if the job was not completed within the specified interval (seconds)
    """
    # Internal state: shared module logger and a placeholder initialised to None.
    self._logger = log
    self._ip = None

    # Server endpoint and credentials.
    self.server, self.version = server, version
    self.username, self.password = username, password

    # Location of the content on the server.
    self.site = site
    self.project, self.parent_project = project, parent_project

    # Concurrency and polling behaviour.
    self.max_async_workers = max_async_workers
    self.polling_required = polling_required
    self.polling_interval, self.polling_timeout = polling_interval, polling_timeout

Attributes

max_async_workers = max_async_workers instance-attribute

parent_project = parent_project instance-attribute

password = password instance-attribute

polling_interval = polling_interval instance-attribute

polling_required = polling_required instance-attribute

polling_timeout = polling_timeout instance-attribute

project = project instance-attribute

server = server instance-attribute

site = site instance-attribute

username = username instance-attribute

version = version instance-attribute

Classes

MultipleWorkingProjectsException()

Bases: Exception

Source code in brickflow_plugins/airflow/operators/external_tasks_tableau.py
def __init__(self):
    """Build the exception with its fixed message, also exposed as ``self.message``."""
    text = "Multiple projects with the same name exist on the server! Set 'parent_project' parameter!"
    self.message = text
    super().__init__(text)
Attributes
message = "Multiple projects with the same name exist on the server! Set 'parent_project' parameter!" instance-attribute

UnidentifiedWorkingProjectException()

Bases: Exception

Source code in brickflow_plugins/airflow/operators/external_tasks_tableau.py
def __init__(self):
    """Build the exception with its fixed message, also exposed as ``self.message``."""
    text = "Could not identify working project, check that the spelling is correct!"
    self.message = text
    super().__init__(text)
Attributes
message = 'Could not identify working project, check that the spelling is correct!' instance-attribute

Functions

refresh_datasources(data_sources: list) -> list

Asynchronously refresh specified list of Tableau data sources.

Parameters

data_sources : list List of data sources

Returns

list List of dictionaries with data sources and their respective refresh statuses

Source code in brickflow_plugins/airflow/operators/external_tasks_tableau.py
def refresh_datasources(self, data_sources: list) -> list:
    """
    Asynchronously refresh specified list of Tableau data sources.

    The work runs inside the context manager returned by ``_authenticate``.
    The requested names are narrowed with ``_filter_datasources`` and each
    remaining item is passed to ``_refresh_datasource`` on a thread pool of at
    most ``max_async_workers`` workers.

    Parameters
    ----------
    data_sources : list
        List of data sources

    Returns
    -------
    list
        List of dictionaries with data sources and their respective refresh statuses
    """
    with self._authenticate():
        # Only refresh selected data sources
        lim_ds = self._filter_datasources(data_sources=data_sources)

        # Start async execution and collect results
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_async_workers
        ) as executor:
            # map() yields results in input order; list() drains the iterator,
            # so any exception raised by a worker surfaces here.
            results = list(executor.map(self._refresh_datasource, lim_ds))

        return results

refresh_workbooks(work_books: list) -> list

Asynchronously refresh specified list of Tableau workbooks.

Parameters

work_books : list List of work books

Returns

list List of dictionaries with work books and their respective refresh statuses

Source code in brickflow_plugins/airflow/operators/external_tasks_tableau.py
def refresh_workbooks(self, work_books: list) -> list:
    """
    Asynchronously refresh specified list of Tableau workbooks.

    The work runs inside the context manager returned by ``_authenticate``.
    The requested names are narrowed with ``_filter_workbooks`` and each
    remaining item is passed to ``_refresh_workbook`` on a thread pool of at
    most ``max_async_workers`` workers.

    Parameters
    ----------
    work_books : list
        List of work books

    Returns
    -------
    list
        List of dictionaries with work books and their respective refresh statuses
    """
    with self._authenticate():
        # Only refresh selected workbooks
        lim_wb = self._filter_workbooks(work_books=work_books)

        # Start async execution and collect results
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_async_workers
        ) as executor:
            # map() yields results in input order; list() drains the iterator,
            # so any exception raised by a worker surfaces here.
            results = list(executor.map(self._refresh_workbook, lim_wb))

        return results