Version: 0.6.0

Tasks

This document describes the function of tasks inside the Seaplane platform and how to use them. Tasks are the building blocks of any Seaplane application. Once deployed, they turn into individually scaling compute containers.

Tasks exist in two different states, definitions and instances. You can read more about the difference between the two states here.

Task definitions are completely reusable and can be added to multiple applications or even instantiated multiple times in a DAG (Directed Acyclic Graph).

Defining Tasks

Tasks are defined through regular Python functions and take one input parameter msg of type Message.

def my_task(msg):
    # run your task code here
    pass

The msg parameter inside the function definition holds the input data that is retrieved from the previous step in your DAG or the application entry point. You can read more about how this works in the DAG documentation.

Instantiating Tasks

Tasks are instantiated by adding them to a DAG. Task instantiation takes three required input parameters and one optional parameter.

  • function - Task definition (required)
  • list of Messages - Input messages, one or more (required)
  • str - instance_name - The unique (within the DAG) instance name (required)
  • int - replicas - The number of replicas (optional)

In the example below, we first create an App and a DAG and then instantiate the task in the DAG on line 12.

Example task instantiation
from seaplane.apps import App

# create a task definition
def task(msg):
    print("Hello World")

# create app and DAG
app = App("hello-world-app")
dag = app.dag("hello-world-dag")

# instantiate task
output = dag.task(task, [app.input()], instance_name='my-task-instance')

# set the response of the app
app.respond(output)

You can replicate tasks to increase the throughput of messages. This is particularly useful for workloads that are suitable for debatching input. You can read more about debatching input below.

To replicate a task, set the replicas parameter to the number of replicas you wish to deploy. For example, we can extend the task instantiation in the snippet above as follows to create three replicas of my-task-instance:

# instantiate task
output = dag.task(task, [app.input()], instance_name='my-task-instance', replicas=3)

When deployed, this creates a DAG with three replicas of my-task-instance, allowing a theoretical throughput three times that of a single replica.





tip

A future version of the Seaplane SDK will provide a feature to automatically scale task replicas as needed based on the number of messages in the queue. If you want early access, let us know in Discord or by email at support@seaplane.io.

Reading Data

You can retrieve task input data through the body attribute of msg. The value it holds depends on the upstream task, DAG, or application entry point defined in your DAG.

def my_task(msg):
    # read the data, retrieved as bytes
    data = msg.body

The body content of a message is transferred as binary data. This allows you to send any data type, from JSON strings to fully fledged PDFs. For example, printing data in the example above produces b"hello world" if the input to the task is "hello world".

Sending data as binary does mean your task needs to convert it into a usable format. For example, you can load a string by decoding it as UTF-8: msg.body.decode("utf-8").
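The decoding step can be tried locally without the SDK. The sketch below uses a hypothetical FakeMessage stand-in (not part of the Seaplane SDK) that only mimics the body attribute described above.

```python
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a Seaplane Message; only mimics msg.body."""
    body: bytes


def read_text(msg):
    # msg.body arrives as bytes, so decode it before treating it as text
    return msg.body.decode("utf-8")


msg = FakeMessage(body=b"hello world")
text = read_text(msg)
print(repr(msg.body))  # the raw bytes
print(text)            # the decoded string
```

If the upstream data is not valid UTF-8 (a PDF, for instance), decode() raises UnicodeDecodeError, so only decode bodies you know to be text.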

A commonly used format for sending data between tasks is JSON strings. You can load JSON strings into Python dictionaries with the json package as follows.

JSON example
import json

# create task definition
def my_task(msg):
    # load the input json
    json_obj = json.loads(msg.body)

Metadata

The msg parameter also carries metadata about the message, such as the timestamp, batch hierarchy information and more. Use the meta attribute of msg to access the metadata.

msg.meta

The snippet below shows an example output of msg.meta.

{
    "nats_timestamp_unix_nano": "1713810265950237141",
    "nats_subject": "_SEAPLANE_ENDPOINT.in.hello-world.08065bf4-9915-43bb-b027-b2df9ba0a09e",
    "nats_sequence_stream": "18265",
    "nats_sequence_consumer": "2",
    "nats_num_delivered": "1",
    "nats_num_pending": "0",
    "_seaplane_request_id": "08065bf4-9915-43bb-b027-b2df9ba0a09e",
    "_seaplane_output_id": "08065bf4-9915-43bb-b027-b2df9ba0a09e",
    "_seaplane_batch_hierarchy": ""
}
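Every value in the example output is a string, so numeric fields need converting before use. The sketch below is an illustration only. It copies two values from the example above into a plain dictionary and assumes, from the key names alone, that nats_timestamp_unix_nano is nanoseconds since the Unix epoch and that nats_num_delivered is a delivery counter. The helper names received_at and is_redelivery are hypothetical.

```python
from datetime import datetime, timezone

# values copied from the example msg.meta output above
meta = {
    "nats_timestamp_unix_nano": "1713810265950237141",
    "nats_num_delivered": "1",
}


def received_at(meta):
    # assumption: the value is nanoseconds since the Unix epoch
    seconds, nanos = divmod(int(meta["nats_timestamp_unix_nano"]), 1_000_000_000)
    stamp = datetime.fromtimestamp(seconds, tz=timezone.utc)
    # datetime only stores microseconds, so the last three digits are dropped
    return stamp.replace(microsecond=nanos // 1000)


def is_redelivery(meta):
    # assumption: a count above 1 means the message was delivered before
    return int(meta["nats_num_delivered"]) > 1


print(received_at(meta).isoformat())
print(is_redelivery(meta))
```

Splitting the integer with divmod avoids the rounding you would get from dividing a nanosecond timestamp as a float.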

Emitting Data

Use yield to send data from a task to the next downstream element, i.e., a task, DAG, or API response endpoint. The consumer of these messages is defined by the data flow in the DAG.

def my_task(msg):
    # yield the output
    yield b"hello world"

All messages between tasks are byte-encoded, so make sure to provide the right encoding for your output. For example, to send JSON objects between tasks, use json.dumps() as follows:

import json

def my_task(msg):
    # yield the json output
    yield json.dumps({"hello": "world"})
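This page does not say whether the SDK converts the string returned by json.dumps() to bytes for you. A minimal sketch that avoids relying on that is to encode explicitly before yielding. json.loads() accepts bytes directly, so the downstream task needs no extra decoding step. The FakeMessage class and the produce and consume names are hypothetical and only stand in for the SDK's Message and for real tasks.

```python
import json
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a Seaplane Message; only mimics msg.body."""
    body: bytes


def produce(msg):
    # serialize to a JSON string, then encode it to bytes explicitly
    yield json.dumps({"hello": "world"}).encode("utf-8")


def consume(msg):
    # json.loads accepts bytes as well as str
    return json.loads(msg.body)


payload = next(produce(FakeMessage(body=b"")))
result = consume(FakeMessage(body=payload))
print(payload)
print(result)
```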

Debatching

You can yield between 0 and N messages per task input. This allows you to break up a single input message into multiple output messages, which is particularly useful for workloads that can be processed in parallel because it reduces the overall processing time of a request.

For example, assume you have an input list of words ['hello', 'world']. You can use yield in a loop to emit each word in the list to the next task for downstream processing.

Simple Debatch Example
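import json

def split_word(msg):
    # load the task input data (assumed here to be a JSON-encoded list of words)
    words = json.loads(msg.body)

    # emit each word as its own message
    for word in words:
        yield word.encode('utf-8')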
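Because a task may yield zero messages, the same pattern also works as a filter. The sketch below assumes the input is a JSON-encoded list of words and drops empty entries, so an empty list produces no output at all. FakeMessage and split_non_empty are hypothetical names for local experimentation and are not part of the SDK.

```python
import json
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a Seaplane Message; only mimics msg.body."""
    body: bytes


def split_non_empty(msg):
    # assumption: the body is a JSON-encoded list of strings
    words = json.loads(msg.body)
    for word in words:
        if word:  # skip empty strings, so some inputs emit nothing
            yield word.encode("utf-8")


some = list(split_non_empty(FakeMessage(body=b'["hello", "", "world"]')))
none = list(split_non_empty(FakeMessage(body=b"[]")))
print(some)
print(none)
```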

You can chain multiple tasks and debatchers together for even more powerful workflows. For example, you can explode a list of words with the following application and tasks.

Word List Exploder
from seaplane.apps import App

def explode_word(msg):
    # load the data
    data = msg.body.decode('utf-8')

    # loop through the words
    for word in data.split(","):
        # return each word
        yield word.encode("utf-8")

def explode_letter(msg):
    # load the data
    word = msg.body.decode("utf-8")

    # loop through string
    for letter in word:
        # return each letter
        yield letter.encode("utf-8")

# create the app and DAG
app = App("word-explode")
dag = app.dag("explode-dag")

# wire up the tasks
word = dag.task(explode_word, [app.input()], instance_name="explode-word")
exploded = dag.task(explode_letter, [word], instance_name="explode-letter")

# set the DAG response
dag.respond(exploded)
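To check the two debatchers before deploying, you can chain them by hand. This sketch repeats the task functions from above and feeds each output into the next task through a hypothetical FakeMessage wrapper. It only approximates how the platform passes messages between tasks and does not use the SDK. The run_chain helper is likewise hypothetical.

```python
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a Seaplane Message; only mimics msg.body."""
    body: bytes


def explode_word(msg):
    # split the comma-separated input into one message per word
    for word in msg.body.decode("utf-8").split(","):
        yield word.encode("utf-8")


def explode_letter(msg):
    # split a single word into one message per letter
    for letter in msg.body.decode("utf-8"):
        yield letter.encode("utf-8")


def run_chain(body):
    # every message emitted by the first task becomes one input of the second
    for word in explode_word(FakeMessage(body=body)):
        yield from explode_letter(FakeMessage(body=word))


letters = list(run_chain(b"hi,yo"))
print(letters)
```

On the platform the second task may run in several replicas, so the order of the emitted letters should not be assumed to match this sequential run.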