Version: 0.7.0 (alpha)

Task Message

The task message (msg) is the interface to read input data, send data between tasks and return data from an application. This document describes how to use task msg and all its features.

Reading Data​

You can read the input data inside a task through the body attribute of the msg parameter.

basic example
def my_task(msg):
    # load the task input data
    data = msg.body

Data in the msg is transferred as binary data, which allows you to send any binary data type, from JSON strings to fully fledged PDFs. For example, logging data in the example above produces b"hello world" if the input to the task is "hello world".

Because data arrives as binary, your task needs to convert it into a usable format. For example, you can load a string by decoding the bytes as UTF-8: msg.body.decode("utf-8").
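As a minimal sketch of that decoding step, the snippet below runs outside the Seaplane runtime. The fake_msg object is a hypothetical stand-in for the msg a task receives (an assumption for illustration, not part of the Seaplane API); only the body attribute is taken from this page.

```python
from types import SimpleNamespace

def read_text(msg):
    # msg.body holds raw bytes; decode them to get a Python str
    return msg.body.decode("utf-8")

# hypothetical stand-in for the msg object passed to a task
fake_msg = SimpleNamespace(body=b"hello world")

text = read_text(fake_msg)
print(text)
```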

A commonly used format for sending data between tasks is JSON strings. You can load JSON strings (even byte-encoded) into Python dictionaries with the json package as follows.

JSON example
import json

def my_task(msg):
    # load the input json
    json_obj = json.loads(msg.body)
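The following sketch shows the same idea with basic handling for payloads that are not valid JSON. It relies on json.loads accepting bytes directly; the fake_msg objects and the parse_body helper are hypothetical names introduced here for illustration.

```python
import json
from types import SimpleNamespace

def parse_body(msg):
    # json.loads accepts bytes as well as str, so no manual decode is needed
    try:
        return json.loads(msg.body)
    except ValueError:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses
        return None

# hypothetical stand-ins for the msg object passed to a task
good_msg = SimpleNamespace(body=b'{"name": "seaplane", "count": 2}')
bad_msg = SimpleNamespace(body=b"not json")

parsed = parse_body(good_msg)
failed = parse_body(bad_msg)
```

Returning None for bad input is only one possible design; a real task might instead log the error or yield an error payload.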

Yield Data​

To send data out of a task, yield it from the task function with Python's yield statement.

def my_task(msg):
    # load the task input data
    data = msg.body

    # send the output (data)
    yield data
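Because a task that uses yield is an ordinary Python generator function, you can exercise it locally by iterating over it. The sketch below assumes a hypothetical fake_msg stand-in for the real msg object and does not involve the Seaplane runtime.

```python
from types import SimpleNamespace

def my_task(msg):
    # load the task input data (bytes)
    data = msg.body

    # send the upper-cased bytes as the task output
    yield data.upper()

# hypothetical stand-in for the msg object passed to a task
fake_msg = SimpleNamespace(body=b"hello world")

# calling the task returns a generator; list() collects everything it yields
outputs = list(my_task(fake_msg))
```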

Debatching​

Tasks and the msg object support emitting multiple data points per task. This allows you to debatch data. For example, assume you have an input list of words ['hello', 'world']. You can use yield in a loop to send each word in the list to the next task for downstream processing.

Simple Debatch Example
import json

from seaplane.apps import task, app

@task()
def my_task(msg):
    # load the task input data, e.g. the JSON list ["hello", "world"]
    data = json.loads(msg.body)

    # yield each word as a separate message
    for word in data:
        yield word

# create http enabled app
@app()
def my_app(data):
    return my_task(data)

You can chain multiple tasks and debatchers together for even more powerful workflows. For example, you can explode a list of words with the following application and tasks.

Word List Exploder
from seaplane.apps import task, app

@task()
def my_task(msg):
    # load the data
    text = msg.body.decode('utf-8')

    # loop through the words
    for word in text.split(","):
        # emit each word
        msg.emit(word)

@task()
def my_task_2(msg):
    # load the data
    word = msg.body.decode('utf-8')

    # loop through the string
    for letter in word:
        # emit each letter
        msg.emit(letter)

# create http enabled app
@app()
def my_app(data):
    word = my_task(data)
    return my_task_2(word)
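To make the fan-out concrete, here is a rough local simulation of the two-stage exploder. FakeMsg and run_task are hypothetical helpers written for this sketch and are not Seaplane APIs; the sketch also assumes that each emitted string reaches the next task as UTF-8 bytes.

```python
class FakeMsg:
    """Hypothetical stand-in for msg: holds a body and records emitted values."""

    def __init__(self, body):
        self.body = body
        self.emitted = []

    def emit(self, value):
        # assumption: emitted strings travel to the next task as UTF-8 bytes
        self.emitted.append(value.encode("utf-8"))

def split_words(msg):
    # first stage: one output per comma-separated word
    for word in msg.body.decode("utf-8").split(","):
        msg.emit(word)

def split_letters(msg):
    # second stage: one output per letter of the incoming word
    for letter in msg.body.decode("utf-8"):
        msg.emit(letter)

def run_task(task_fn, bodies):
    # run the task once per input body and collect everything it emits
    outputs = []
    for body in bodies:
        msg = FakeMsg(body)
        task_fn(msg)
        outputs.extend(msg.emitted)
    return outputs

words = run_task(split_words, [b"hello,world"])
letters = run_task(split_letters, words)
```

One input message becomes two messages after the first stage and ten after the second, which is the debatching behavior described above.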

Returning Data From an Application​

To return data from an application, return the output of the last task in your DAG. The application then responds with the data emitted by that task.

from seaplane.apps import app

@app()
def my_app(data):
    # return the msg emitted by my_task
    return my_task(data)

Metadata​

The msg object contains metadata in addition to the data submitted to the application. Use the ._meta attribute on the msg object to access the metadata. The metadata includes information about the message, such as NATS timestamps and sequence numbers, as well as the Seaplane request ID. For example:

{
    "nats_timestamp_unix_nano":"1698791852189766456",
    "nats_subject":"_SEAPLANE_ENDPOINT.in.<YOUR-APP-NAME>.0bf0acf4-ab87-49ef-849d-5df08e7c987c",
    "nats_sequence_stream":"268",
    "nats_sequence_consumer":"5",
    "nats_num_delivered":"1",
    "nats_num_pending":"0",
    "_seaplane_request_id":"0bf0acf4-ab87-49ef-849d-5df08e7c987c",
    "_seaplane_batch_hierarchy":""
}
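As a sketch of how these fields might be used, the snippet below extracts the request ID and converts the timestamp to a datetime. It assumes the metadata behaves like a dictionary of strings, as in the example above, and that nats_timestamp_unix_nano is nanoseconds since the Unix epoch, as its name suggests. The summarize_meta helper is hypothetical, and the meta dictionary is a hand-copied subset of the example rather than a live msg._meta value.

```python
from datetime import datetime, timezone

# subset of the example metadata above, copied by hand for illustration
meta = {
    "nats_timestamp_unix_nano": "1698791852189766456",
    "_seaplane_request_id": "0bf0acf4-ab87-49ef-849d-5df08e7c987c",
}

def summarize_meta(meta):
    # values are strings, so convert the timestamp before doing arithmetic
    nanos = int(meta["nats_timestamp_unix_nano"])
    # assumption: the value is nanoseconds since the Unix epoch
    received_at = datetime.fromtimestamp(nanos // 1_000_000_000, tz=timezone.utc)
    return meta["_seaplane_request_id"], received_at

request_id, received_at = summarize_meta(meta)
print(request_id, received_at.isoformat())
```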