
Flow Control

Branching, loops, parallelism, subflows, triggers, and error handling.

24 modules

| Module | Description |
| --- | --- |
| Batch Process | Process items in batches with configurable size |
| Branch | Conditional branching based on expression evaluation |
| Breakpoint | Pause workflow execution for human approval or input |
| Circuit Breaker | Circuit breaker pattern to prevent cascading failures |
| Container | Embedded subflow container for organizing complex workflows |
| Debounce | Debounce execution to prevent rapid repeated calls |
| End | Explicit workflow end node |
| Error Handler | Catches and handles errors from upstream nodes |
| Error Workflow Trigger | Entry point for error workflows, triggered when another workflow fails |
| For Each | Iterate over a list and execute steps for each item |
| Fork | Split execution into parallel branches |
| Goto | Unconditional jump to another step |
| Invoke Workflow | Execute an external workflow file |
| Join | Wait for parallel branches to complete |
| Loop | Repeat steps N times using output port routing |
| Merge | Merge multiple inputs into a single output |
| Parallel | Execute multiple tasks in parallel with different strategies |
| Rate Limit | Rate limit execution using token bucket or sliding window |
| Retry | Retry failed operations with configurable backoff |
| Start | Explicit workflow start node |
| Subflow | Reference and execute an external workflow |
| Switch | Multi-way branching based on value matching |
| Throttle | Throttle execution rate with minimum interval |
| Trigger | Workflow entry point: manual, webhook, schedule, or event |

Modules

Batch Process

flow.batch

Process items in batches with configurable size

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| items | array | Yes | - | Array of items to process. Can be numbers, strings, or objects. |
| batch_size | number | Yes | 10 | Number of items per batch |
| delay_ms | number | No | 0 | Milliseconds to wait between batches (for rate limiting) |
| continue_on_error | boolean | No | False | Continue processing remaining batches if one fails |
| parallel_batches | number | No | 1 | Number of batches to process in parallel (1 for sequential) |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (batch/completed/error) |
| batch | array | Current batch items |
| batch_index | number | Current batch index (0-based) |
| total_batches | number | Total number of batches |
| total_items | number | Total number of items |
| is_last_batch | boolean | Whether the current batch is the last one |
| progress | object | Progress information |

Example: Example

yaml
items: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
batch_size: 10

Example: Example

yaml
items: ${input.records}
batch_size: 100
delay_ms: 1000

Example: Example

yaml
items: ${input.data}
batch_size: 50
parallel_batches: 3
continue_on_error: true
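
The relationship between `items`, `batch_size`, and the documented output fields can be pictured with a short Python sketch. This is not the module's implementation: `split_batches` is a hypothetical helper, and it assumes the final batch is simply shorter when the item count is not a multiple of `batch_size`.

```python
import math
from typing import Any, Dict, Iterator, List


def split_batches(items: List[Any], batch_size: int) -> Iterator[Dict[str, Any]]:
    """Yield one dict per batch, mirroring the documented output fields."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    total_batches = math.ceil(len(items) / batch_size)
    for batch_index in range(total_batches):
        start = batch_index * batch_size
        yield {
            "batch": items[start:start + batch_size],
            "batch_index": batch_index,  # 0-based, as documented
            "total_batches": total_batches,
            "total_items": len(items),
            "is_last_batch": batch_index == total_batches - 1,
        }


# Same input as the first example: 12 items with batch_size 10.
batches = list(split_batches(list(range(1, 13)), batch_size=10))
```

Under these assumptions the first example produces two batches, one with 10 items and one with the remaining 2.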

Branch

flow.branch

Conditional branching based on expression evaluation

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| condition | string | Yes | - | Expression to evaluate (supports ==, !=, >, <, >=, <=, contains) |

Output:

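| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (true/false/error) |
| outputs | object | Output values by port (true/false) |
| result | boolean | Condition evaluation result |
| condition | string | The condition expression as provided |
| resolved_condition | string | The condition after variable references are resolved |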

Example: Example

yaml
condition: ${search_step.count} > 0

Example: Example

yaml
condition: ${api_call.status} == success

Breakpoint

flow.breakpoint

Pause workflow execution for human approval or input

Parameters:

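| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| title | string | No | Approval Required | Title displayed to approvers |
| description | string | No | - | Optional description text |
| timeout_seconds | number | No | 0 | Maximum wait time in seconds (0 for no timeout) |
| required_approvers | array | Yes | - | Approvers who must respond (for example, email addresses) |
| approval_mode | select (single, all, majority, first) | No | single | How approvals are counted |
| custom_fields | array | Yes | - | Input fields to collect from approvers (each with name, label, type, required) |
| include_context | boolean | No | True | Whether to include execution context |
| auto_approve_condition | string | No | - | Condition under which the breakpoint is approved automatically |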

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (approved/rejected/timeout) |
| breakpoint_id | string | Unique breakpoint identifier |
| status | string | Final status (approved/rejected/timeout/cancelled) |
| approved_by | array | List of users who approved |
| rejected_by | array | List of users who rejected |
| custom_inputs | object | Values collected from custom fields |
| comments | array | Comments from approvers |
| resolved_at | string | ISO timestamp of resolution |
| wait_duration_ms | integer | Time spent waiting for resolution, in milliseconds |

Example: Example

yaml
title: Approve data export
description: Please review and approve the data export

Example: Example

yaml
title: Manager Approval Required
description: Large transaction requires manager approval
required_approvers: ["manager@example.com"]
timeout_seconds: 3600

Example: Example

yaml
title: Adjustment Required
custom_fields: [{"name": "reason", "label": "Reason", "type": "text", "required": true}, {"name": "amount", "label": "Amount", "type": "number", "required": true}]

Circuit Breaker

flow.circuit_breaker

Circuit breaker pattern to prevent cascading failures

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| failure_threshold | number | Yes | 5 | Number of failures before opening the circuit |
| reset_timeout_ms | number | No | 60000 | Time in milliseconds before circuit transitions to half-open |
| half_open_max | number | No | 1 | Maximum requests allowed in half-open state |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (allowed/rejected/half_open) |
| state | string | Circuit state (closed/open/half_open) |
| failure_count | number | Number of consecutive failures |
| last_failure_time_ms | number | Timestamp of last failure in milliseconds |
| time_until_half_open_ms | number | Milliseconds until circuit transitions to half-open |

Example: Example

yaml
failure_threshold: 5
reset_timeout_ms: 60000

Example: Example

yaml
failure_threshold: 2
reset_timeout_ms: 10000
half_open_max: 1

Example: Example

yaml
failure_threshold: 20
reset_timeout_ms: 120000
half_open_max: 3
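
The closed, open, and half-open states described above follow the usual circuit breaker state machine. The Python sketch below illustrates that pattern using the documented parameter names; the class and its methods (`check`, `record_failure`, `record_success`) are hypothetical, and the transition details are assumptions rather than the module's confirmed behavior. Time is passed in explicitly so the logic stays deterministic.

```python
from dataclasses import dataclass


@dataclass
class CircuitBreaker:
    failure_threshold: int = 5
    reset_timeout_ms: int = 60000
    half_open_max: int = 1
    state: str = "closed"
    failure_count: int = 0
    last_failure_time_ms: float = 0.0
    half_open_in_flight: int = 0

    def check(self, now_ms: float) -> str:
        """Return the routing event for a call arriving at now_ms."""
        if self.state == "open":
            if now_ms - self.last_failure_time_ms < self.reset_timeout_ms:
                return "rejected"
            # Reset timeout elapsed: start letting trial requests through.
            self.state = "half_open"
            self.half_open_in_flight = 0
        if self.state == "half_open":
            if self.half_open_in_flight >= self.half_open_max:
                return "rejected"
            self.half_open_in_flight += 1
            return "half_open"
        return "allowed"

    def record_failure(self, now_ms: float) -> None:
        """Count a failure; open the circuit at the threshold or on a failed trial."""
        self.failure_count += 1
        self.last_failure_time_ms = now_ms
        if self.state == "half_open" or self.failure_count >= self.failure_threshold:
            self.state = "open"

    def record_success(self) -> None:
        """A success closes the circuit and clears the consecutive failure count."""
        self.failure_count = 0
        self.state = "closed"
```

With `failure_threshold: 2` and `reset_timeout_ms: 10000`, as in the second example, two consecutive failures open the circuit, calls are rejected for the next 10 seconds, and a single trial request is then allowed through in the half-open state.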

Container

flow.container

Embedded subflow container for organizing complex workflows

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| subflow | object | No | {'nodes': [], 'edges': []} | Embedded workflow definition with nodes and edges |
| inherit_context | boolean | No | True | Whether to inherit variables from parent workflow |
| isolated_variables | array | Yes | - | Names of variables kept isolated from the parent workflow |
| export_variables | array | Yes | - | Names of variables to export from the subflow |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (success/error) |
| outputs | object | Output values by port (success/error) |
| subflow_result | object | Result from subflow execution |
| exported_variables | object | Variables exported from subflow |
| node_count | integer | Number of nodes in subflow |
| execution_time_ms | number | Execution time in milliseconds |

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: true

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: false

Debounce

flow.debounce

Debounce execution to prevent rapid repeated calls

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| delay_ms | number | Yes | - | Wait time after last call before executing |
| leading | boolean | No | False | Execute on the leading edge (first call triggers immediately) |
| trailing | boolean | No | True | Execute on the trailing edge (after delay expires) |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (executed/debounced) |
| last_call_ms | number | Timestamp of last call in milliseconds |
| calls_debounced | number | Number of calls debounced since last execution |
| time_since_last_ms | number | Time elapsed since last call in milliseconds |
| edge | string | Which edge triggered execution (leading/trailing) |

Example: Example

yaml
delay_ms: 500

Example: Example

yaml
delay_ms: 1000
leading: true
trailing: false

Example: Example

yaml
delay_ms: 2000
leading: true
trailing: true
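
To make the `leading` and `trailing` options concrete, the Python sketch below computes when executions would fire for a sorted list of call timestamps. `debounce_executions` is a hypothetical helper, not part of the module. It assumes a burst ends once `delay_ms` passes without a new call, and that when both edges are enabled the trailing edge fires only if the burst contained more than one call (a common convention in debounce utilities, not confirmed for this module).

```python
from typing import List, Tuple


def debounce_executions(
    call_times_ms: List[float],
    delay_ms: float,
    leading: bool = False,
    trailing: bool = True,
) -> List[Tuple[float, str]]:
    """Return (time_ms, edge) pairs for a sorted list of call timestamps."""
    executions: List[Tuple[float, str]] = []
    last_call = None
    calls_in_burst = 0

    def close_burst() -> None:
        # The trailing edge fires delay_ms after the last call of the burst.
        if trailing and (not leading or calls_in_burst > 1):
            executions.append((last_call + delay_ms, "trailing"))

    for t in call_times_ms:
        if last_call is not None and t - last_call < delay_ms:
            calls_in_burst += 1  # still inside the current burst
        else:
            if last_call is not None:
                close_burst()
            calls_in_burst = 1
            if leading:
                executions.append((t, "leading"))
        last_call = t
    if last_call is not None:
        close_burst()
    return executions


calls = [0, 100, 200, 2000]
trailing_only = debounce_executions(calls, delay_ms=500)
leading_only = debounce_executions(calls, delay_ms=500, leading=True, trailing=False)
```

For calls at 0, 100, 200, and 2000 ms with `delay_ms: 500`, the trailing-only configuration executes at 700 ms and 2500 ms, while the leading-only configuration executes at 0 ms and 2000 ms.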

End

flow.end

Explicit workflow end node

Parameters:

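| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| output_mapping | object | No | {} | Map internal variables to workflow output |
| success_message | string | No | - | Message to report when the workflow ends successfully |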

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (end) |
| ended_at | string | ISO timestamp when the workflow ended |
| workflow_result | object | Final workflow result |

Example: Example

yaml
# No parameters are required

Example: Example

yaml
output_mapping: {"result": "${process.output}", "status": "success"}

Error Handler

flow.error_handle

Catches and handles errors from upstream nodes

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| action | string | Yes | log_and_continue | What to do with the error |
| include_traceback | boolean | No | True | Include full stack trace in output |
| error_code_mapping | object | No | {} | Map error codes to custom actions |
| fallback_value | any | No | - | Value to use when error is suppressed |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (handled/escalate) |
| outputs | object | Output values by port (handled/escalate) |
| error_info | object | Information about the caught error |
| action_taken | string | What action was taken |

Example: Example

yaml
action: log_and_continue
include_traceback: true

Example: Example

yaml
action: suppress
fallback_value: {"status": "skipped", "reason": "upstream_error"}

Example: Example

yaml
action: transform
error_code_mapping: {"TIMEOUT": {"retry": true, "delay": 5000}, "NOT_FOUND": {"skip": true}}

Error Workflow Trigger

flow.error_workflow_trigger

Entry point for error workflows - triggered when another workflow fails

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| description | string | No | - | Description of this error workflow |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (triggered) |
| error_context | object | Context of the error that triggered this workflow |
| triggered_at | string | ISO timestamp when error workflow was triggered |

Example: Example

yaml
description: Send Slack notification on workflow failure

Example: Example

yaml
description: Log all workflow errors to monitoring system

For Each

flow.foreach

Iterate over a list and execute steps for each item

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| items | string | Yes | - | List of items to iterate over (supports ${variable} reference) |
| steps | array | No | - | Steps to execute for each item (nested mode only, optional for edge mode) |
| item_var | string | No | item | Variable name for current item |
| index_var | string | No | index | Variable name for current index |
| output_mode | string | No | collect | How to collect results: collect (array), last (single), none |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (iterate/done) |
| __set_context | object | Context variables for the current iteration, including the current item being iterated |
| outputs | object | Output values by port |
| iteration | number | Current iteration index |
| status | string | Operation status |
| results | array | Results collected across iterations |
| count | number | Number of items |

Example: Example

yaml
items: ${steps.csv.result.data}

Example: Example

yaml
items: ${search_results}
item_var: element
steps: [{"module": "element.text", "params": {"element_id": "${element}"}, "output": "text"}]

Fork

flow.fork

Split execution into parallel branches

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| branch_count | number | No | 2 | Number of parallel branches |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (fork/error) |
| input_data | any | Input data passed to the branches |
| branch_count | integer | Number of parallel branches |

Example: Example

yaml
branch_count: 2

Example: Example

yaml
branch_count: 3

Goto

flow.goto

Unconditional jump to another step

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| target | string | Yes | - | Step ID to jump to |
| max_iterations | number | No | 100 | Maximum number of iterations (prevents infinite loops) |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (goto) |
| target | string | Step ID being jumped to |
| iteration | number | Current iteration count |

Example: Example

yaml
target: fetch_next_page
max_iterations: 10

Example: Example

yaml
target: cleanup_step

Invoke Workflow

flow.invoke

Execute an external workflow file

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| workflow_source | string | Yes | - | File path to workflow YAML or inline YAML content |
| workflow_params | object | Yes | - | Parameters to pass to the invoked workflow |
| timeout_seconds | number | No | 300 | Maximum execution time in seconds |
| output_mapping | object | No | {} | Map internal variables to workflow output |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (success/error) |
| result | any | Workflow execution result |
| workflow_id | string | Identifier of the invoked workflow |
| execution_time_ms | number | Execution time in milliseconds |

Example: Example

yaml
workflow_source: workflows/validate_order.yaml
workflow_params: {"order_id": "${input.order_id}"}
timeout_seconds: 60

Example: Example

yaml
workflow_source: workflows/process_data.yaml
workflow_params: {"data": "${input.data}"}
output_mapping: {"processed": "result.data"}

Join

flow.join

Wait for parallel branches to complete

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| strategy | select (all, any, first) | No | all | How to handle multiple inputs |
| input_count | number | No | 2 | Number of input ports |
| timeout | number | No | 60000 | Maximum time to wait in milliseconds |
| cancel_pending | boolean | No | True | Cancel pending branches when using first strategy |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (joined/timeout/error) |
| joined_data | array | Data from all completed inputs |
| completed_count | integer | Number of inputs that completed |
| strategy | string | Strategy used |

Example: Example

yaml
strategy: all
input_count: 2
timeout: 30000

Example: Example

yaml
strategy: first
input_count: 3
cancel_pending: true

Loop

flow.loop

Repeat steps N times using output port routing

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| times | number | Yes | 1 | Number of times to repeat |
| target | string | No | - | DEPRECATED: Use output ports and edges instead |
| steps | array | No | - | Steps to execute for each iteration (nested mode) |
| index_var | string | No | index | Variable name for current index |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing |
| outputs | object | Output values by port |
| iteration | number | Current iteration count |
| status | string | Operation status |
| results | array | Results collected across iterations |
| count | number | Number of iterations |

Example: Example

yaml
times: 3

Example: Example

yaml
times: 5
steps: [{"module": "browser.click", "params": {"selector": ".next"}}]

Merge

flow.merge

Merge multiple inputs into a single output

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| strategy | select (first, last, all) | No | all | How to merge multiple inputs |
| input_count | number | No | 2 | Number of input ports |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (merged/error) |
| merged_data | any | Merged data based on strategy |
| input_count | integer | Number of inputs |
| strategy | string | Strategy used |

Example: Example

yaml
strategy: all
input_count: 3

Example: Example

yaml
strategy: first
input_count: 2
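
One plausible reading of the three strategies is sketched below in Python. `merge_inputs` is a hypothetical helper, and the interpretation is an assumption: `first` and `last` pick a single input by arrival order, while `all` returns every input as a list.

```python
from typing import Any, List


def merge_inputs(inputs: List[Any], strategy: str = "all") -> Any:
    """Combine inputs (in arrival order) according to the chosen strategy."""
    if not inputs:
        raise ValueError("at least one input is required")
    if strategy == "first":
        return inputs[0]
    if strategy == "last":
        return inputs[-1]
    if strategy == "all":
        return list(inputs)
    raise ValueError(f"unknown strategy: {strategy}")


merged = merge_inputs([{"a": 1}, {"b": 2}, {"c": 3}], strategy="all")
```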

Parallel

flow.parallel

Execute multiple tasks in parallel with different strategies

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| tasks | array | Yes | - | Array of task definitions to execute in parallel |
| mode | string | No | all | Execution strategy (the examples use all, race, and settle) |
| timeout_ms | number | No | 60000 | Maximum wait time in milliseconds |
| fail_fast | boolean | No | True | Stop all tasks on first failure (only for mode=all) |
| concurrency_limit | number | No | 0 | Maximum number of concurrent tasks (0 for unlimited) |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (completed/partial/error) |
| results | array | Results from all tasks |
| completed_count | number | Number of successfully completed tasks |
| failed_count | number | Number of failed tasks |
| total_count | number | Total number of tasks |
| mode | string | Mode used |
| duration_ms | number | Total duration in milliseconds |

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://api2.example.com"}}]
mode: all
timeout_ms: 30000

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://mirror1.example.com"}}, {"module": "http.get", "params": {"url": "https://mirror2.example.com"}}]
mode: race

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://might-fail.example.com"}}]
mode: settle
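
The three modes used in the examples map naturally onto standard `asyncio` primitives. The Python sketch below is an analogy, not the module's implementation: `run_parallel` is a hypothetical function, tasks are modeled as zero-argument coroutine functions, and `fail_fast` and `concurrency_limit` are not modeled.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

Task = Callable[[], Awaitable[Any]]


async def run_parallel(tasks: List[Task], mode: str = "all", timeout_ms: int = 60000) -> Any:
    timeout = timeout_ms / 1000
    if mode == "all":
        # Every task must succeed; the first exception propagates to the caller.
        return await asyncio.wait_for(asyncio.gather(*(t() for t in tasks)), timeout)
    if mode == "settle":
        # Wait for every task and report success or failure per task.
        results = await asyncio.wait_for(
            asyncio.gather(*(t() for t in tasks), return_exceptions=True), timeout
        )
        return [{"ok": not isinstance(r, Exception), "value": r} for r in results]
    if mode == "race":
        # Return the first task to finish and cancel the rest.
        pending = [asyncio.ensure_future(t()) for t in tasks]
        done, rest = await asyncio.wait(
            pending, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
        )
        for p in rest:
            p.cancel()
        await asyncio.gather(*rest, return_exceptions=True)
        if not done:
            raise asyncio.TimeoutError()
        return next(iter(done)).result()
    raise ValueError(f"unknown mode: {mode}")


async def fast() -> str:
    await asyncio.sleep(0.01)
    return "fast"


async def slow() -> str:
    await asyncio.sleep(0.2)
    return "slow"


async def boom() -> str:
    raise RuntimeError("boom")
```

Calling `asyncio.run(run_parallel([fast, slow], mode="race"))` returns the result of whichever coroutine finishes first, which mirrors the mirror-download example above.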

Rate Limit

flow.rate_limit

Rate limit execution using token bucket or sliding window

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| max_requests | number | Yes | - | Maximum number of requests allowed per window |
| window_ms | number | No | 60000 | Time window in milliseconds |
| strategy | string | No | token_bucket | Rate limiting strategy (token_bucket or sliding_window) |
| queue_overflow | string | No | wait | Behavior when queue is full (wait, drop, or error) |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (allowed/limited) |
| tokens_remaining | number | Remaining tokens in bucket |
| window_reset_ms | number | Milliseconds until window resets |
| requests_in_window | number | Number of requests in current window |
| wait_ms | number | Milliseconds to wait before next allowed request |

Example: Example

yaml
max_requests: 100
window_ms: 60000
strategy: token_bucket

Example: Example

yaml
max_requests: 10
window_ms: 1000
strategy: sliding_window
queue_overflow: error

Example: Example

yaml
max_requests: 50
window_ms: 30000
strategy: sliding_window
queue_overflow: wait
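
For readers unfamiliar with the `token_bucket` strategy, the Python sketch below shows the general technique using the documented parameter names. The `TokenBucket` class is hypothetical, and it assumes tokens refill continuously at `max_requests` per `window_ms`; the module may refill differently. The current time is passed in so the behavior is deterministic.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class TokenBucket:
    max_requests: int
    window_ms: int = 60000
    tokens: float = field(init=False, default=0.0)
    last_refill_ms: float = 0.0

    def __post_init__(self) -> None:
        self.tokens = float(self.max_requests)  # start with a full bucket

    def try_acquire(self, now_ms: float) -> Dict[str, Any]:
        # Refill in proportion to elapsed time, capped at the bucket size.
        elapsed = max(0.0, now_ms - self.last_refill_ms)
        refill = elapsed * self.max_requests / self.window_ms
        self.tokens = min(float(self.max_requests), self.tokens + refill)
        self.last_refill_ms = now_ms
        if self.tokens >= 1:
            self.tokens -= 1
            return {"__event__": "allowed", "tokens_remaining": int(self.tokens), "wait_ms": 0}
        # Not enough tokens: report how long until one whole token is available.
        wait_ms = (1 - self.tokens) * self.window_ms / self.max_requests
        return {"__event__": "limited", "tokens_remaining": 0, "wait_ms": wait_ms}
```

With `max_requests=2` and `window_ms=1000`, two calls at the same instant are allowed, a third is limited with a wait of about 500 ms, and a call 600 ms later is allowed again.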

Retry

flow.retry

Retry failed operations with configurable backoff

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| max_retries | number | Yes | 3 | Maximum number of retry attempts |
| initial_delay_ms | number | No | 1000 | Initial delay before first retry in milliseconds |
| backoff_multiplier | number | No | 2.0 | Multiplier for exponential backoff |
| max_delay_ms | number | No | 30000 | Maximum delay between retries in milliseconds |
| retry_on_errors | array | No | [] | Error types to retry on (empty means retry all) |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (retry/success/failed) |
| attempt | number | Current attempt number |
| max_retries | number | Maximum number of retries configured |
| delay_ms | number | Delay before next retry in milliseconds |
| total_elapsed_ms | number | Total elapsed time in milliseconds |
| last_error | object | Last error message |

Example: Example

yaml
max_retries: 3

Example: Example

yaml
max_retries: 10
initial_delay_ms: 500
backoff_multiplier: 1.5
max_delay_ms: 10000

Example: Example

yaml
max_retries: 5
initial_delay_ms: 2000
retry_on_errors: ["TIMEOUT", "RATE_LIMIT", "429", "503"]
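
The backoff parameters combine into a delay schedule. The Python sketch below assumes the conventional exponential backoff formula, where the delay before retry n is `initial_delay_ms * backoff_multiplier ** (n - 1)`, capped at `max_delay_ms`. `retry_delays` is a hypothetical helper, and the module may add jitter or compute delays differently.

```python
from typing import List


def retry_delays(
    max_retries: int = 3,
    initial_delay_ms: float = 1000,
    backoff_multiplier: float = 2.0,
    max_delay_ms: float = 30000,
) -> List[float]:
    """Return the delay in milliseconds before each retry attempt."""
    delays = []
    for attempt in range(max_retries):
        delay = initial_delay_ms * (backoff_multiplier ** attempt)
        delays.append(min(delay, max_delay_ms))  # never exceed the cap
    return delays


default_schedule = retry_delays()
capped_schedule = retry_delays(10, 500, 1.5, 10000)
```

Under this assumption the defaults give delays of 1000, 2000, and 4000 ms, and the second example (`initial_delay_ms: 500`, `backoff_multiplier: 1.5`, `max_delay_ms: 10000`) starts at 500, 750, and 1125 ms and reaches the 10000 ms cap by the ninth retry.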

Start

flow.start

Explicit workflow start node

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (start) |
| started_at | string | ISO timestamp when the workflow started |
| workflow_id | string | Workflow identifier |

Example: Example

yaml
# No parameters are required

Subflow

flow.subflow

Reference and execute an external workflow

Parameters:

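| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| workflow_ref | string | Yes | - | Reference to the external workflow to execute (for example, workflows/validate_order) |
| execution_mode | select (inline, spawn, async) | No | inline | How the referenced workflow is executed |
| input_mapping | object | Yes | - | Map values from the parent workflow to subflow inputs |
| output_mapping | object | No | {} | Map internal variables to workflow output |
| timeout | number | No | 300000 | Maximum time to wait in milliseconds |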

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (success/error) |
| result | any | Subflow execution result |
| execution_id | string | Identifier of the subflow execution |
| workflow_ref | string | Reference to the executed workflow |

Example: Example

yaml
workflow_ref: workflows/validate_order
execution_mode: inline
input_mapping: {"order_data": "${input.order}"}
output_mapping: {"validation_result": "result"}

Example: Example

yaml
workflow_ref: workflows/send_notifications
execution_mode: spawn

Switch

flow.switch

Multi-way branching based on value matching

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| expression | string | Yes | - | Value to match against cases (supports variable reference) |
| cases | array | Yes | [{'id': 'case_1', 'value': 'case1', 'label': 'Case 1'}] | List of case definitions |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (case:value or default) |
| outputs | object | Output values by port |
| matched_case | string | Case that matched |
| value | any | Value that was matched against the cases |

Example: Example

yaml
expression: ${api_response.status}
cases: [{"id": "case-1", "value": "success", "label": "Success"}, {"id": "case-2", "value": "pending", "label": "Pending"}, {"id": "case-3", "value": "error", "label": "Error"}]

Example: Example

yaml
expression: ${input.type}
cases: [{"id": "img", "value": "image", "label": "Image"}, {"id": "vid", "value": "video", "label": "Video"}, {"id": "txt", "value": "text", "label": "Text"}]

Throttle

flow.throttle

Throttle execution rate with minimum interval

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| interval_ms | number | Yes | - | Minimum time between executions in milliseconds |
| leading | boolean | No | True | Execute on the leading edge (first call passes immediately) |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (executed/throttled) |
| last_execution_ms | number | Timestamp of last allowed execution |
| calls_throttled | number | Number of calls throttled since last execution |
| time_since_last_ms | number | Time elapsed since last execution in milliseconds |
| remaining_ms | number | Milliseconds remaining until next execution is allowed |

Example: Example

yaml
interval_ms: 1000

Example: Example

yaml
interval_ms: 200
leading: true

Example: Example

yaml
interval_ms: 5000
leading: false
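
The difference from debouncing is that a throttle lets a call through as soon as `interval_ms` has passed since the last allowed execution, regardless of how many calls arrive in between. The Python sketch below models only the `leading: true` case; the `Throttle` class is hypothetical and takes the current time as an argument so the logic is deterministic.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class Throttle:
    interval_ms: float
    last_execution_ms: Optional[float] = None
    calls_throttled: int = 0

    def call(self, now_ms: float) -> Dict[str, Any]:
        """Return the routing event and remaining wait for a call at now_ms."""
        first_call = self.last_execution_ms is None
        if first_call or now_ms - self.last_execution_ms >= self.interval_ms:
            self.last_execution_ms = now_ms
            self.calls_throttled = 0
            return {"__event__": "executed", "remaining_ms": 0}
        # Too soon after the last execution: count the call and report the wait.
        self.calls_throttled += 1
        remaining = self.interval_ms - (now_ms - self.last_execution_ms)
        return {
            "__event__": "throttled",
            "remaining_ms": remaining,
            "calls_throttled": self.calls_throttled,
        }
```

With `interval_ms: 1000`, a call at 0 ms executes, a call at 400 ms is throttled with 600 ms remaining, and a call at 1000 ms executes again.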

Trigger

flow.trigger

Workflow entry point - manual, webhook, schedule, or event

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| trigger_type | select (manual, webhook, schedule, event, mcp, polling) | No | manual | Type of trigger event |
| webhook_path | string | No | - | URL path for webhook trigger |
| schedule | string | No | - | Cron expression for scheduled trigger |
| event_name | string | No | - | Event name to listen for |
| tool_name | string | No | - | MCP tool name exposed to AI agents |
| tool_description | string | No | - | Description shown to AI agents for this tool |
| poll_url | string | No | - | API endpoint to poll for changes |
| poll_interval | number | No | 300 | How often to check for changes (minimum 60 seconds) |
| poll_method | select (GET, POST) | No | GET | HTTP method for polling request |
| poll_headers | object | No | {} | Custom headers for polling request (e.g. API keys) |
| poll_body | object | No | {} | Request body for POST polling |
| dedup_key | string | No | - | JSON path to extract a unique value for deduplication |
| config | object | No | - | Custom trigger config (for composites: LINE BOT, Telegram, Slack, etc.) |
| description | string | No | - | Optional description text |

Output:

| Field | Type | Description |
| --- | --- | --- |
| __event__ | string | Event for routing (triggered/error) |
| trigger_data | object | Data from trigger source |
| trigger_type | string | Type of trigger |
| triggered_at | string | ISO timestamp when the trigger fired |

Example: Example

yaml
trigger_type: manual

Example: Example

yaml
trigger_type: webhook
webhook_path: /api/webhooks/order-created

Example: Example

yaml
trigger_type: schedule
schedule: 0 * * * *

Example: Example

yaml
trigger_type: mcp
tool_name: send-report
tool_description: Send a weekly summary report

Example: Example

yaml
trigger_type: polling
poll_url: https://api.example.com/items
poll_interval: 300
dedup_key: $.data[0].id

Released under the Apache 2.0 License.