What are DAGs
Directed Acyclic Graphs (or DAGs for short) wire together different tasks and define the data flow between them. DAGs are reusable components defined as regular Python functions that can be instantiated one or more times in a single application (as long as they are uniquely named) and can be reused across different applications.
This document describes how to create and use DAGs inside your seaplane application.
DAGs exist in two different states, definitions and instances. You can read more about the difference between the two states here.
DAGs on Seaplane use the naming scheme app_name.dag_name.nested_dag_name, where nested_dag_name can be repeated for each additional nested DAG. For example, a DAG named foo inside an app named bar is identified by bar.foo.
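To make the naming scheme concrete, here is a minimal sketch in plain Python. The helper qualified_dag_name is hypothetical and not part of the Seaplane SDK; it only joins the name segments with dots as described above.

```python
def qualified_dag_name(app_name, *dag_names):
    """Join an app name and one or more (nested) DAG names with dots.

    Hypothetical helper for illustration only, not a Seaplane API.
    """
    if not dag_names:
        raise ValueError("at least one DAG name is required")
    return ".".join((app_name, *dag_names))


print(qualified_dag_name("bar", "foo"))           # bar.foo
print(qualified_dag_name("bar", "foo", "inner"))  # bar.foo.inner
```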
DAG definition
You define a DAG as a regular Python function. We recommend you provide at least the following three parameters, but additional parameters can be added at will.
- app (type: App): A Seaplane app instance (required).
- name (type: string): Used to name your DAG during instantiation. DAG names can only include letters [a-z A-Z], numbers [0-9] and hyphens (-). Note that underscores (_) are not allowed!
- input (type: list of Message): A list of input messages from different sources, such as app input (app.input()) or task output (required).
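The naming rule for the name parameter can be checked locally before you deploy. The sketch below is an assumption-laden illustration: is_valid_dag_name is a hypothetical helper, and it encodes only the character rule stated above (letters, digits and hyphens). Seaplane may enforce further constraints, such as a maximum length, that are not covered here.

```python
import re

# Letters, digits and hyphens only; underscores and empty names are rejected.
# This mirrors the rule stated in the text, nothing more.
_DAG_NAME = re.compile(r"[A-Za-z0-9-]+")


def is_valid_dag_name(name):
    """Return True if `name` uses only the characters allowed for DAG names."""
    return _DAG_NAME.fullmatch(name) is not None


print(is_valid_dag_name("my-dag"))  # True
print(is_valid_dag_name("my_dag"))  # False
```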
The code snippet below shows a basic definition of a DAG wiring together four instances of the same task. You can learn more about task instances in our task documentation.
from my_tasks import demo_task

def my_dag(app, name, input):
    # create a new DAG on the app based on the provided name
    dag = app.dag(name)

    # add four consecutive tasks to the DAG.
    output1 = dag.task(demo_task, [input], instance_name='demo-task-0')
    output2 = dag.task(demo_task, [output1], instance_name='demo-task-1')
    output3 = dag.task(demo_task, [output2], instance_name='demo-task-2')
    output4 = dag.task(demo_task, [output3], instance_name='demo-task-3')

    # set the output of the dag
    dag.respond(output4)

    # return the dag
    return dag
We can visualize this DAG as follows:
input -> demo-task-0 -> demo-task-1 -> demo-task-2 -> demo-task-3 -> response
Parameterizing DAGs
Because DAGs are standard Python functions, you can parameterize them and use standard Python inside them to construct the DAG. For example, you can build the same DAG as in the previous section by calling the following definition with count set to 4.
from my_tasks import demo_task

def my_dag(app, name, input, count):
    # create a new DAG on the app based on the provided name
    dag = app.dag(name)

    # wire input to the first instance
    output = dag.task(demo_task, [input], instance_name='demo-task-0')

    # add the remaining (count - 1) consecutive tasks to the DAG.
    for i in range(1, count):
        output = dag.task(demo_task, [output], instance_name=f'demo-task-{i}')

    # set the output of the dag
    dag.respond(output)

    # return the dag
    return dag
While you can use loops to construct DAGs, it is important to remember that the flow of the data can never contain a loop. For example, the following DAG is not allowed.
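To illustrate what a loop in the data flow looks like, the sketch below models the wiring as a plain dictionary and checks it with Python's standard graphlib module (Python 3.9+). This is not how Seaplane validates DAGs; has_cycle and the upstream mapping are assumptions made purely for illustration.

```python
from graphlib import CycleError, TopologicalSorter


def has_cycle(upstream):
    """Return True if the task graph contains a loop.

    `upstream` maps each task name to the names of the tasks it reads from.
    Hypothetical helper, independent of the Seaplane SDK.
    """
    try:
        # static_order() is lazy, so consume it to force the cycle check.
        tuple(TopologicalSorter(upstream).static_order())
    except CycleError:
        return True
    return False


# A straight chain of tasks: allowed.
chain = {"demo-task-1": ["demo-task-0"], "demo-task-2": ["demo-task-1"]}

# demo-task-0 also reads from demo-task-2, which closes a loop: not allowed.
loop = {**chain, "demo-task-0": ["demo-task-2"]}

print(has_cycle(chain))  # False
print(has_cycle(loop))   # True
```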
Data Flow
Tasks can retrieve input from one or more tasks, the application entry point, or another DAG. By wiring the tasks together in your DAG you create the flow of the data. For example, assume you have the following application.
from seaplane.apps import App

# create a task definition
def task_a(msg):
    data = msg.body

# create a second task definition
def task_b(msg):
    data = msg.body

# create a third task definition
def task_c(msg):
    data = msg.body

# create a fourth task definition
def task_d(msg):
    data = msg.body

# create app and DAG
app = App("hello-world-app")
dag = app.dag("hello-world-dag")

# instantiate the tasks and wire them together
output_a = dag.task(task_a, [app.input()], instance_name='task-a')
output_b = dag.task(task_b, [output_a], instance_name='task-b')
output_c = dag.task(task_c, [output_a], instance_name='task-c')
output_d = dag.task(task_d, [output_b, output_c], instance_name='task-d')

# set the response of the app
app.respond(output_d)
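When deployed, this constructs the following DAG: the app input feeds task-a, task-a fans out to task-b and task-c, and task-d collects the output of both before the app responds.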
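The wiring of this application can also be modelled outside of Seaplane to see which tasks depend on which. The sketch below uses Python's standard graphlib module (Python 3.9+) to group the four task instances into dependency stages. It only models the wiring from the example; the stages helper is hypothetical and says nothing about how Seaplane actually schedules tasks or delivers messages.

```python
from graphlib import TopologicalSorter

# Each task instance mapped to the task instances it reads from,
# mirroring the dag.task(...) calls in the example application.
upstream = {
    "task-a": [],
    "task-b": ["task-a"],
    "task-c": ["task-a"],
    "task-d": ["task-b", "task-c"],
}


def stages(upstream):
    """Group tasks into stages; every task's inputs sit in earlier stages."""
    sorter = TopologicalSorter(upstream)
    sorter.prepare()
    result = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted for a stable order
        result.append(ready)
        sorter.done(*ready)
    return result


print(stages(upstream))
# [['task-a'], ['task-b', 'task-c'], ['task-d']]
```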
DAG instantiation
You can turn any DAG definition into a live deployment by instantiating it inside the application. When deployed, this creates all the associated resources, such as the tasks inside the DAG, and deploys them on Seaplane.
To instantiate a DAG, call the associated function and supply it with the required parameters. For example, assume you have the following DAG definition.
from my_tasks import demo_task

def my_dag(app, name, input):
    # create a new DAG on the app based on the provided name
    dag = app.dag(name)

    # add two consecutive tasks to the DAG.
    output1 = dag.task(demo_task, [input], instance_name='demo-task-0')
    output2 = dag.task(demo_task, [output1], instance_name='demo-task-1')

    # set the output of the dag
    dag.respond(output2)

    # return the dag
    return dag
To instantiate this DAG, you call my_dag() inside your main.py and parameterize it with your App instance, the DAG name and the input to the DAG.
In the example below we wire the HTTP input directly to the DAG.
from seaplane.apps import App

# my_dag is the DAG definition shown above; import it from wherever you defined it

# create an app instance
app = App("example-app")

# instantiate the DAG, this creates instances of all tasks inside the DAG
output = my_dag(app, 'my-dag', app.input())

# set the output of the app
app.respond(output)

# run the application
app.run()
When deployed, Seaplane creates the application and all the associated resources in the DAG. In this case, it creates the following two tasks.
example-app.my-dag.demo-task-0
example-app.my-dag.demo-task-1
Nested DAGs
You can nest DAGs by instantiating one DAG inside another. While there is no theoretical maximum, we have tested nested DAGs up to 10 levels deep without any issues. Anything beyond 10 levels is at your own risk.
Prebuilt Seaplane DAGs
Seaplane comes with a set of prebuilt DAGs for common tasks, such as collecting inputs from debatched input or from multiple tasks, as well as a DAG to communicate with Substation, our model hub.
You can learn more about them in the documentation linked below.
- Available Models
- Model Hub
- Multiple inputs collector
- Debatch collector