Definitions, Instances and Replicas
In the Seaplane documentation, you will find references to instances, replicas
and definitions. They apply to tasks, DAGs (Directed Acyclic Graphs),
applications, and other elements in the Seaplane SDK. It is important to
understand the difference between these three terms. This page explains the
difference between them and how they apply to tasks, DAGs and applications.
Definitions
Definitions are the blueprints of tasks, DAGs or applications. They are the code that defines what the task, DAG or application does.
Task and DAG definitions are defined through regular Python functions. For example, the code below defines a simple task that takes the input and multiplies it by two.
def example_task(msg):
    # multiply by 2
    output = int(msg.body) * 2
    # convert to byte string
    yield str(output).encode('utf-8')
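Because a task definition is a regular Python generator function, it can be exercised locally before it is wired into a DAG. The sketch below is a minimal illustration that assumes the only thing the task needs from a message is its `body` attribute; `fake_msg` is a hypothetical stand-in built with `SimpleNamespace`, not the message type the Seaplane SDK passes in.

```python
from types import SimpleNamespace


def example_task(msg):
    # multiply by 2
    output = int(msg.body) * 2
    # convert to byte string
    yield str(output).encode('utf-8')


# Hypothetical stand-in for a Seaplane message: only `body` is modelled.
fake_msg = SimpleNamespace(body=b"21")

# The task is a generator, so collect everything it yields.
results = list(example_task(fake_msg))
print(results)
```

Calling the definition directly like this does not create an instance on the platform; it only runs the function body.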
Similarly, we can create a DAG definition as follows.
# task_a and task_b are task definitions defined or imported elsewhere
def example_dag(app, name, input):
    # create new dag
    dag = app.dag(name)
    # add tasks to your dag
    output_a = dag.task(task_a, [input], instance_name="task-a")
    output_b = dag.task(task_b, [output_a], instance_name="task-b")
    # send response from dag
    dag.respond(output_b)
    # return dag to use in app
    return dag
Application definitions are defined in main.py. When deployed, they create an
application on the Seaplane platform.
By creating definitions you enable re-usability of your code. You and your coworkers can create standard tasks and DAG definitions that can be reused in any other Seaplane application. In addition, you can use any of the task or DAG definitions in the Seaplane app store (coming soon).
For example, suppose you and your team are building a genAI application that requires the same RAG (retrieval-augmented generation) step in multiple locations. You can define the RAG task once and reuse it by creating multiple instances of the same task in different DAGs or applications.
The same is true for DAGs: if you have a process that requires multiple tasks, you can wire them up in a DAG definition, then import and instantiate that DAG wherever it is needed, just like the Seaplane DAG that talks to our model hub.
Instances
You can create instances of definitions. This section describes how to create instances of tasks and DAGs.
Task Instances
Tasks are instantiated by calling the task function on a dag and supplying it
with the three required parameters:
function - the task definition.
list of message - a list of one or more input messages.
str - a unique name for this task within the scope of the DAG that it is added to (passed as instance_name).
# import the tasks
from my_tasks import task_a

# create DAG definition
def example_dag(app, name, input):
    # create new dag
    dag = app.dag(name)
    # create an instance of task_a
    output_a = dag.task(task_a, [input], instance_name="task-a")
    # send response from dag
    dag.respond(output_a)
    # return dag to use in app
    return dag
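To make the relationship between one definition and several instances concrete, here is a minimal sketch that does not use the Seaplane SDK at all. `FakeDag` is a hypothetical stand-in that only records which definition backs each instance name, and raising an error on a duplicate name is an assumption made for illustration rather than documented SDK behaviour.

```python
class FakeDag:
    """Hypothetical stand-in that records task instances; NOT the Seaplane SDK."""

    def __init__(self, name):
        self.name = name
        self.instances = {}

    def task(self, definition, inputs, instance_name):
        # Assumption: instance names must be unique within a single DAG.
        if instance_name in self.instances:
            raise ValueError(f"duplicate instance name: {instance_name}")
        self.instances[instance_name] = definition
        # Placeholder for the output handle a real DAG would return.
        return f"{self.name}/{instance_name}"


def rag_task(msg):
    # Placeholder body for a shared RAG task definition.
    yield msg


ingest = FakeDag("ingest")
chat = FakeDag("chat")

# One definition, three instances across two DAGs.
ingest.task(rag_task, ["input"], instance_name="rag")
chat.task(rag_task, ["input"], instance_name="rag-a")
chat.task(rag_task, ["input"], instance_name="rag-b")

print(sorted(chat.instances))
```

The same name can appear in different DAGs because the uniqueness requirement is scoped to the DAG the task is added to.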
Replicas
Tasks have another property known as replicas. Replicas let you run multiple
copies of the same task at the same location in your DAG. You can set the
number of replicas by supplying your task instantiation with
replicas=<NUMBER OF REPLICAS>. For example, the following task instantiation
creates three replicas of the task.
dag.task(my_task, [input], instance_name='my-task', replicas=3)
Generally speaking, you use replicas to speed up the processing of messages and
resolve bottlenecks. For example, assume you have the following DAG.
This DAG currently has one instance of Task A and one instance of Task B, both
with one replica. Assume Task A is a CPU-intensive task that takes much longer
than Task B, thus creating a bottleneck. You can speed up the processing of
your pipeline by creating more replicas of Task A. With two replicas, for
example, both can process messages, increasing throughput in that section of
your DAG.
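The effect of replicas on a bottleneck can be estimated with a rough back-of-the-envelope model. The sketch below assumes that replicas scale linearly and that a pipeline is limited by its slowest stage; the per-message timings are hypothetical and are not measurements from the Seaplane platform.

```python
def stage_throughput(seconds_per_message, replicas=1):
    """Messages per second for one task, assuming replicas scale linearly."""
    return replicas / seconds_per_message


def pipeline_throughput(stages):
    """A linear pipeline is limited by its slowest stage."""
    return min(stage_throughput(t, r) for t, r in stages)


# Hypothetical timings: Task A needs 2.0 s per message, Task B needs 0.5 s.
before = pipeline_throughput([(2.0, 1), (0.5, 1)])  # one replica each
after = pipeline_throughput([(2.0, 3), (0.5, 1)])   # three replicas of Task A

print(before, after)
```

Under these assumptions, adding replicas to Task A raises the pipeline's throughput until Task B becomes the slower stage. Adding replicas beyond that point would not help in this model.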
DAG Instances
DAGs are instantiated by calling your DAG function inside an app or another DAG.
For example, assume you have the Example DAG definition shown above. You can
instantiate this DAG by calling its function and supplying it with the required
arguments.
from seaplane.apps import App
# create an app instance
app = App("my-app")
# create a dag instance
dag_output = example_dag(app, "example-dag", app.input())
app.respond(dag_output)
# run the app
app.run()