Skip to main content
Version: 0.7.0 (alpha)

What are DAGs

Directed Acyclic Graphs (or DAGs for short) wire together different tasks and define the data flow between them. DAGs are reusable components defined as regular Python functions that can be instantiated one or more times in a single application (as long as they are uniuqely named) and can be reused across different applications.

This document describes how to create and use DAGs inside your seaplane application.

DAGs exist in two different states, definitions and instances. You can read more about the difference between the two states here.

DAGs follow the following naming scheme on Seaplane app_name.dag_name.nested_dag_name. Where nested_dag_name can be repeated for each additional nested DAG. For example, a DAG named foo inside an app named bar is identified by bar.foo

DAG definition

You define a DAG as a regular Python function. We recommend you provide at least the following three parameters, but additional parameters can be added at will.

  • app type:App - A seaplane app instance (required)
  • name type:string - Used to name your dag during instantiation. DAG names can only include letters [a-z A-Z], numbers [0-9] and -. Note _ are not allowed!
  • input type:list of type:Message - A list of input messages from different sources such as app input (app.input()) or task output (required)

The code snippet below shows a basic definition of a DAG wiring together four instances of the same task. You can learn more about task instances in our task documentation.

Basic Reusable DAG
from my_tasks import demo_task

def my_dag(app, name, input):
# create a new DAG on the app based on the provided name
dag = app.dag(name)

# add four consecutive tasks to the DAG.
output1 = dag.task(demo_task, [input], instance_name='demo-task-0')
output2 = dag.task(demo_task, [output1], instance_name='demo-task-1')
output3 = dag.task(demo_task, [output2], instance_name='demo-task-2')
output4 = dag.task(demo_task, [output3], instance_name='demo-task-3')

# set the output of the dag
dag.respond(output4)

# return the dag
return dag

We can visualize this DAG as follows:

Parameterizing DAGs

Because DAGs are standard Python functions, you can parameterize them and use standard Python inside them to construct the DAG. For example, you can achieve the same DAG as mentioned in the previous section with the following code snippet.

Basic Reusable DAG constructed with Python
from my_tasks import demo_task

def my_dag(app, name, input, count):
# create a new DAG on the app based on the provided name
dag = app.dag(name)

# wire input to the first instance
output = dag.task(demo_task, [input], instance_name='demo-task-0')

# add three consecutive tasks to the DAG.
for i in range(1, count):
output = dag.task(demo_task, [output], instance_name=f'demo-task-{i}')

# set the output of the dag
dag.respond(output)

# return the dag
return dag
danger

While you can use loops to construct DAGs, it is important to remember that the flow of the data can never contain a loop. For example, the following DAG is not allowed.

Data Flow

Tasks can retrieve input from one or more tasks, the application entry point, or another DAG. By wiring the tasks together in your DAG you create the flow of the data. For example, assume you have the following application.

Example task insantiation
from seaplane.apps import App

# create a task definition
def task_a(msg):
data = msg.body

# create a second task definition
def task_b(msg):
data = msg.body

# create a third task definition
def task_c(msg):
data = msg.body

# create a fourth task instance
def task_d(msg):
data = msg.body

# create app and DAG
app = App("hello-world-app")
dag = app.dag("hello-world-dag")

# instantiate task
output_a = dag.task(task_a, [app.input()], instance_name='task-a')
output_b = dag.task(task_b, [output_a], instance_name='task-b')
output_c = dag.task(task_c, [output_a], instance_name='task-c')
output_d = dag.task(task_d, [output_b, output_c], instance_name='task-d')

# set the response of the app
app.respond(output_d)

When deployed this constructs the following DAG.





DAG instantiation

You can turn any DAG definition into a live deployment by instantiating them inside the application. When deployed this creates all the associated resources such as tasks inside the DAG and deploys them on Seaplane.

To instantiate a DAG call the associated function and supply it with the required parameters. For example, assume you have the following DAG definition.

Basic DAG definition
from my_tasks import demo_task

def my_dag(app, name, input):
# create a new DAG on the app based on the provided name
dag = app.dag(name)

# add four consecutive tasks to the DAG.
output1 = dag.task(demo_task, [input], instance_name='demo-task-0')
output2 = dag.task(demo_task, [output1], instance_name='demo-task-1')

# set the output of the dag
dag.respond(output2)

# return the dag
return dag

To instantiate this DAG you call my_dag() inside your main.py and parameterize it with your App instance, the DAG name and the input to the DAG. In the example below we wire the HTTP input directly to the DAG.

DAG instantiation inside an application
from seaplane.apps import App

# create an app instance
app = App("example-app")

# instantiate the DAG, this creates instances of all tasks inside the DAG
output = my_dag(hello_world, 'my-dag', app.input())

# set the output of the app
app.respond(output)

# run the application
app.run()

When deployed Seaplane creates the application and all the associated resources in the DAG. In this case, it creates the following two tasks.

  • example-app.my-dag.demo-task-0
  • example-app.my-dag.demo-task-1

Nested DAGs

You can nest DAGs by instantiating one DAG inside another. While there is no theoretical maximum we tested nested DAGs up to 10 levels deep without any issues. Anything beyond 10 is at your own risk.

Prebuilt Seaplane DAGs

Seaplane comes with a set of prebuilt DAGs for common tasks such as collecting inputs from debatched input or multiple tasks and a DAG to communicate with Substation, our model hub.

You can learn more about them in the documentation linked below.