Task Message
The task message (msg) is the interface to read input data, send data between
tasks, and return data from an application. This document describes how to use
the task msg and all its features.
Reading Data
You can read the input data inside a task through the body attribute of the msg
parameter.
def my_task(msg):
    # load the task input data
    data = msg.body
Data in the msg is transferred as binary data, which allows you to send any
binary data type, from JSON strings to fully fledged PDFs. For example, logging
data in the example above produces b"hello world" if the input to the task is
"hello world".
Sending data as binary does mean your task needs to convert it into a readable
format. For example, you can load a string by decoding the bytes as UTF-8 as
follows: msg.body.decode("utf-8").
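The decoding step can be tried locally with a minimal sketch. FakeMsg below is a hypothetical stand-in that only carries a body attribute holding bytes; it is not part of the Seaplane SDK and exists only so the snippet runs without the platform.

```python
from dataclasses import dataclass


@dataclass
class FakeMsg:
    """Hypothetical stand-in for the task msg; it only models `body`."""
    body: bytes


def read_text(msg):
    # msg.body holds bytes, so decode it to get a Python string
    return msg.body.decode("utf-8")


raw = FakeMsg(body=b"hello world")
text = read_text(raw)
print(repr(raw.body), repr(text))
```

The same call raises UnicodeDecodeError when the body is not valid UTF-8, so binary payloads such as PDFs are better left as bytes.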
A commonly used format for sending data between tasks is JSON strings. You can
load JSON strings (even byte-encoded) into Python dictionaries with the json
package as follows.
import json

def my_task(msg):
    # load the input json
    json_obj = json.loads(msg.body)
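Input that is not valid JSON makes json.loads raise an exception. The sketch below shows one possible way to guard against that; FakeMsg and load_json are hypothetical names used for illustration, and returning None on failure is just one design choice, not SDK behavior.

```python
import json
from dataclasses import dataclass


@dataclass
class FakeMsg:
    """Hypothetical stand-in for the task msg; it only models `body`."""
    body: bytes


def load_json(msg):
    try:
        # json.loads accepts bytes directly, so no explicit decode is needed
        return json.loads(msg.body)
    except ValueError:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError
        return None


good = load_json(FakeMsg(body=b'{"name": "seaplane", "count": 2}'))
bad = load_json(FakeMsg(body=b"not json"))
print(good, bad)
```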
Yield Data
Use the yield keyword inside a task to send output data from that task.
def my_task(msg):
    # load the task input data
    data = msg.body
    # send the output (data)
    yield data
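A function that contains yield is a Python generator function: calling it only creates a generator, and the body runs once something iterates over it. The sketch below checks a task locally under that assumption. FakeMsg is a hypothetical stand-in for the msg object, and how the platform itself consumes the generator is not shown here.

```python
import inspect
from dataclasses import dataclass


@dataclass
class FakeMsg:
    """Hypothetical stand-in for the task msg; it only models `body`."""
    body: bytes


def my_task(msg):
    # load the task input data
    data = msg.body
    # send the output (upper-cased bytes)
    yield data.upper()


# calling the task does not run it yet; it returns a generator
gen = my_task(FakeMsg(body=b"hello world"))

# iterating over the generator runs the body and collects every output
outputs = list(gen)
print(inspect.isgeneratorfunction(my_task), outputs)
```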
Debatching
Tasks and the msg object support emitting multiple data points per task.
This allows you to debatch data. For example, assume you have an input list of
words ['hello', 'world']. You can use yield in a loop to send each word in the
list to the next task for downstream processing.
import json
from seaplane.apps import task, app

@task()
def my_task(msg):
    # load the task input data (assumes the body is a JSON-encoded list)
    data = json.loads(msg.body)
    for word in data:
        # send each word to the next task
        yield word

# create http enabled app
@app()
def my_app(data):
    return my_task(data)
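One pitfall when debatching is that msg.body holds bytes, and iterating over bytes yields integers rather than list items. The sketch below illustrates the difference, assuming the sender JSON-encoded the list; FakeMsg and debatch_words are hypothetical names and not part of the Seaplane SDK.

```python
import json
from dataclasses import dataclass


@dataclass
class FakeMsg:
    """Hypothetical stand-in for the task msg; it only models `body`."""
    body: bytes


def debatch_words(msg):
    # assumption: the sender JSON-encoded the list before sending it
    words = json.loads(msg.body)
    for word in words:
        yield word


raw = json.dumps(["hello", "world"]).encode("utf-8")

# iterating over the raw bytes yields integer byte values, not words
first_items = list(raw)[:2]

# parsing the body first gives one output per word
emitted = list(debatch_words(FakeMsg(body=raw)))
print(first_items, emitted)
```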
You can chain multiple tasks and debatchers together for even more powerful workflows. For example, the following application and tasks split a comma-separated string into words and then split each word into letters.
from seaplane.apps import task, app

@task()
def my_task(msg):
    # load the data
    text = msg.body.decode('utf-8')
    # loop through the words
    for word in text.split(","):
        # emit each word
        msg.emit(word)

@task()
def my_task_2(msg):
    # load the data
    word = msg.body.decode('utf-8')
    # loop through string
    for letter in word:
        # emit each letter
        msg.emit(letter)

# create http enabled app
@app()
def my_app(data):
    word = my_task(data)
    return my_task_2(word)
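The fan-out behavior of chained debatchers can be reasoned about with a small local simulation. This is a sketch only: FakeMsg and run_chain are hypothetical helpers, the tasks use yield instead of msg.emit so they run without the SDK, and the real platform delivers messages between tasks in its own way.

```python
from dataclasses import dataclass


@dataclass
class FakeMsg:
    """Hypothetical stand-in for the task msg; it only models `body`."""
    body: bytes


def split_words(msg):
    # first debatcher: one output per comma-separated word
    text = msg.body.decode("utf-8")
    for word in text.split(","):
        yield word


def split_letters(msg):
    # second debatcher: one output per letter of the word
    word = msg.body.decode("utf-8")
    for letter in word:
        yield letter


def run_chain(raw, tasks):
    """Feed every output of one task into the next task, one message at a time."""
    messages = [raw]
    for task in tasks:
        next_messages = []
        for body in messages:
            for out in task(FakeMsg(body=body)):
                # re-encode so the next task receives bytes, like msg.body
                next_messages.append(out.encode("utf-8"))
        messages = next_messages
    return [m.decode("utf-8") for m in messages]


letters = run_chain(b"hi,yo", [split_words, split_letters])
print(letters)  # ['h', 'i', 'y', 'o']
```

One input message becomes two messages after the first task and four after the second, which is the multiplication to keep in mind when sizing downstream tasks.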
Returning Data From an Application
To return data from an application, return the output of the last task in your DAG. The application then automatically returns the data emitted by that task.
from seaplane.apps import app

@app()
def my_app(data):
    # return the msg emitted by my_task
    return my_task(data)
Metadata
The msg object contains metadata in addition to the data submitted to the
application. Use ._meta on the msg object to access the metadata. The metadata
includes information about the data as well as the Seaplane request ID.
{
  "nats_timestamp_unix_nano":"1698791852189766456",
  "nats_subject":"_SEAPLANE_ENDPOINT.in.<YOUR-APP-NAME>.0bf0acf4-ab87-49ef-849d-5df08e7c987c",
  "nats_sequence_stream":"268",
  "nats_sequence_consumer":"5",
  "nats_num_delivered":"1",
  "nats_num_pending":"0",
  "_seaplane_request_id":"0bf0acf4-ab87-49ef-849d-5df08e7c987c",
  "_seaplane_batch_hierarchy":""
}
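All values in the metadata above are strings, so numeric fields need converting before use. The sketch below assumes the metadata is available as a dictionary shaped like the example; summarize_meta and the output key names are hypothetical, and reading nats_num_delivered as a delivery count follows NATS naming rather than anything stated in this document.

```python
from datetime import datetime, timezone

# sample metadata copied from the example above
meta = {
    "nats_timestamp_unix_nano": "1698791852189766456",
    "nats_num_delivered": "1",
    "nats_num_pending": "0",
    "_seaplane_request_id": "0bf0acf4-ab87-49ef-849d-5df08e7c987c",
    "_seaplane_batch_hierarchy": "",
}


def summarize_meta(meta):
    # the timestamp is a string of nanoseconds since the Unix epoch
    nanos = int(meta["nats_timestamp_unix_nano"])
    # keep whole seconds only; integer division avoids float rounding
    received_at = datetime.fromtimestamp(nanos // 1_000_000_000, tz=timezone.utc)
    return {
        "request_id": meta["_seaplane_request_id"],
        "received_at": received_at.isoformat(),
        "delivery_count": int(meta["nats_num_delivered"]),
    }


summary = summarize_meta(meta)
print(summary)
```

Logging the request ID next to each task's output is one way to trace a single request across tasks.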