Version: 0.4.0

Task Context

The task context is the interface to read input data, send data between tasks and return data from an application. This document describes how to use task context and all its features.

Reading Data

You can read a task's input data from the body attribute of the context parameter.

basic example
from seaplane import task

@task(id='my-task')
def my_task(context):
    # load the task input data
    data = context.body

Data in the context is transferred as binary data, which allows you to send any binary data type, from JSON strings to fully fledged PDFs. For example, logging the data variable in the example above produces b"hello world" when the input to the task is "hello world".

Because data arrives as binary, your task needs to convert it into a usable format. For example, you can load a string by decoding it as UTF-8: context.body.decode("utf-8").

JSON strings are a commonly used format for sending data between tasks. You can load JSON strings (even byte-encoded) into Python objects such as dictionaries with the json package as follows.
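
The decoding step can be sketched in plain Python as follows. This is a minimal illustration, not part of the Seaplane SDK: body_to_text is a hypothetical helper name, and the sample bytes stand in for context.body.

```python
def body_to_text(body: bytes, encoding: str = "utf-8") -> str:
    """Decode a raw task body into a string (hypothetical helper)."""
    # errors="replace" substitutes a placeholder character for invalid bytes
    # instead of raising, so a malformed payload does not crash the task
    return body.decode(encoding, errors="replace")


# stand-in for context.body when the task input is "hello world"
raw = b"hello world"
text = body_to_text(raw)
print(text)
```

Whether to replace invalid bytes or let the decode fail is a design choice. Use errors="strict" (the default for bytes.decode) if a malformed payload should stop the task.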

JSON example
from seaplane import task
import json

@task(id='my-task')
def my_task(context):
    # load the input json
    json_obj = json.loads(context.body)
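
If a task may receive input that is not valid JSON, it can help to guard the parsing step. The sketch below uses only the standard library. parse_json_body is a hypothetical helper name, and the byte strings stand in for context.body.

```python
import json
from typing import Any, Optional


def parse_json_body(body: bytes) -> Optional[Any]:
    """Return the parsed JSON value, or None if the body is not valid JSON."""
    try:
        # json.loads accepts bytes directly and detects UTF-8/16/32 encodings
        return json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return None


print(parse_json_body(b'{"word": "hello"}'))  # {'word': 'hello'}
print(parse_json_body(b"not json"))           # None
```

Returning None keeps the example short. Note that a body containing the JSON literal null also parses to None, so a task that needs to tell the two cases apart should raise or log instead.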

Emitting Data

Use the context object to emit data from a task with the emit method.

from seaplane import task

@task(id='my-task')
def my_task(context):
    # load the task input data
    data = context.body

    # emit the data
    context.emit(data)

Debatching

Tasks and the context object support emitting multiple data points per task. This allows you to debatch data. For example, assume you have an input list of words ['hello', 'world']. You can use the emit method in a loop to emit each word in the list to the next task for downstream processing.

Simple Debatch Example
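from seaplane import task, app
import json

@task(id='my-task')
def my_task(context):
    # load the task input data; this assumes the input is a JSON-encoded list of words
    data = json.loads(context.body)

    # emit each word as a separate message
    for word in data:
        context.emit(word)

# create http enabled app
@app(id='my-app', path='/my-app')
def my_app(data):
    return my_task(data)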

You can chain multiple tasks and debatchers together for even more powerful workflows. For example, you can explode a list of words with the following application and tasks.

Word List Exploder
from seaplane import task, app

@task(id='my-task')
def my_task(context):
    # load the data
    text = context.body.decode('utf-8')

    # loop through the words
    for word in text.split(","):
        # emit each word
        context.emit(word)

@task(id='my-task-2')
def my_task_2(context):
    # load the data
    word = context.body.decode('utf-8')

    # loop through string
    for letter in word:
        # emit each letter
        context.emit(letter)

# create http enabled app
@app(id='my-app', path='/my-app')
def my_app(data):
    word = my_task(data)
    return my_task_2(word)
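
To see the fan-out this chain produces, you can simulate it locally without the platform. The sketch below is illustrative only: RecordingContext and run_chain are hypothetical stand-ins, not Seaplane classes, and the simulation assumes that each emitted message triggers one separate run of the downstream task. It does not model delivery order or parallelism on the real platform.

```python
class RecordingContext:
    """Hypothetical stand-in for the task context; it only records emits."""

    def __init__(self, body: bytes):
        self.body = body
        self.emitted = []

    def emit(self, data):
        self.emitted.append(data)


def split_words(context):
    # same logic as my_task: one emit per comma-separated word
    for word in context.body.decode("utf-8").split(","):
        context.emit(word)


def split_letters(context):
    # same logic as my_task_2: one emit per letter
    for letter in context.body.decode("utf-8"):
        context.emit(letter)


def run_chain(body: bytes):
    first = RecordingContext(body)
    split_words(first)
    letters = []
    for word in first.emitted:
        # each emitted word becomes the body of its own downstream run
        second = RecordingContext(word.encode("utf-8"))
        split_letters(second)
        letters.extend(second.emitted)
    return first.emitted, letters


words, letters = run_chain(b"hi,yo")
print(words)    # ['hi', 'yo']
print(letters)  # ['h', 'i', 'y', 'o']
```

One input message becomes two messages after the first task and four after the second, which is the debatching effect described above.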

Returning Data From an Application

To return data from an application, return the output of the last task in your DAG. The application then automatically returns the data emitted by that task.

from seaplane import app

# my_task is a task defined elsewhere in your project
@app(id='my-app', path='/my-app')
def my_app(data):
    # return the data emitted by my_task
    return my_task(data)

Metadata

The context object contains metadata in addition to the data submitted to the application. Use ._meta on the context object to access the metadata. The metadata includes information about the data as well as the Seaplane request ID.

{
    "nats_timestamp_unix_nano":"1698791852189766456",
    "nats_subject":"_SEAPLANE_ENDPOINT.in.<YOUR-APP-NAME>.0bf0acf4-ab87-49ef-849d-5df08e7c987c",
    "nats_sequence_stream":"268",
    "nats_sequence_consumer":"5",
    "nats_num_delivered":"1",
    "nats_num_pending":"0",
    "_seaplane_request_id":"0bf0acf4-ab87-49ef-849d-5df08e7c987c",
    "_seaplane_batch_hierarchy":""
}
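
The values in the sample above are strings, so numeric fields need converting before use. As a minimal sketch, the snippet below turns the nanosecond timestamp into a timezone-aware datetime. It assumes the metadata is available as a dictionary shaped like the sample. received_at is a hypothetical helper name, and the meta dictionary here is copied from the sample rather than read from a live context.

```python
from datetime import datetime, timezone

# two fields copied from the sample metadata above
meta = {
    "nats_timestamp_unix_nano": "1698791852189766456",
    "_seaplane_request_id": "0bf0acf4-ab87-49ef-849d-5df08e7c987c",
}


def received_at(meta: dict) -> datetime:
    """Convert the nanosecond Unix timestamp into a UTC datetime."""
    nanos = int(meta["nats_timestamp_unix_nano"])
    # integer arithmetic avoids float rounding on large nanosecond values
    seconds, remainder = divmod(nanos, 1_000_000_000)
    # datetime stores microseconds at most, so sub-microsecond digits are dropped
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=remainder // 1000
    )


ts = received_at(meta)
print(meta["_seaplane_request_id"], ts.isoformat())
```

Logging the request ID next to the timestamp makes it easier to correlate a task run with a specific request.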