Airflow (dagster-airflow)

This library provides a Dagster integration with Airflow.

For more information on getting started, see the Airflow integration guide.

Run Airflow on Dagster

dagster_airflow.make_dagster_job_from_airflow_dag(dag, tags=None, use_airflow_template_context=False, unique_id=None, mock_xcom=False, use_ephemeral_airflow_db=False)[source]

Construct a Dagster job corresponding to a given Airflow DAG.

Tasks in the resulting job will execute the execute() method on the corresponding Airflow Operator. Dagster, any dependencies required by Airflow Operators, and the module containing your DAG definition must be available in the Python environment within which your Dagster solids execute.

To set Airflow’s execution_date for use with Airflow Operator’s execute() methods, either:

  1. (Best for ad hoc runs) Execute job directly. This will set execution_date to the

    time (in UTC) of the run.

  2. Add {'airflow_execution_date': utc_date_string} to the job tags. This will override

    behavior from (1).

    my_dagster_job = make_dagster_job_from_airflow_dag(
            dag=dag,
            tags={'airflow_execution_date': utc_execution_date_str}
    )
    my_dagster_job.execute_in_process()
    
  3. (Recommended) Add {'airflow_execution_date': utc_date_string} to the run tags,

    such as in the Dagit UI. This will override behavior from (1) and (2)

We apply normalized_name() to the dag id and task ids when generating job name and op names to ensure that names conform to Dagster’s naming conventions.

Parameters:
  • dag (DAG) – The Airflow DAG to compile into a Dagster job

  • tags (Dict[str, Field]) – Job tags. Optionally include tags={‘airflow_execution_date’: utc_date_string} to specify execution_date used within execution of Airflow Operators.

  • use_airflow_template_context (bool) – If True, will call get_template_context() on the Airflow TaskInstance model which requires and modifies the DagRun table. The use_airflow_template_context setting is ignored if use_ephemeral_airflow_db is True. (default: False)

  • unique_id (int) – If not None, this id will be postpended to generated op names. Used by framework authors to enforce unique op names within a repo.

  • mock_xcom (bool) – If True, dagster will mock out all calls made to xcom, features that depend on xcom may not work as expected. (default: False)

  • use_ephemeral_airflow_db (bool) – If True, dagster will create an ephemeral sqlite airflow database for each run. (default: False)

Returns:

The generated Dagster job

Return type:

JobDefinition

dagster_airflow.make_dagster_repo_from_airflow_dags_path(dag_path, repo_name, safe_mode=True, store_serialized_dags=False, use_airflow_template_context=False, mock_xcom=False, use_ephemeral_airflow_db=True)[source]

Construct a Dagster repository corresponding to Airflow DAGs in dag_path.

DagBag.get_dag() dependency requires Airflow DB to be initialized.

Usage:

Create make_dagster_repo.py:

from dagster_airflow.dagster_pipeline_factory import make_dagster_repo_from_airflow_dags_path

def make_repo_from_dir():
    return make_dagster_repo_from_airflow_dags_path(
        '/path/to/dags/', 'my_repo_name'
    )

Use RepositoryDefinition as usual, for example: dagit -f path/to/make_dagster_repo.py -n make_repo_from_dir

Parameters:
  • dag_path (str) – Path to directory or file that contains Airflow Dags

  • repo_name (str) – Name for generated RepositoryDefinition

  • include_examples (bool) – True to include Airflow’s example DAGs. (default: False)

  • safe_mode (bool) – True to use Airflow’s default heuristic to find files that contain DAGs (ie find files that contain both b’DAG’ and b’airflow’) (default: True)

  • store_serialized_dags (bool) – True to read Airflow DAGS from Airflow DB. False to read DAGS from Python files. (default: False)

  • use_airflow_template_context (bool) – If True, will call get_template_context() on the Airflow TaskInstance model which requires and modifies the DagRun table. The use_airflow_template_context setting is ignored if use_ephemeral_airflow_db is True. (default: False)

  • mock_xcom (bool) – If True, dagster will mock out all calls made to xcom, features that depend on xcom may not work as expected. (default: False)

  • use_ephemeral_airflow_db (bool) – If True, dagster will create an ephemeral sqlite airflow database for each run. (default: False)

Returns:

RepositoryDefinition

dagster_airflow.make_dagster_repo_from_airflow_dag_bag(dag_bag, repo_name, refresh_from_airflow_db=False, use_airflow_template_context=False, mock_xcom=False, use_ephemeral_airflow_db=False)[source]

Construct a Dagster repository corresponding to Airflow DAGs in DagBag.

Usage:
Create make_dagster_repo.py:

from dagster_airflow.dagster_pipeline_factory import make_dagster_repo_from_airflow_dag_bag from airflow_home import my_dag_bag

def make_repo_from_dag_bag():

return make_dagster_repo_from_airflow_dag_bag(my_dag_bag, ‘my_repo_name’)

Use RepositoryDefinition as usual, for example:

dagit -f path/to/make_dagster_repo.py -n make_repo_from_dag_bag

Parameters:
  • dag_path (str) – Path to directory or file that contains Airflow Dags

  • repo_name (str) – Name for generated RepositoryDefinition

  • refresh_from_airflow_db (bool) – If True, will refresh DAG if expired via DagBag.get_dag(), which requires access to initialized Airflow DB. If False (recommended), gets dag from DagBag’s dags dict without depending on Airflow DB. (default: False)

  • use_airflow_template_context (bool) – If True, will call get_template_context() on the Airflow TaskInstance model which requires and modifies the DagRun table. The use_airflow_template_context setting is ignored if use_ephemeral_airflow_db is True. (default: False)

  • mock_xcom (bool) – If True, dagster will mock out all calls made to xcom, features that depend on xcom may not work as expected. (default: False)

  • use_ephemeral_airflow_db (bool) – If True, dagster will create an ephemeral sqlite airflow database for each run. (default: False)

Returns:

RepositoryDefinition

dagster_airflow.make_dagster_repo_from_airflow_example_dags(repo_name='airflow_example_dags_repo', use_ephemeral_airflow_db=True)[source]

Construct a Dagster repository for Airflow’s example DAGs.

Execution of the following Airflow example DAGs is not currently supported:

‘example_external_task_marker_child’, ‘example_pig_operator’, ‘example_skip_dag’, ‘example_trigger_target_dag’, ‘example_xcom’, ‘test_utils’,

Usage:

Create make_dagster_repo.py:

from dagster_airflow.dagster_pipeline_factory import make_dagster_repo_from_airflow_example_dags

def make_airflow_example_dags():

return make_dagster_repo_from_airflow_example_dags()

Use RepositoryDefinition as usual, for example:

dagit -f path/to/make_dagster_repo.py -n make_airflow_example_dags

Parameters:
  • repo_name (str) – Name for generated RepositoryDefinition

  • use_ephemeral_airflow_db (bool) – If True, dagster will create an ephemeral sqlite airflow database for each run. (default: False)

Returns:

RepositoryDefinition

dagster_airflow.airflow_operator_to_op(airflow_op, connections=None, capture_python_logs=True, return_output=False)[source]

Construct a Dagster op corresponding to a given Airflow operator.

The resulting op will execute the execute() method on the Airflow operator. Dagster and any dependencies required by the Airflow Operator must be available in the Python environment within which your Dagster ops execute.

To specify Airflow connections utilized by the operator, instantiate and pass Airflow connection objects in a list to the connections parameter of this function.

http_task = SimpleHttpOperator(task_id="my_http_task", endpoint="foo")
connections = [Connection(conn_id="http_default", host="https://mycoolwebsite.com")]
dagster_op = airflow_operator_to_op(http_task, connections=connections)

In order to specify extra parameters to the connection, call the set_extra() method on the instantiated Airflow connection:

s3_conn = Connection(conn_id=f's3_conn', conn_type="s3")
s3_conn_extra = {
    "aws_access_key_id": "my_access_key",
    "aws_secret_access_key": "my_secret_access_key",
}
s3_conn.set_extra(json.dumps(s3_conn_extra))
Parameters:
  • airflow_op (BaseOperator) – The Airflow operator to convert into a Dagster op

  • connections (Optional[List[Connection]]) – Airflow connections utilized by the operator.

  • capture_python_logs (bool) – If False, will not redirect Airflow logs to compute logs. (default: True)

  • return_output (bool) – If True, will return any output from the Airflow operator. (default: False)

Returns:

The generated Dagster op

Return type:

converted_op (OpDefinition)

Run Dagster on Airflow

dagster_airflow.make_airflow_dag(module_name, job_name, run_config=None, mode=None, instance=None, dag_id=None, dag_description=None, dag_kwargs=None, op_kwargs=None, pipeline_name=None)[source]

Construct an Airflow DAG corresponding to a given Dagster job/pipeline.

Tasks in the resulting DAG will execute the Dagster logic they encapsulate as a Python callable, run by an underlying PythonOperator. As a consequence, both dagster, any Python dependencies required by your solid logic, and the module containing your pipeline definition must be available in the Python environment within which your Airflow tasks execute. If you cannot install requirements into this environment, or you are looking for a containerized solution to provide better isolation, see instead make_airflow_dag_containerized().

This function should be invoked in an Airflow DAG definition file, such as that created by an invocation of the dagster-airflow scaffold CLI tool.

Parameters:
  • module_name (str) – The name of the importable module in which the pipeline/job definition can be found.

  • job_name (str) – The name of the job definition.

  • run_config (Optional[dict]) – The config, if any, with which to compile the pipeline/job to an execution plan, as a Python dict.

  • mode (Optional[str]) – The mode in which to execute the pipeline.

  • instance (Optional[DagsterInstance]) – The Dagster instance to use to execute the pipeline/job.

  • dag_id (Optional[str]) – The id to use for the compiled Airflow DAG (passed through to DAG).

  • dag_description (Optional[str]) – The description to use for the compiled Airflow DAG (passed through to DAG)

  • dag_kwargs (Optional[dict]) – Any additional kwargs to pass to the Airflow DAG constructor, including default_args.

  • op_kwargs (Optional[dict]) – Any additional kwargs to pass to the underlying Airflow operator (a subclass of PythonOperator).

  • pipeline_name (str) – (legacy) The name of the pipeline definition.

Returns:

The generated Airflow DAG, and a list of its constituent tasks.

Return type:

(airflow.models.DAG, List[airflow.models.BaseOperator])

dagster_airflow.make_airflow_dag_for_operator(recon_repo, job_name, operator, run_config=None, mode=None, dag_id=None, dag_description=None, dag_kwargs=None, op_kwargs=None, pipeline_name=None)[source]

Construct an Airflow DAG corresponding to a given Dagster job/pipeline and custom operator.

Custom operator template

Tasks in the resulting DAG will execute the Dagster logic they encapsulate run by the given Operator BaseOperator. If you are looking for a containerized solution to provide better isolation, see instead make_airflow_dag_containerized().

This function should be invoked in an Airflow DAG definition file, such as that created by an invocation of the dagster-airflow scaffold CLI tool.

Parameters:
  • recon_repo (dagster.ReconstructableRepository) – reference to a Dagster RepositoryDefinition that can be reconstructed in another process

  • job_name (str) – The name of the job definition.

  • operator (type) – The operator to use. Must be a class that inherits from BaseOperator

  • run_config (Optional[dict]) – The config, if any, with which to compile the pipeline to an execution plan, as a Python dict.

  • mode (Optional[str]) – The mode in which to execute the pipeline.

  • instance (Optional[DagsterInstance]) – The Dagster instance to use to execute the pipeline.

  • dag_id (Optional[str]) – The id to use for the compiled Airflow DAG (passed through to DAG).

  • dag_description (Optional[str]) – The description to use for the compiled Airflow DAG (passed through to DAG)

  • dag_kwargs (Optional[dict]) – Any additional kwargs to pass to the Airflow DAG constructor, including default_args.

  • op_kwargs (Optional[dict]) – Any additional kwargs to pass to the underlying Airflow operator.

  • pipeline_name (str) – (legacy) The name of the pipeline definition.

Returns:

The generated Airflow DAG, and a list of its constituent tasks.

Return type:

(airflow.models.DAG, List[airflow.models.BaseOperator])

dagster_airflow.make_airflow_dag_containerized(module_name, job_name, image, run_config=None, mode=None, dag_id=None, dag_description=None, dag_kwargs=None, op_kwargs=None, pipeline_name=None)[source]

Construct a containerized Airflow DAG corresponding to a given Dagster job/pipeline.

Tasks in the resulting DAG will execute the Dagster logic they encapsulate using a subclass of DockerOperator. As a consequence, both dagster, any Python dependencies required by your solid logic, and the module containing your pipeline definition must be available in the container spun up by this operator. Typically you’ll want to install these requirements onto the image you’re using.

This function should be invoked in an Airflow DAG definition file, such as that created by an invocation of the dagster-airflow scaffold CLI tool.

Parameters:
  • module_name (str) – The name of the importable module in which the pipeline/job definition can be found.

  • job_name (str) – The name of the job definition.

  • image (str) – The name of the Docker image to use for execution (passed through to DockerOperator).

  • run_config (Optional[dict]) – The config, if any, with which to compile the pipeline/job to an execution plan, as a Python dict.

  • mode (Optional[str]) – The mode in which to execute the pipeline.

  • dag_id (Optional[str]) – The id to use for the compiled Airflow DAG (passed through to DAG).

  • dag_description (Optional[str]) – The description to use for the compiled Airflow DAG (passed through to DAG)

  • dag_kwargs (Optional[dict]) – Any additional kwargs to pass to the Airflow DAG constructor, including default_args.

  • op_kwargs (Optional[dict]) – Any additional kwargs to pass to the underlying Airflow operator (a subclass of DockerOperator).

  • pipeline_name (str) – (legacy) The name of the pipeline definition.

Returns:

The generated Airflow DAG, and a list of its constituent tasks.

Return type:

(airflow.models.DAG, List[airflow.models.BaseOperator])

Orchestrate Dagster from Airflow

class dagster_airflow.DagsterCloudOperator(*args, **kwargs)[source]

Uses the dagster cloud graphql api to run and monitor dagster jobs on dagster cloud

Parameters:
  • repository_name (str) – the name of the repository to use

  • repostitory_location_name (str) – the name of the repostitory location to use

  • job_name (str) – the name of the job to run

  • run_config (Optional[Dict[str, Any]]) – the run config to use for the job run

  • dagster_conn_id (Optional[str]) – the id of the dagster connection, airflow 2.0+ only

  • organization_id (Optional[str]) – the id of the dagster cloud organization

  • deployment_name (Optional[str]) – the name of the dagster cloud deployment

  • user_token (Optional[str]) – the dagster cloud user token to use

class dagster_airflow.DagsterOperator(*args, **kwargs)[source]

Uses the dagster graphql api to run and monitor dagster jobs on remote dagster infrastructure

Parameters:
  • repository_name (str) – the name of the repository to use

  • repostitory_location_name (str) – the name of the repostitory location to use

  • job_name (str) – the name of the job to run

  • run_config (Optional[Dict[str, Any]]) – the run config to use for the job run

  • dagster_conn_id (Optional[str]) – the id of the dagster connection, airflow 2.0+ only

  • organization_id (Optional[str]) – the id of the dagster cloud organization

  • deployment_name (Optional[str]) – the name of the dagster cloud deployment

  • user_token (Optional[str]) – the dagster cloud user token to use