Skip to main content
Version: 0.4.0

Task Replication

Any task inside an application can scale independently from each other. This allows you to spin up multiple tasks where needed to speed up processing.

For example, for tasks talking to slow external services such as LLMs, you could spin up multiple copies of the same task to increase the processing speed of your application.

To create multiple copies of the same task add replicas=x to your @task decorator. Replace x with the number of replicas required.

replicated task example
from seaplane import task

@task(id='my-task', replicas=3)
def my_task(context):
# do something in your task

Please note that replicated tasks output the logs to the same log stream. This means that for the task above you can expect to see some log items 3 times, one for each replicated task.

note

We are working on more advanced features that automatically scale tasks between a minimum and a maximum based on the number of messages in the queue. Keep an eye on this page for future developments, or reach out to support@seaplane.io for early access

Example​

To better illustrate the document above let's look at an example. Assume you are building an application that relies on OpenAI for LLM inferencing. Calls to GPT-3.5 can often take up to 10 seconds for larger requests. If you have an application, that makes a lot of calls in a single pipeline run this can create a serious bottleneck. You can overcome this bottleneck by replicating your LLM inference task multiple times and sending each call as a separate request.

In this example, we assume the application takes a JSON object as input containing a list of prompts.

'{
"prompts" : ["tell me a joke", "write me a short story", "another prompt"]
}'

The input is sent to the first task which debatches the input, or in other words sends each prompt downstream to the LLM task for processing.

debatch.py
from seaplane import task, app
import json

# debatch task
@task(id='debatch')
def debatch(context):
# load the context body
data = json.loads(context.body)

# emit each prompt individually
for prompt in data["prompts"]:
context.emit(json.dumps({"prompt" : prompt}))

The second task is the LLM inference task that takes each individual inferencing request and sends it to OpenAI for processing. Each of the items in the input list is processed in parallel as this task is replicated 3 times. Potentially speeding up the total processing 3x.

llm.py
from seaplane import task, app
import json
import openai

# debatch task
@task(id='llm', replicas=3)
def llm(context):
# set the openai API key from the .env file
openai.api_key = os.getenv("OPENAI_API_KEY")

# load the context body
data = json.loads(context.body)

# call openAI
result = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": data['prompt']}],
temperature=0.1,
)

# emit the result to return it from the application
context.emit(json.dumps(result))

Finally, in main.py we construct the DAG (directed acyclic graph) connecting the tasks together.

main.py
# import all the tasks
from llm import llm
from debatch import debatch

# create the application
@app(id='my-llm-application', path='/hello-world')
def my_llm_app(data)
# construct the DAG
debatched = debatch(data)
return llm(debatched)

start()