Skip to content

Flow Control

Branching, loops, parallelism, subflows, triggers, and error handling.

24 modules

ModuleDescription
批次處理以可配置的大小批次處理項目
分支基於條件表達式進行分支
中斷點暫停工作流程執行,等待人工審核或輸入
電路斷路器電路斷路器模式以防止連鎖故障
容器用於組織複雜工作流程的嵌入式子流程容器
去抖動防止快速重複呼叫的去抖動執行
結束明確的工作流程結束節點
錯誤處理器捕捉並處理上游節點的錯誤
錯誤工作流程觸發器錯誤工作流程的入口點 - 當其他工作流程失敗時觸發
For Each 迴圈迭代列表並對每個項目執行步驟
分叉將執行分割為並行分支
跳轉無條件跳轉到另一個步驟
調用工作流程執行外部工作流程檔案
合併等待並行分支完成
迴圈使用輸出埠路由重複執行步驟 N 次
合併將多個輸入合併為單一輸出
並行以不同策略並行執行多個任務
速率限制使用令牌桶或滑動窗口限制執行速率
重試重試失敗的操作,並可配置回退時間
開始明確的工作流程開始節點
子流程參考並執行外部工作流程
切換基於值匹配的多路分支
限制速率限制執行速率,設置最小間隔
觸發器工作流程入口點 - 手動、webhook、排程或事件

Modules

批次處理

flow.batch

以可配置的大小批次處理項目

Parameters:

NameTypeRequiredDefaultDescription
itemsarrayYes-Array of items to process. Can be numbers, strings, or objects.
batch_sizenumberYes10每批次的項目數量
delay_msnumberNo0批次間等待的毫秒數(用於速率限制)
continue_on_errorbooleanNoFalse如果一個批次失敗,繼續處理剩餘批次
parallel_batchesnumberNo1如果一個批次失敗,繼續處理剩餘批次

Output:

FieldTypeDescription
__event__string並行處理的批次數量(1 表示順序處理)
batcharray用於路由的事件(批次/完成/錯誤)
batch_indexnumber用於路由的事件(批次/完成/錯誤)
total_batchesnumber當前批次項目
total_itemsnumber當前批次索引(從 0 開始)
is_last_batchboolean批次總數
progressobject項目總數

Example: Example

yaml
items: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
batch_size: 10

Example: Example

yaml
items: ${input.records}
batch_size: 100
delay_ms: 1000

Example: Example

yaml
items: ${input.data}
batch_size: 50
parallel_batches: 3
continue_on_error: true

分支

flow.branch

基於條件表達式進行分支

Parameters:

NameTypeRequiredDefaultDescription
conditionstringYes-Expression to evaluate (supports ==, !=, >, <, >=, <=, contains)

Output:

FieldTypeDescription
__event__string路由事件(true/false/error)
outputsobject各埠輸出值
resultboolean分支結果
conditionstring條件值
resolved_conditionstring條件評估結果

Example: Example

yaml
condition: ${search_step.count} > 0

Example: Example

yaml
condition: ${api_call.status} == success

中斷點

flow.breakpoint

暫停工作流程執行,等待人工審核或輸入

Parameters:

NameTypeRequiredDefaultDescription
titlestringNoApproval RequiredTitle displayed to approvers
descriptionstringNo-Optional description text
timeout_secondsnumberNo0Maximum wait time (0 for no timeout)
required_approversarrayYes-Array of data items to process
approval_modeselect (single, all, majority, first)NosingleHow approvals are counted
custom_fieldsarrayYes-Array of data items to process
include_contextbooleanNoTrueWhether to include execution context
auto_approve_conditionstringNo-Text content to process

Output:

FieldTypeDescription
__event__string路由事件(approved/rejected/timeout)
breakpoint_idstring中斷點 ID
statusstring狀態
approved_byarray批准者
rejected_byarray拒絕者
custom_inputsobject自訂輸入值
commentsarray審核評論
resolved_atstring解決時間
wait_duration_msinteger等待時間(毫秒)

Example: Example

yaml
title: Approve data export
description: Please review and approve the data export

Example: Example

yaml
title: Manager Approval Required
description: Large transaction requires manager approval
required_approvers: ["manager@example.com"]
timeout_seconds: 3600

Example: Example

yaml
title: Adjustment Required
custom_fields: [{"name": "reason", "label": "Reason", "type": "text", "required": true}, {"name": "amount", "label": "Amount", "type": "number", "required": true}]

電路斷路器

flow.circuit_breaker

電路斷路器模式以防止連鎖故障

Parameters:

NameTypeRequiredDefaultDescription
failure_thresholdnumberYes5開啟電路前的失敗次數
reset_timeout_msnumberNo60000電路轉為半開前的毫秒數
half_open_maxnumberNo1半開狀態下允許的最大請求數

Output:

FieldTypeDescription
__event__string路由事件(允許/拒絕/半開)
statestring電路狀態(關閉/開啟/半開)
failure_countnumber連續失敗的次數
last_failure_time_msnumber最後一次失敗的時間戳(毫秒)
time_until_half_open_msnumber距離電路轉為半開的毫秒數

Example: Example

yaml
failure_threshold: 5
reset_timeout_ms: 60000

Example: Example

yaml
failure_threshold: 2
reset_timeout_ms: 10000
half_open_max: 1

Example: Example

yaml
failure_threshold: 20
reset_timeout_ms: 120000
half_open_max: 3

容器

flow.container

用於組織複雜工作流程的嵌入式子流程容器

Parameters:

NameTypeRequiredDefaultDescription
subflowobjectNo{'nodes': [], 'edges': []}Embedded workflow definition with nodes and edges
inherit_contextbooleanNoTrueWhether to inherit variables from parent workflow
isolated_variablesarrayYes-Array of data items to process
export_variablesarrayYes-Array of data items to process

Output:

FieldTypeDescription
__event__string路由事件(success/error)
outputsobject各埠輸出值
subflow_resultobject子流程結果
exported_variablesobject匯出的變數
node_countinteger節點數量
execution_time_msnumber執行時間(毫秒)

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: true

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: false

去抖動

flow.debounce

防止快速重複呼叫的去抖動執行

Parameters:

NameTypeRequiredDefaultDescription
delay_msnumberYes-最後一次呼叫後的等待時間再執行
leadingbooleanNoFalse在前導邊緣執行(第一次呼叫立即觸發)
trailingbooleanNoTrue在後導邊緣執行(延遲結束後)

Output:

FieldTypeDescription
__event__string路由事件(已執行/去抖動)
last_call_msnumber最後一次呼叫的時間戳(毫秒)
calls_debouncednumber自上次執行以來去抖動的呼叫次數
time_since_last_msnumber自上次呼叫以來經過的時間(毫秒)
edgestring哪個邊緣觸發了執行(前導/後導)

Example: Example

yaml
delay_ms: 500

Example: Example

yaml
delay_ms: 1000
leading: true
trailing: false

Example: Example

yaml
delay_ms: 2000
leading: true
trailing: true

結束

flow.end

明確的工作流程結束節點

Parameters:

NameTypeRequiredDefaultDescription
output_mappingobjectNo{}Map internal variables to workflow output
success_messagestringNo-Text content to process

Output:

FieldTypeDescription
__event__string路由事件(end
ended_atstring結束時間
workflow_resultobject工作流程結果

Example: Example

yaml

Example: Example

yaml
output_mapping: {"result": "${process.output}", "status": "success"}

錯誤處理器

flow.error_handle

捕捉並處理上游節點的錯誤

Parameters:

NameTypeRequiredDefaultDescription
actionstringYeslog_and_continue如何處理錯誤
include_tracebackbooleanNoTrue在輸出中包含完整的堆疊追蹤
error_code_mappingobjectNo{}在輸出中包含完整的堆疊追蹤
fallback_valueanyNo-將錯誤代碼映射到自定義操作

Output:

FieldTypeDescription
__event__string當錯誤被抑制時使用的值
outputsobject用於路由的事件(已處理/升級)
error_infoobject用於路由的事件(已處理/升級)
action_takenstring採取的行動

Example: Example

yaml
action: log_and_continue
include_traceback: true

Example: Example

yaml
action: suppress
fallback_value: {"status": "skipped", "reason": "upstream_error"}

Example: Example

yaml
action: transform
error_code_mapping: {"TIMEOUT": {"retry": true, "delay": 5000}, "NOT_FOUND": {"skip": true}}

錯誤工作流程觸發器

flow.error_workflow_trigger

錯誤工作流程的入口點 - 當其他工作流程失敗時觸發

Parameters:

NameTypeRequiredDefaultDescription
descriptionstringNo-Description of this error workflow

Output:

FieldTypeDescription
__event__string此錯誤工作流程的描述
error_contextobject用於路由的事件(已觸發)
triggered_atstring錯誤工作流程觸發時的 ISO 時間戳

Example: Example

yaml
description: Send Slack notification on workflow failure

Example: Example

yaml
description: Log all workflow errors to monitoring system

For Each 迴圈

flow.foreach

迭代列表並對每個項目執行步驟

Parameters:

NameTypeRequiredDefaultDescription
itemsstringYes-要迭代的項目列表(支援 ${variable} 參考)
stepsarrayNo-每個項目要執行的步驟
item_varstringNoitem目前項目的變數名稱
index_varstringNoindex目前索引的變數名稱
output_modestringNocollect結果收集模式

Output:

FieldTypeDescription
__event__string路由事件(iterate/done)
__set_contextobject設定的上下文
outputsobject各埠輸出值
iterationnumber目前迭代索引
statusstring操作狀態
resultsarray收集的結果
countnumber項目總數

Example: Example

yaml
items: ${steps.csv.result.data}

Example: Example

yaml
items: ${search_results}
item_var: element
steps: [{"module": "element.text", "params": {"element_id": "${element}"}, "output": "text"}]

分叉

flow.fork

將執行分割為並行分支

Parameters:

NameTypeRequiredDefaultDescription
branch_countnumberNo2Number of parallel branches

Output:

FieldTypeDescription
__event__string路由事件(fork/error)
input_dataany輸入資料
branch_countinteger分支數量

Example: Example

yaml
branch_count: 2

Example: Example

yaml
branch_count: 3

跳轉

flow.goto

無條件跳轉到另一個步驟

Parameters:

NameTypeRequiredDefaultDescription
targetstringYes-Step ID to jump to
max_iterationsnumberNo100Maximum number of iterations (prevents infinite loops)

Output:

FieldTypeDescription
__event__string路由事件(goto)
targetstring目標步驟
iterationnumber迭代次數

Example: Example

yaml
target: fetch_next_page
max_iterations: 10

Example: Example

yaml
target: cleanup_step

調用工作流程

flow.invoke

執行外部工作流程檔案

Parameters:

NameTypeRequiredDefaultDescription
workflow_sourcestringYes-File path to workflow YAML or inline YAML content
workflow_paramsobjectYes-Parameters to pass to the invoked workflow
timeout_secondsnumberNo300Maximum execution time in seconds
output_mappingobjectNo{}Map internal variables to workflow output

Output:

FieldTypeDescription
__event__string路由事件(success/error)
resultany工作流程執行結果
workflow_idstring調用的工作流程 ID
execution_time_msnumber執行時間(毫秒)

Example: Example

yaml
workflow_source: workflows/validate_order.yaml
workflow_params: {"order_id": "${input.order_id}"}
timeout_seconds: 60

Example: Example

yaml
workflow_source: workflows/process_data.yaml
workflow_params: {"data": "${input.data}"}
output_mapping: {"processed": "result.data"}

合併

flow.join

等待並行分支完成

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (all, any, first)NoallHow to handle multiple inputs
input_countnumberNo2Number of ports
timeoutnumberNo60000Maximum time to wait in milliseconds
cancel_pendingbooleanNoTrueCancel pending branches when using first strategy

Output:

FieldTypeDescription
__event__string路由事件(joined/timeout/error)
joined_dataarray合併的資料
completed_countinteger完成的分支數量
strategystring合併策略

Example: Example

yaml
strategy: all
input_count: 2
timeout_ms: 30000

Example: Example

yaml
strategy: first
input_count: 3
cancel_pending: true

迴圈

flow.loop

使用輸出埠路由重複執行步驟 N 次

Parameters:

NameTypeRequiredDefaultDescription
timesnumberYes1重複次數
targetstringNo-目標步驟(已棄用)
stepsarrayNo-每次迭代要執行的步驟
index_varstringNoindex目前索引的變數名稱

Output:

FieldTypeDescription
__event__string路由事件(iterate/done)
outputsobject各埠輸出值
iterationnumber目前迭代
statusstring操作狀態
resultsarray收集的結果
countnumber迭代總數

Example: Example

yaml
times: 3

Example: Example

yaml
times: 5
steps: [{"module": "browser.click", "params": {"selector": ".next"}}]

合併

flow.merge

將多個輸入合併為單一輸出

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (first, last, all)NoallHow to merge multiple inputs
input_countnumberNo2Number of ports

Output:

FieldTypeDescription
__event__string路由事件(merged/error)
merged_dataany合併的資料
input_countinteger輸入數量
strategystring合併策略

Example: Example

yaml
strategy: all
input_count: 3

Example: Example

yaml
strategy: first
input_count: 2

並行

flow.parallel

以不同策略並行執行多個任務

Parameters:

NameTypeRequiredDefaultDescription
tasksarrayYes-並行執行的任務定義陣列
modestringNoall並行執行的任務定義陣列
timeout_msnumberNo60000Maximum wait time in milliseconds
fail_fastbooleanNoTrue首次失敗時停止所有任務(僅適用於模式=全部)
concurrency_limitnumberNo0首次失敗時停止所有任務(僅適用於模式=全部)

Output:

FieldTypeDescription
__event__string最大並行任務數(0 表示無限制)
resultsarray用於路由的事件(已完成/部分/錯誤)
completed_countnumber用於路由的事件(完成/部分/錯誤)
failed_countnumber所有任務的結果
total_countnumber成功完成的任務數量
modestring失敗任務的數量
duration_msnumber任務總數

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://api2.example.com"}}]
mode: all
timeout_ms: 30000

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://mirror1.example.com"}}, {"module": "http.get", "params": {"url": "https://mirror2.example.com"}}]
mode: race

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://might-fail.example.com"}}]
mode: settle

速率限制

flow.rate_limit

使用令牌桶或滑動窗口限制執行速率

Parameters:

NameTypeRequiredDefaultDescription
max_requestsnumberYes-每個窗口允許的最大請求數
window_msnumberNo60000時間窗口(毫秒)
strategystringNotoken_bucket速率限制策略(令牌桶或滑動窗口)
queue_overflowstringNowait隊列滿時的行為(丟棄或錯誤)

Output:

FieldTypeDescription
__event__string路由事件(允許/限制)
tokens_remainingnumber桶中剩餘的令牌數
window_reset_msnumber窗口重置前的毫秒數
requests_in_windownumber當前窗口中的請求數
wait_msnumber下一次允許請求前的等待毫秒數

Example: Example

yaml
max_requests: 100
window_ms: 60000
strategy: token_bucket

Example: Example

yaml
max_requests: 10
window_ms: 1000
strategy: fixed_window
queue_overflow: error

Example: Example

yaml
max_requests: 50
window_ms: 30000
strategy: sliding_window
queue_overflow: wait

重試

flow.retry

重試失敗的操作,並可配置回退時間

Parameters:

NameTypeRequiredDefaultDescription
max_retriesnumberYes3最大重試嘗試次數
initial_delay_msnumberNo1000第一次重試前的初始延遲時間(毫秒)
backoff_multipliernumberNo2.0指數回退的倍數
max_delay_msnumberNo30000重試之間的最大延遲時間(毫秒)
retry_on_errorsarrayNo[]要重試的錯誤類型(空表示重試所有)

Output:

FieldTypeDescription
__event__string用於路由的事件(重試/成功/失敗)
attemptnumber目前嘗試次數
max_retriesnumber配置的最大重試次數
delay_msnumber下次重試前的延遲時間(毫秒)
total_elapsed_msnumber總經過時間(毫秒)
last_errorobject最後的錯誤訊息

Example: Example

yaml
max_retries: 3

Example: Example

yaml
max_retries: 10
initial_delay_ms: 500
backoff_multiplier: 1.5
max_delay_ms: 10000

Example: Example

yaml
max_retries: 5
initial_delay_ms: 2000
retry_on_errors: ["TIMEOUT", "RATE_LIMIT", "429", "503"]

開始

flow.start

明確的工作流程開始節點

Output:

FieldTypeDescription
__event__string路由事件(start)
started_atstring開始時間
workflow_idstring工作流程 ID

Example: Example

yaml

子流程

flow.subflow

參考並執行外部工作流程

Parameters:

NameTypeRequiredDefaultDescription
workflow_refstringYes-Text content to process
execution_modeselect (inline, spawn, async)NoinlineSelect an option
input_mappingobjectYes-Data object to process
output_mappingobjectNo{}Map internal variables to workflow output
timeoutnumberNo300000Maximum time to wait in milliseconds

Output:

FieldTypeDescription
__event__string路由事件(success/error)
resultany執行結果
execution_idstring執行 ID
workflow_refstring工作流程參考

Example: Example

yaml
workflow_ref: workflows/validate_order
execution_mode: inline
input_mapping: {"order_data": "${input.order}"}
output_mapping: {"validation_result": "result"}

Example: Example

yaml
workflow_ref: workflows/send_notifications
execution_mode: spawn

切換

flow.switch

基於值匹配的多路分支

Parameters:

NameTypeRequiredDefaultDescription
expressionstringYes-Value to match against cases (supports variable reference)
casesarrayYes[{'id': 'case_1', 'value': 'case1', 'label': 'Case 1'}]List of case definitions

Output:

FieldTypeDescription
__event__string路由事件(case:value 或 default)
outputsobject各埠輸出值
matched_casestring匹配的 case
valueany匹配的值

Example: Example

yaml
expression: ${api_response.status}
cases: [{"id": "case-1", "value": "success", "label": "Success"}, {"id": "case-2", "value": "pending", "label": "Pending"}, {"id": "case-3", "value": "error", "label": "Error"}]

Example: Example

yaml
expression: ${input.type}
cases: [{"id": "img", "value": "image", "label": "Image"}, {"id": "vid", "value": "video", "label": "Video"}, {"id": "txt", "value": "text", "label": "Text"}]

限制速率

flow.throttle

限制執行速率,設置最小間隔

Parameters:

NameTypeRequiredDefaultDescription
interval_msnumberYes-執行之間的最小時間(毫秒)
leadingbooleanNoTrue在前緣執行(第一次呼叫立即通過)

Output:

FieldTypeDescription
__event__string用於路由的事件(已執行/已限制)
last_execution_msnumber上次允許執行的時間戳
calls_throttlednumber自上次執行以來被限制的呼叫次數
time_since_last_msnumber自上次執行以來經過的時間(毫秒)
remaining_msnumber距離允許下次執行的剩餘毫秒數

Example: Example

yaml
interval_ms: 1000

Example: Example

yaml
interval_ms: 200
leading: true

Example: Example

yaml
interval_ms: 5000
leading: false

觸發器

flow.trigger

工作流程入口點 - 手動、webhook、排程或事件

Parameters:

NameTypeRequiredDefaultDescription
trigger_typeselect (manual, webhook, schedule, event, mcp, polling)NomanualType of trigger event
webhook_pathstringNo-URL path for webhook trigger
schedulestringNo-Cron expression for scheduled trigger
event_namestringNo-Event name to listen for
tool_namestringNo-MCP tool name exposed to AI agents
tool_descriptionstringNo-Description shown to AI agents for this tool
poll_urlstringNo-API endpoint to poll for changes
poll_intervalnumberNo300How often to check for changes (minimum 60 seconds)
poll_methodselect (GET, POST)NoGETHTTP method for polling request
poll_headersobjectNo{}Custom headers for polling request (e.g. API keys)
poll_bodyobjectNo{}Request body for POST polling
dedup_keystringNo-JSON path to extract a unique value for deduplication
configobjectNo-Custom trigger config (for composites: LINE BOT, Telegram, Slack, etc.)
descriptionstringNo-Optional description text

Output:

FieldTypeDescription
__event__string路由事件(triggered/error)
trigger_dataobject觸發資料
trigger_typestring觸發類型
triggered_atstring觸發時間

Example: Example

yaml
trigger_type: manual

Example: Example

yaml
trigger_type: webhook
webhook_path: /api/webhooks/order-created

Example: Example

yaml
trigger_type: schedule
schedule: 0 * * * *

Example: Example

yaml
trigger_type: mcp
tool_name: send-report
tool_description: Send a weekly summary report

Example: Example

yaml
trigger_type: polling
poll_url: https://api.example.com/items
poll_interval: 300
dedup_key: $.data[0].id

Released under the Apache 2.0 License.