Version: 0.6.0

Definitions, Instances and Replicas

In the Seaplane documentation, you will find references to definitions, instances and replicas. These concepts apply to tasks, DAGs (Directed Acyclic Graphs), applications, and other elements in the Seaplane SDK. This page explains the difference between the three and how they apply to tasks, DAGs and applications.

Definitions

Definitions are the blueprints of tasks, DAGs or applications. They are the code that defines what the task, DAG or application does.

Task and DAG definitions are defined through regular Python functions. For example, the code below defines a simple task that takes the input and multiplies it by two.

Example task definition
def example_task(msg):
    # multiply by 2
    output = int(msg.body) * 2

    # convert to byte string
    yield str(output).encode('utf-8')
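
Because a task definition is a regular Python generator function, you can exercise it locally before wiring it into a DAG. The sketch below assumes only that a message exposes its payload as `body`, as in the example above. `StubMessage` is a hypothetical stand-in written for this illustration, not a Seaplane SDK class.

```python
from dataclasses import dataclass


@dataclass
class StubMessage:
    """Hypothetical stand-in for a Seaplane message; only `body` is modeled."""
    body: bytes


def example_task(msg):
    # multiply the numeric payload by 2
    output = int(msg.body) * 2

    # convert to byte string
    yield str(output).encode("utf-8")


# a task definition is a generator, so collect everything it yields
results = list(example_task(StubMessage(body=b"21")))
print(results)
```

Running the definition this way checks only the function's own logic; it says nothing about how the platform delivers or routes messages.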

Similarly, we can create a DAG definition as follows.

Example DAG definition
def example_dag(app, name, input):
    # create new dag
    dag = app.dag(name)

    # add tasks to your dag
    output_a = dag.task(task_a, [input], instance_name="task-a")
    output_b = dag.task(task_b, [output_a], instance_name="task-b")

    # send response from dag
    dag.respond(output_b)

    # return dag to use in app
    return dag

Application definitions are defined in main.py. When deployed, they create an application on the Seaplane platform.

By creating definitions you enable re-usability of your code. You and your coworkers can create standard tasks and DAG definitions that can be reused in any other Seaplane application. In addition, you can use any of the task or DAG definitions in the Seaplane app store (coming soon).

For example, suppose you and your team are building a genAI application that needs the same RAG (retrieval-augmented generation) step in multiple locations. You can define the RAG task once and reuse it by creating multiple instances of the same task in different DAGs or applications.

The same is true for DAGs. If you have a process that requires multiple tasks, you can wire them up in a DAG definition, then import and instantiate that DAG wherever it is needed. The Seaplane DAG that talks to our model hub works this way.

Instances

You can create instances of definitions. This section describes how to create instances of tasks and DAGs.

Task Instances

Tasks are instantiated by calling the task function on a DAG and supplying it with the three required parameters:

  • function - The task definition.
  • list of messages - A list of one or more input messages.
  • str - A unique name for this task within the scope of the DAG that it is added to.

Creating a task instance
# import the tasks
from my_tasks import task_a

# create DAG definition
def example_dag(app, name, input):
    # create new dag
    dag = app.dag(name)

    # create an instance of task_a
    output_a = dag.task(task_a, [input], instance_name="task-a")

    # send response from dag
    dag.respond(output_a)

    # return dag to use in app
    return dag
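
The scoping of instance names can be pictured with a small registry keyed by DAG name and instance name. This is a minimal sketch, not the Seaplane SDK: `add_instance` and the `instances` dictionary are hypothetical, and rejecting a duplicate name is this sketch's own rule, since how the SDK reacts to a duplicate is not described here.

```python
def rag_task(msg):
    # placeholder for a shared task definition
    yield msg


# hypothetical registry: each instance is identified by (DAG name, instance name)
instances = {}


def add_instance(dag_name, definition, instance_name):
    """Record one instance of a definition under a name scoped to its DAG."""
    key = (dag_name, instance_name)
    if key in instances:
        # assumption of this sketch: a name may be used only once per DAG
        raise ValueError(f"{instance_name!r} already exists in DAG {dag_name!r}")
    instances[key] = definition
    return key


# one definition, three instances
add_instance("ingest", rag_task, "rag")
add_instance("chat", rag_task, "rag")      # same name, different DAG
add_instance("chat", rag_task, "rag-faq")  # second instance in the same DAG
```

All three entries point at the same definition; only the combination of DAG and instance name distinguishes them.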

Replicas

Task instances have another property known as replicas. Replicas let you run multiple copies of the same task instance at the same location in your DAG. You set the number of replicas by passing replicas=<NUMBER OF REPLICAS> when you instantiate the task. For example, the following task instantiation creates three replicas of the task.

Setting replicas for a task
dag.task(my_task, [input], instance_name='my-task', replicas=3)

Generally speaking, you use replicas to speed up the processing of messages and resolve bottlenecks. For example, assume you have the following DAG.



This DAG currently has one instance of Task A and one instance of Task B, each with one replica.

Assume Task A is a CPU-intensive task that takes much longer than Task B, thus creating a bottleneck. You can speed up your pipeline by creating more replicas of Task A. Each replica can process messages, which increases throughput in that section of your DAG.
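
The effect of adding replicas can be estimated with simple arithmetic. The sketch below is a back-of-the-envelope model, not a measurement: it assumes Task A feeds Task B in a chain, that throughput scales linearly with the number of replicas, and it uses made-up per-message timings.

```python
def stage_throughput(seconds_per_message, replicas=1):
    """Messages per second for one task, assuming replicas scale linearly."""
    return replicas / seconds_per_message


def pipeline_throughput(stages):
    """A chain of tasks is limited by its slowest stage."""
    return min(stage_throughput(secs, reps) for secs, reps in stages)


# hypothetical timings: Task A needs 4 s per message, Task B needs 1 s
before = pipeline_throughput([(4.0, 1), (1.0, 1)])  # one replica each
after = pipeline_throughput([(4.0, 4), (1.0, 1)])   # four replicas of Task A

print(f"before: {before} msg/s, after: {after} msg/s")
```

Under these assumptions, adding replicas to Task A helps only until Task B becomes the slowest stage. Beyond that point, extra replicas of Task A no longer raise the pipeline's throughput.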



DAG Instances

DAGs are instantiated by calling your DAG function inside an app or another DAG. For example, assume you have the Example DAG definition as shown above. You can instantiate this DAG by simply calling its function and supplying it with the required arguments.

Instantiating a DAG
from seaplane.apps import App

# create an app instance
app = App("my-app")

# create a dag instance
dag_output = example_dag(app, "example-dag", app.input())
app.respond(dag_output)

# run the app
app.run()
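
Since a DAG definition is a plain function, calling it more than once with different names should give you separate DAG instances built from the same blueprint. The sketch below illustrates the idea with `SketchApp` and `SketchDag`, hypothetical stand-ins that only record what was requested. They mirror the call shapes used on this page but are not the Seaplane SDK and do not run anything.

```python
class SketchDag:
    """Hypothetical stand-in for a DAG; records task names and the response."""

    def __init__(self, name):
        self.name = name
        self.task_names = []
        self.response = None

    def task(self, definition, inputs, instance_name):
        self.task_names.append(instance_name)
        # return a label that stands in for the task's output
        return f"{self.name}.{instance_name}"

    def respond(self, output):
        self.response = output


class SketchApp:
    """Hypothetical stand-in for an app; hands out named DAGs."""

    def __init__(self):
        self.dags = {}

    def dag(self, name):
        self.dags[name] = SketchDag(name)
        return self.dags[name]


def double(msg):
    # placeholder task definition
    yield msg


def example_dag(app, name, input):
    dag = app.dag(name)
    output = dag.task(double, [input], instance_name="double")
    dag.respond(output)
    return dag


app = SketchApp()
first = example_dag(app, "dag-one", "input")
second = example_dag(app, "dag-two", "input")
```

Each call produces its own DAG object with its own task instance, even though both come from the single `example_dag` definition.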