# Tasks
This document describes the function of tasks inside the Seaplane platform and how to use them. Tasks are the building blocks of any Seaplane application. Once deployed, they run as individually scaling compute containers.

Tasks exist in two different states: definitions and instances. You can read more about the difference between the two states here.
Task definitions are completely reusable and can be added to multiple applications or even instantiated multiple times in a DAG (Directed Acyclic Graph).
## Defining Tasks
Tasks are defined through regular Python functions and take one input parameter, `msg`, of type `Message`.
```python
def my_task(msg):
    # run your task code here
    pass
```
The `msg` parameter inside the function definition holds the input data retrieved from the previous step in your DAG or the application entry point. You can read more about how this works in the DAG documentation.
## Instantiating Tasks
Tasks are instantiated by adding them to a DAG. Task instantiation takes three required input parameters and one optional parameter.

- `function` - the task definition (required)
- `list` of `Message`s - the input messages, one or more (required)
- `instance_name` (`str`) - the unique (within the DAG) instance name (required)
- `replicas` (`int`) - the number of replicas (optional)
In the example below, we first create an `App` and a `dag`, and then instantiate the task in the DAG with `dag.task()`.
```python
from seaplane.apps import App

# create a task definition
def task(msg):
    print("Hello World")

# create app and DAG
app = App("hello-world-app")
dag = app.dag("hello-world-dag")

# instantiate task
output = dag.task(task, [app.input()], instance_name='my-task-instance')

# set the response of the app
app.respond(output)
```
You can replicate tasks to increase the throughput of messages. This is particularly useful for workloads whose input can be debatched. You can read more about debatching input below.
To replicate a task, set the `replicas` parameter to the number of replicas you wish to deploy. For example, we can extend the task instantiation in the snippet above as follows to create three replicas of `my-task-instance`:
```python
# instantiate task
output = dag.task(task, [app.input()], instance_name='my-task-instance', replicas=3)
```
When deployed, this creates a DAG structure with three parallel replicas of the task, allowing a theoretical throughput three times higher than a single replica.
A future version of the Seaplane SDK will provide a feature to automatically scale task replicas as needed, based on the number of messages in the queue. If you want early access, let us know on Discord or by email at support@seaplane.io.
## Reading Data
You can retrieve task input data through the `body` attribute of `msg`. The value it holds depends on the upstream task, DAG, or application entry point as defined in your DAG.
```python
def my_task(msg):
    # read the data, retrieved as bytes
    data = msg.body
```
The `body` content of a `msg` is transferred as binary data. This allows you to send any data type, from JSON strings to fully-fledged PDFs. For example, printing `data` in the example above produces `b"hello world"` if the input to the task is `"hello world"`.
Sending data as binary does mean your task needs to process it into a readable format. For example, you can load a string by decoding it from UTF-8 as follows: `msg.body.decode("utf-8")`.
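The decoding step can be sketched in plain Python outside the platform; here `body` is a stand-in for `msg.body` and is assumed to hold UTF-8 text, as in the example above.

```python
# stand-in for msg.body, assumed to hold UTF-8 encoded text
body = b"hello world"

# decode the binary payload back into a Python string
text = body.decode("utf-8")
print(text)  # hello world
```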
A commonly used format for sending data between tasks is JSON strings. You can load JSON strings into Python dictionaries with the `json` package as follows.
```python
import json

# create task definition
def my_task(msg):
    # load the input json
    json_obj = json.loads(msg.body)
```
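Once loaded, the dictionary behaves like any other. A minimal standalone sketch, with a hypothetical JSON payload standing in for `msg.body`:

```python
import json

# hypothetical JSON payload standing in for msg.body
body = b'{"user": "ada", "count": 3}'

# json.loads accepts bytes directly and returns a dictionary
obj = json.loads(body)
print(obj["user"], obj["count"])  # ada 3
```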
## Metadata
The `msg` parameter contains metadata about the message, such as its timestamp, batch hierarchy information, and more. Use the `meta` attribute of `msg` to access the metadata.

```python
msg.meta
```

The snippet below shows an example output of `msg.meta`.
```json
{
  "nats_timestamp_unix_nano": "1713810265950237141",
  "nats_subject": "_SEAPLANE_ENDPOINT.in.hello-world.08065bf4-9915-43bb-b027-b2df9ba0a09e",
  "nats_sequence_stream": "18265",
  "nats_sequence_consumer": "2",
  "nats_num_delivered": "1",
  "nats_num_pending": "0",
  "_seaplane_request_id": "08065bf4-9915-43bb-b027-b2df9ba0a09e",
  "_seaplane_output_id": "08065bf4-9915-43bb-b027-b2df9ba0a09e",
  "_seaplane_batch_hierarchy": ""
}
```
## Emitting Data
Use `yield` to send data from a task to the next downstream element, i.e., a task, a DAG, or the API response endpoint. The consumer of these messages is defined by the data flow in the DAG.
```python
def my_task(msg):
    # yield the output
    yield b"hello world"
```
All messages between tasks are byte-encoded, so make sure to provide the right encoding for your output. For example, to send JSON objects between tasks, use `json.dumps()` as follows:
```python
import json

def my_task(msg):
    # yield the json output, byte-encoded
    yield json.dumps({"hello": "world"}).encode("utf-8")
```
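The round trip between two tasks can be sketched outside the SDK: the upstream task yields byte-encoded JSON, and the downstream task parses it back out of `msg.body`.

```python
import json

# as yielded by the upstream task: a byte-encoded JSON string
payload = json.dumps({"hello": "world"}).encode("utf-8")

# as read by the downstream task from msg.body
obj = json.loads(payload)
print(obj["hello"])  # world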
## Debatching
You can yield between 0 and `N` messages per task input. This allows you to break up a single input message into multiple output messages, which is particularly useful for workloads that can be processed in parallel, reducing the overall processing time of a request.
For example, assume you have an input list of words, `['hello', 'world']`. You can use `yield` in a loop to emit each word in the list to the next task for downstream processing.
```python
import json

def split_word(msg):
    # load the task input data (assumed to be a JSON-encoded list of words)
    words = json.loads(msg.body)
    for word in words:
        yield word.encode('utf-8')
```
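Because a task body is an ordinary Python generator, the debatching pattern can be exercised standalone. In this sketch, `body` is a stand-in for `msg.body` and is assumed to hold a comma-separated string of words:

```python
def split_word_local(body):
    # debatching pattern: one input message, many output messages;
    # `body` stands in for msg.body, assumed comma-separated text
    for word in body.decode("utf-8").split(","):
        yield word.encode("utf-8")

# one input produces two byte-encoded outputs
print(list(split_word_local(b"hello,world")))  # [b'hello', b'world']
```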
You can chain multiple tasks and debatchers together for even more powerful workflows. For example, you can explode a comma-separated list of words into individual letters with the following application and tasks.
```python
from seaplane.apps import App

def explode_word(msg):
    # load the data
    data = msg.body.decode('utf-8')
    # loop through the words
    for word in data.split(","):
        # return each word
        yield word.encode("utf-8")

def explode_letter(msg):
    # load the data
    word = msg.body.decode("utf-8")
    # loop through the string
    for letter in word:
        # return each letter
        yield letter.encode("utf-8")

# create the app and DAG
app = App("word-explode")
dag = app.dag("explode-dag")

# wire up the tasks
word = dag.task(explode_word, [app.input()], instance_name="explode-word")
exploded = dag.task(explode_letter, [word], instance_name="explode-letter")

# set the DAG response
dag.respond(exploded)
```
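The composition of the two debatchers can be sketched standalone, outside the Seaplane SDK: each output of the first generator feeds into the second, with the `body` arguments standing in for `msg.body`.

```python
def explode_word(body):
    # split a comma-separated string into byte-encoded words
    for word in body.decode("utf-8").split(","):
        yield word.encode("utf-8")

def explode_letter(body):
    # split a word into byte-encoded letters
    for letter in body.decode("utf-8"):
        yield letter.encode("utf-8")

# chain the debatchers: every word fans out into its letters
letters = [l for w in explode_word(b"hi,ok") for l in explode_letter(w)]
print(letters)  # [b'h', b'i', b'o', b'k']
```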