Version: 0.6.0

Conditional Routing

Conditional routing allows you to route messages inside a DAG to specific tasks based on a condition. This document describes how to use conditional routing inside a DAG.

You can execute a conditional route using named or anonymous functions. Anonymous functions are generally more concise and therefore better suited to simple comparisons such as x > 5, whereas named functions allow for more complex comparisons. However, anonymous functions require you to set a routing variable on the task output. Named functions, on the other hand, can interact directly with the output of a task.

Available Methods

You can apply conditional routing on any task output using the if_() method. Subsequently you can call elif_() or else_() for additional routes.

Available methods:

  • task_output.if_(condition)
  • conditional_output.elif_(condition)
  • conditional_output.else_()

For example, the following workflow routes messages to task_a if x.val == 1, to task_b if x.val == 2, and otherwise to task_c.

conditional example
from tasks import demo_task, demo_task_2, task_a, task_b, task_c

def my_dag(app, name, input):
    # create the dag
    dag = app.dag(name)

    # initial task to check output from
    output = dag.task(demo_task, [input], instance_name='demo-task')

    # conditionals
    is_one = output.if_(lambda x: x.val == 1)
    is_two = is_one.elif_(lambda x: x.val == 2)
    not_one_two = is_two.else_()

    # conditional wiring
    a_output = dag.task(task_a, [is_one], instance_name='task-a')
    b_output = dag.task(task_b, [is_two], instance_name='task-b')
    c_output = dag.task(task_c, [not_one_two], instance_name='task-c')

    # run all output through a single final task
    final_output = dag.task(demo_task_2, [a_output, b_output, c_output], instance_name='demo-task-2')

    # set the response of the dag and return the dag
    dag.respond(final_output)
    return dag
note

Seaplane requires you to set a routing variable on the return message from the upstream task to use anonymous functions as conditionals in DAG routing. You can learn more about setting routing variables in the section below.
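
The routing in the example above can be reasoned about locally without the SDK. The following is a minimal sketch in plain Python, assuming the if_()/elif_()/else_() chain behaves like Python's own if/elif/else, where the first matching condition wins. The pick_route helper and the SimpleNamespace stand-in message are hypothetical names used for illustration only; they are not part of the Seaplane SDK.

```python
from types import SimpleNamespace


def pick_route(message, routes, default=None):
    """Return the label of the first route whose condition is true.

    Hypothetical helper: mimics if/elif/else ordering, not Seaplane internals.
    """
    for label, condition in routes:
        if condition(message):
            return label
    return default


# Conditions in the order they were chained: if_() first, then elif_()
routes = [
    ("task-a", lambda x: x.val == 1),
    ("task-b", lambda x: x.val == 2),
]

for value in (1, 2, 3):
    # Stand-in for a task result that carries the routing variable `val`
    msg = SimpleNamespace(val=value)
    print(value, "->", pick_route(msg, routes, default="task-c"))
```

Under that assumption, a message with val set to 1 goes to task-a, 2 goes to task-b, and any other value falls through to task-c.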

We can visualize this DAG as follows:

Using Anonymous Functions for Routing

Conditionals can evaluate anonymous functions. This requires the upstream task to set a routing variable for the conditional to use. Anonymous functions are best used for simple comparisons such as x == 1, y == True, or test in 'test_string'.

To use an anonymous function, first set the routing variable in the upstream task as follows. You can add multiple routing variables to a single return value if needed.

example routing parameters task
def my_task(msg):
    # your task logic here; it should produce `output`

    # set byte encoded return value
    ret = msg.respond(output)

    # add routing parameters
    ret.number = 1
    ret.string_comp = "this is a string value"
    ret.val = True

    # yield the result from your task
    yield ret

You can now use the output of this task to route data conditionally with an anonymous function inside a DAG or application, as follows. If the anonymous function evaluates to True, the DAG executes that path.

Anonymous function for routing
# get output from task
task_output = dag.task()

# check conditions
x_one = task_output.if_(lambda x: x.number==1)
string_in = task_output.if_(lambda x: "string" in x.string_comp)
is_true = task_output.if_(lambda x: x.val)
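
Before wiring anonymous functions into a DAG, it can help to check them against a stand-in object that carries the same routing variables. The sketch below is plain Python and makes no SDK calls; the SimpleNamespace object only imitates a return value with number, string_comp, and val set, and the safe_x_one lambda is a general Python idiom for tolerating a missing attribute, not documented Seaplane behavior.

```python
from types import SimpleNamespace

# Stand-in for a task return value with routing variables set (assumption)
ret = SimpleNamespace(number=1, string_comp="this is a string value", val=True)

# The same anonymous functions used as conditionals above
conditions = {
    "x_one": lambda x: x.number == 1,
    "string_in": lambda x: "string" in x.string_comp,
    "is_true": lambda x: x.val,
}

# Evaluate each condition against the stand-in
results = {name: bool(cond(ret)) for name, cond in conditions.items()}
print(results)

# Defensive variant: returns False instead of raising AttributeError
# when the routing variable was never set on the object
safe_x_one = lambda x: getattr(x, "number", None) == 1
print(safe_x_one(SimpleNamespace()))
```

All three conditions evaluate to True for this stand-in, and the defensive variant returns False for an object without the number attribute.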

Using Named Functions for Routing

You can use named functions for more complex conditionals that do not fit inside an anonymous function. First, define a new named function that returns True or False depending on your condition. Named functions for conditionals consume messages just like a Task does. You can learn more about message consumption here.

For example, we can write the following named function as a conditional to check if certain values are present in a JSON object.

Named function to check condition
import json

def check_keys(msg):
    # load input data
    data = json.loads(msg.body)

    # check that every key is present with a truthy value
    for k in ['key1', 'key2', 'key3']:
        if data.get(k):
            continue
        else:
            return False
    return True
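
A named conditional like this is an ordinary Python function, so it can be exercised locally before it is added to a DAG. The sketch below repeats check_keys and calls it with a hypothetical fake_msg helper that only imitates the body attribute read above; the real Seaplane message object may carry more than that. Note that data.get(k) treats a key with an empty or otherwise falsy value the same as a missing key.

```python
import json
from types import SimpleNamespace


def check_keys(msg):
    # load input data
    data = json.loads(msg.body)

    # check that every key is present with a truthy value
    for k in ['key1', 'key2', 'key3']:
        if data.get(k):
            continue
        else:
            return False
    return True


def fake_msg(payload):
    # Hypothetical stand-in: only mimics the byte-encoded .body attribute
    return SimpleNamespace(body=json.dumps(payload).encode("utf-8"))


complete = fake_msg({"key1": "a", "key2": "b", "key3": "c"})
missing = fake_msg({"key1": "a", "key2": "b"})
empty_value = fake_msg({"key1": "a", "key2": "b", "key3": ""})

print(check_keys(complete))     # True
print(check_keys(missing))      # False
print(check_keys(empty_value))  # False, because "" is falsy
```

If a key with an empty value should count as present, testing membership with `k in data` instead of `data.get(k)` would be one way to express that.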

Add the named function as a conditional to your DAG as follows.

Using named functions for conditional routing
# get output from task
task_output = dag.task()

# check conditions
all_keys = task_output.if_(check_keys)
missing_keys = all_keys.else_()

Limitations

Seaplane currently supports up to four conditional routes per app. We are working on an update that supports unlimited conditionals. If you have a use case that requires more than four conditionals, reach out to support@seaplane.io. We can show you a workaround that allows unlimited conditionals in the current version of the Seaplane SDK.