Task Context
The task context is the interface to read input data, send data between tasks and return data from an application. This document describes how to use task context and all its features.
Reading Data​
You can request the data inside a task with the body
method on the context
parameter.
from seaplane import task
@task(id='my-task')
def my_task(context):
# load the task input data
data = context.body
Data in the context is transferred as binary data, which allows you to send any
binary data type from JSON strings to fully-ledged PDFs. For example, logging
data
in the example above produces b"hello world"
if the input to the task
is "hello world"
.
Sending data as binary does mean your task needs to process it into a readable
format. For example, you can load a string by encoding it with UTF-8
as
follows context.body.decode("utf-8")
.
A commonly used format for sending data between tasks is JSON strings. You can
load JSON strings (even byte-encoded) into Python dictionaries with the json
package as follows.
from seaplane import task
import json
@task(id='my-task')
def my_task(context):
# load the input json
json_obj = json.loads(context.body)
Emitting Data​
Use the context
object to emit data from a task with the emit
method.
from seaplane import task
def my_task(context):
# load the task input data
data = context.body
# emit the data
context.emit(data)
Debatching​
Tasks and the context object support emitting multiple data points per task.
This allows you to debatch data. For example, assume you have an input list of
words ['hello', 'world']
. You can use the emit
method in a loop to emit each
word in the list to the next task for downstream processing.
from seaplane import task
def my_task(id='my-task', context):
# load the task input data
data = context.body
for word in data:
context.emit(word)
# create http enabled app
@app(id='my-app', path='/my-app')
def my_app(data):
return my_task(data)
You can chain multiple tasks and debatchers together for even more powerful workflows. For example, you can explode a list of words with the following application and tasks.
from seaplane import task, app
@task(id='my-task')
def my_task(context):
# load the data
input = context.body.decode('utf-8')
# loop through the words
for word in input.split(","):
# return each word
context.emit(word)
@task(id='my-task-2')
def my_task_2(context):
# load the data
word = context.body.decode('utf-8')
# loop through string
for letter in word:
# return each letter
context.emit(letter)
# create http enabled app
@app(id='my-app', path='/my-app')
def my_app(data):
word = my_task(data)
return my_task_2(word)
Returning Data From an Application​
To return data from an application simply return the output of the last task in your DAG. This automatically returns the data that is emitted from this task.
from seaplane import app
@app(id='my-app', path='/my-app')
def my_app(data):
# return the context emited by my_task
return my_task(data)
Metadata​
The context object contains metadata in addition to the data submitted to the
application. Use the ._meta
method on the context object to access the
meta-data. The metadata includes information about the data as well as the
Seaplane request ID.
{
"nats_timestamp_unix_nano":"1698791852189766456",
"nats_subject":"_SEAPLANE_ENDPOINT.in.<YOUR-APP-NAME>.0bf0acf4-ab87-49ef-849d-5df08e7c987c",
"nats_sequence_stream":"268",
"nats_sequence_consumer":"5",
"nats_num_delivered":"1",
"nats_num_pending":"0",
"_seaplane_request_id":"0bf0acf4-ab87-49ef-849d-5df08e7c987c",
"_seaplane_batch_hierarchy":""
}