Skip to content

Flow Control

Branching, loops, parallelism, subflows, triggers, and error handling.

24 modules

ModuleDescription
バッチ処理設定可能なサイズでアイテムをバッチ処理
分岐条件式に基づく分岐
ブレークポイント人間の承認または入力のためにワークフロー実行を一時停止
サーキットブレーカー連鎖的な障害を防ぐためのサーキットブレーカーパターン
コンテナ複雑なワークフローを整理するための埋め込みサブフローコンテナ
デバウンス急速な繰り返し呼び出しを防ぐためのデバウンス実行
終了明示的なワークフロー終了ノード
エラーハンドラー上流ノードからのエラーをキャッチして処理
エラーワークフロートリガーエラーワークフローのエントリーポイント - 別のワークフローが失敗したときにトリガーされる
For Each ループリストを反復処理し、各項目に対してステップを実行
フォーク実行を並列分岐に分割
ジャンプ別のステップへ無条件ジャンプ
ワークフローを呼び出す外部ワークフローファイルを実行する
結合並列分岐の完了を待機
ループ出力ポートルーティングを使用してステップをN回繰り返す
マージ複数の入力を単一の出力にマージ
並行異なる戦略で複数のタスクを並行して実行
レート制限トークンバケットまたはスライディングウィンドウを使用したレート制限実行
再試行失敗した操作を再試行し、バックオフを設定可能
開始明示的なワークフロー開始ノード
サブフロー外部ワークフローを参照して実行
スイッチ値のマッチングに基づく多路分岐
スロットル最小間隔で実行速度を制限
トリガーワークフローエントリーポイント - 手動、Webhook、スケジュール、またはイベント

Modules

バッチ処理

flow.batch

設定可能なサイズでアイテムをバッチ処理

Parameters:

NameTypeRequiredDefaultDescription
itemsarrayYes-Array of items to process. Can be numbers, strings, or objects.
batch_sizenumberYes10バッチごとのアイテム数
delay_msnumberNo0バッチ間で待機するミリ秒(レート制限用)
continue_on_errorbooleanNoFalse1つ失敗しても残りのバッチを処理し続ける
parallel_batchesnumberNo11つ失敗しても残りのバッチを処理し続ける

Output:

FieldTypeDescription
__event__string並列で処理するバッチ数(1は順次)
batcharrayルーティング用イベント(バッチ/完了/エラー)
batch_indexnumberルーティング用イベント(バッチ/完了/エラー)
total_batchesnumber現在のバッチアイテム
total_itemsnumber現在のバッチインデックス(0から始まる)
is_last_batchbooleanバッチの総数
progressobjectアイテムの総数

Example: Example

yaml
items: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
batch_size: 10

Example: Example

yaml
items: ${input.records}
batch_size: 100
delay_ms: 1000

Example: Example

yaml
items: ${input.data}
batch_size: 50
parallel_batches: 3
continue_on_error: true

分岐

flow.branch

条件式に基づく分岐

Parameters:

NameTypeRequiredDefaultDescription
conditionstringYes-Expression to evaluate (supports ==, !=, >, <, >=, <=, contains)

Output:

FieldTypeDescription
__event__stringルーティングイベント (true/false/error)
outputsobjectポート別出力値
resultboolean分岐結果
conditionstring条件値
resolved_conditionstring条件評価結果

Example: Example

yaml
condition: ${search_step.count} > 0

Example: Example

yaml
condition: ${api_call.status} == success

ブレークポイント

flow.breakpoint

人間の承認または入力のためにワークフロー実行を一時停止

Parameters:

NameTypeRequiredDefaultDescription
titlestringNoApproval RequiredTitle displayed to approvers
descriptionstringNo-Optional description text
timeout_secondsnumberNo0Maximum wait time (0 for no timeout)
required_approversarrayYes-Array of data items to process
approval_modeselect (single, all, majority, first)NosingleHow approvals are counted
custom_fieldsarrayYes-Array of data items to process
include_contextbooleanNoTrueWhether to include execution context
auto_approve_conditionstringNo-Text content to process

Output:

FieldTypeDescription
__event__stringルーティングイベント (approved/rejected/timeout)
breakpoint_idstringブレークポイントID
statusstringステータス
approved_byarray承認者
rejected_byarray却下者
custom_inputsobjectカスタム入力値
commentsarrayレビューコメント
resolved_atstring解決時刻
wait_duration_msinteger待機時間(ミリ秒)

Example: Example

yaml
title: Approve data export
description: Please review and approve the data export

Example: Example

yaml
title: Manager Approval Required
description: Large transaction requires manager approval
required_approvers: ["manager@example.com"]
timeout_seconds: 3600

Example: Example

yaml
title: Adjustment Required
custom_fields: [{"name": "reason", "label": "Reason", "type": "text", "required": true}, {"name": "amount", "label": "Amount", "type": "number", "required": true}]

サーキットブレーカー

flow.circuit_breaker

連鎖的な障害を防ぐためのサーキットブレーカーパターン

Parameters:

NameTypeRequiredDefaultDescription
failure_thresholdnumberYes5サーキットを開く前の失敗回数
reset_timeout_msnumberNo60000サーキットが半開に移行するまでの時間(ミリ秒)
half_open_maxnumberNo1半開状態で許可される最大リクエスト数

Output:

FieldTypeDescription
__event__stringルーティング用イベント(許可/拒否/半開)
statestringサーキットの状態(閉/開/半開)
failure_countnumber連続失敗の回数
last_failure_time_msnumber最後の失敗のタイムスタンプ(ミリ秒)
time_until_half_open_msnumberサーキットが半開になるまでのミリ秒

Example: Example

yaml
failure_threshold: 5
reset_timeout_ms: 60000

Example: Example

yaml
failure_threshold: 2
reset_timeout_ms: 10000
half_open_max: 1

Example: Example

yaml
failure_threshold: 20
reset_timeout_ms: 120000
half_open_max: 3

コンテナ

flow.container

複雑なワークフローを整理するための埋め込みサブフローコンテナ

Parameters:

NameTypeRequiredDefaultDescription
subflowobjectNo{'nodes': [], 'edges': []}Embedded workflow definition with nodes and edges
inherit_contextbooleanNoTrueWhether to inherit variables from parent workflow
isolated_variablesarrayYes-Array of data items to process
export_variablesarrayYes-Array of data items to process

Output:

FieldTypeDescription
__event__stringルーティングイベント (success/error)
outputsobjectポート別出力値
subflow_resultobjectサブフロー結果
exported_variablesobjectエクスポートされた変数
node_countintegerノード数
execution_time_msnumber実行時間(ミリ秒)

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: true

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: false

デバウンス

flow.debounce

急速な繰り返し呼び出しを防ぐためのデバウンス実行

Parameters:

NameTypeRequiredDefaultDescription
delay_msnumberYes-最後の呼び出し後に実行するまでの待ち時間
leadingbooleanNoFalseリーディングエッジで実行(最初の呼び出しが即時トリガー)
trailingbooleanNoTrueトレーリングエッジで実行(遅延が終了した後)

Output:

FieldTypeDescription
__event__stringルーティング用イベント(実行/デバウンス)
last_call_msnumber最後の呼び出しのタイムスタンプ(ミリ秒)
calls_debouncednumber最後の実行以来デバウンスされた呼び出しの回数
time_since_last_msnumber最後の呼び出しから経過した時間(ミリ秒)
edgestringどのエッジが実行をトリガーしたか(リーディング/トレーリング)

Example: Example

yaml
delay_ms: 500

Example: Example

yaml
delay_ms: 1000
leading: true
trailing: false

Example: Example

yaml
delay_ms: 2000
leading: true
trailing: true

終了

flow.end

明示的なワークフロー終了ノード

Parameters:

NameTypeRequiredDefaultDescription
output_mappingobjectNo{}Map internal variables to workflow output
success_messagestringNo-Text content to process

Output:

FieldTypeDescription
__event__stringルーティングイベント (end)
ended_atstring終了時刻
workflow_resultobjectワークフロー結果

Example: Example

yaml

Example: Example

yaml
output_mapping: {"result": "${process.output}", "status": "success"}

エラーハンドラー

flow.error_handle

上流ノードからのエラーをキャッチして処理

Parameters:

NameTypeRequiredDefaultDescription
actionstringYeslog_and_continueエラーに対して何をするか
include_tracebackbooleanNoTrue出力に完全なスタックトレースを含める
error_code_mappingobjectNo{}出力に完全なスタックトレースを含める
fallback_valueanyNo-エラーコードをカスタムアクションにマップ

Output:

FieldTypeDescription
__event__stringエラーが抑制されたときに使用する値
outputsobjectルーティング用イベント(処理済み/エスカレート)
error_infoobjectルーティング用イベント(処理済み/エスカレート)
action_takenstring取られたアクション

Example: Example

yaml
action: log_and_continue
include_traceback: true

Example: Example

yaml
action: suppress
fallback_value: {"status": "skipped", "reason": "upstream_error"}

Example: Example

yaml
action: transform
error_code_mapping: {"TIMEOUT": {"retry": true, "delay": 5000}, "NOT_FOUND": {"skip": true}}

エラーワークフロートリガー

flow.error_workflow_trigger

エラーワークフローのエントリーポイント - 別のワークフローが失敗したときにトリガーされる

Parameters:

NameTypeRequiredDefaultDescription
descriptionstringNo-Description of this error workflow

Output:

FieldTypeDescription
__event__stringこのエラーワークフローの説明
error_contextobjectルーティング用イベント(トリガー済み)
triggered_atstringエラーワークフローがトリガーされたときのISOタイムスタンプ

Example: Example

yaml
description: Send Slack notification on workflow failure

Example: Example

yaml
description: Log all workflow errors to monitoring system

For Each ループ

flow.foreach

リストを反復処理し、各項目に対してステップを実行

Parameters:

NameTypeRequiredDefaultDescription
itemsstringYes-反復処理する項目リスト(${variable} 参照対応)
stepsarrayNo-各項目に対して実行するステップ
item_varstringNoitem現在の項目の変数名
index_varstringNoindex現在のインデックスの変数名
output_modestringNocollect結果収集モード

Output:

FieldTypeDescription
__event__stringルーティングイベント (iterate/done)
__set_contextobject設定されたコンテキスト
outputsobjectポート別出力値
iterationnumber現在の反復インデックス
statusstring操作ステータス
resultsarray収集された結果
countnumber項目総数

Example: Example

yaml
items: ${steps.csv.result.data}

Example: Example

yaml
items: ${search_results}
item_var: element
steps: [{"module": "element.text", "params": {"element_id": "${element}"}, "output": "text"}]

フォーク

flow.fork

実行を並列分岐に分割

Parameters:

NameTypeRequiredDefaultDescription
branch_countnumberNo2Number of parallel branches

Output:

FieldTypeDescription
__event__stringルーティングイベント (fork/error)
input_dataany入力データ
branch_countinteger分岐数

Example: Example

yaml
branch_count: 2

Example: Example

yaml
branch_count: 3

ジャンプ

flow.goto

別のステップへ無条件ジャンプ

Parameters:

NameTypeRequiredDefaultDescription
targetstringYes-Step ID to jump to
max_iterationsnumberNo100Maximum number of iterations (prevents infinite loops)

Output:

FieldTypeDescription
__event__stringルーティングイベント (goto)
targetstringターゲットステップ
iterationnumber反復回数

Example: Example

yaml
target: fetch_next_page
max_iterations: 10

Example: Example

yaml
target: cleanup_step

ワークフローを呼び出す

flow.invoke

外部ワークフローファイルを実行する

Parameters:

NameTypeRequiredDefaultDescription
workflow_sourcestringYes-File path to workflow YAML or inline YAML content
workflow_paramsobjectYes-Parameters to pass to the invoked workflow
timeout_secondsnumberNo300Maximum execution time in seconds
output_mappingobjectNo{}Map internal variables to workflow output

Output:

FieldTypeDescription
__event__string呼び出されたワークフローに渡すパラメータ
resultany最大実行時間(秒)
workflow_idstringルーティング用イベント(成功/エラー)
execution_time_msnumberワークフローの実行結果

Example: Example

yaml
workflow_source: workflows/validate_order.yaml
workflow_params: {"order_id": "${input.order_id}"}
timeout_seconds: 60

Example: Example

yaml
workflow_source: workflows/process_data.yaml
workflow_params: {"data": "${input.data}"}
output_mapping: {"processed": "result.data"}

結合

flow.join

並列分岐の完了を待機

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (all, any, first)NoallHow to handle multiple inputs
input_countnumberNo2Number of ports
timeoutnumberNo60000Maximum time to wait in milliseconds
cancel_pendingbooleanNoTrueCancel pending branches when using first strategy

Output:

FieldTypeDescription
__event__stringルーティングイベント (joined/timeout/error)
joined_dataarray結合されたデータ
completed_countinteger完了した分岐数
strategystring結合戦略

Example: Example

yaml
strategy: all
input_count: 2
timeout_ms: 30000

Example: Example

yaml
strategy: first
input_count: 3
cancel_pending: true

ループ

flow.loop

出力ポートルーティングを使用してステップをN回繰り返す

Parameters:

NameTypeRequiredDefaultDescription
timesnumberYes1繰り返し回数
targetstringNo-ターゲットステップ(非推奨)
stepsarrayNo-各反復で実行するステップ
index_varstringNoindex現在のインデックスの変数名

Output:

FieldTypeDescription
__event__stringルーティングイベント (iterate/done)
outputsobjectポート別出力値
iterationnumber現在の反復
statusstring操作ステータス
resultsarray収集された結果
countnumber反復総数

Example: Example

yaml
times: 3

Example: Example

yaml
times: 5
steps: [{"module": "browser.click", "params": {"selector": ".next"}}]

マージ

flow.merge

複数の入力を単一の出力にマージ

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (first, last, all)NoallHow to merge multiple inputs
input_countnumberNo2Number of ports

Output:

FieldTypeDescription
__event__stringルーティングイベント (merged/error)
merged_dataanyマージされたデータ
input_countinteger入力数
strategystringマージ戦略

Example: Example

yaml
strategy: all
input_count: 3

Example: Example

yaml
strategy: first
input_count: 2

並行

flow.parallel

異なる戦略で複数のタスクを並行して実行

Parameters:

NameTypeRequiredDefaultDescription
tasksarrayYes-並行して実行するタスク定義の配列
modestringNoall並行して実行するタスク定義の配列
timeout_msnumberNo60000Maximum wait time in milliseconds
fail_fastbooleanNoTrue最初の失敗で全タスクを停止(mode=all の場合のみ)
concurrency_limitnumberNo0最初の失敗で全タスクを停止(mode=all の場合のみ)

Output:

FieldTypeDescription
__event__string同時実行タスクの最大数(0は無制限)
resultsarrayルーティング用のイベント(完了/部分/エラー)
completed_countnumberルーティング用イベント(完了/部分/エラー)
failed_countnumberすべてのタスクの結果
total_countnumber正常に完了したタスクの数
modestring失敗したタスクの数
duration_msnumberタスクの総数

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://api2.example.com"}}]
mode: all
timeout_ms: 30000

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://mirror1.example.com"}}, {"module": "http.get", "params": {"url": "https://mirror2.example.com"}}]
mode: race

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://might-fail.example.com"}}]
mode: settle

レート制限

flow.rate_limit

トークンバケットまたはスライディングウィンドウを使用したレート制限実行

Parameters:

NameTypeRequiredDefaultDescription
max_requestsnumberYes-ウィンドウごとに許可される最大リクエスト数
window_msnumberNo60000時間ウィンドウ(ミリ秒)
strategystringNotoken_bucketレート制限戦略(トークンバケットまたはスライディングウィンドウ)
queue_overflowstringNowaitキューが満杯のときの動作(ドロップまたはエラー)

Output:

FieldTypeDescription
__event__stringルーティング用イベント(許可/制限)
tokens_remainingnumberバケット内の残りトークン数
window_reset_msnumberウィンドウがリセットされるまでのミリ秒
requests_in_windownumber現在のウィンドウ内のリクエスト数
wait_msnumber次の許可されたリクエストまでの待ち時間(ミリ秒)

Example: Example

yaml
max_requests: 100
window_ms: 60000
strategy: token_bucket

Example: Example

yaml
max_requests: 10
window_ms: 1000
strategy: fixed_window
queue_overflow: error

Example: Example

yaml
max_requests: 50
window_ms: 30000
strategy: sliding_window
queue_overflow: wait

再試行

flow.retry

失敗した操作を再試行し、バックオフを設定可能

Parameters:

NameTypeRequiredDefaultDescription
max_retriesnumberYes3再試行の最大試行回数
initial_delay_msnumberNo1000最初の再試行までの初期遅延(ミリ秒)
backoff_multipliernumberNo2.0指数バックオフの倍率
max_delay_msnumberNo30000再試行間の最大遅延(ミリ秒)
retry_on_errorsarrayNo[]再試行するエラータイプ(空の場合はすべて再試行)

Output:

FieldTypeDescription
__event__stringルーティング用イベント(再試行/成功/失敗)
attemptnumber現在の試行回数
max_retriesnumber設定された最大再試行回数
delay_msnumber次の再試行までの遅延時間(ミリ秒)
total_elapsed_msnumber経過した合計時間(ミリ秒)
last_errorobject最後のエラーメッセージ

Example: Example

yaml
max_retries: 3

Example: Example

yaml
max_retries: 10
initial_delay_ms: 500
backoff_multiplier: 1.5
max_delay_ms: 10000

Example: Example

yaml
max_retries: 5
initial_delay_ms: 2000
retry_on_errors: ["TIMEOUT", "RATE_LIMIT", "429", "503"]

開始

flow.start

明示的なワークフロー開始ノード

Output:

FieldTypeDescription
__event__stringルーティングイベント (start)
started_atstring開始時刻
workflow_idstringワークフローID

Example: Example

yaml

サブフロー

flow.subflow

外部ワークフローを参照して実行

Parameters:

NameTypeRequiredDefaultDescription
workflow_refstringYes-Text content to process
execution_modeselect (inline, spawn, async)NoinlineSelect an option
input_mappingobjectYes-Data object to process
output_mappingobjectNo{}Map internal variables to workflow output
timeoutnumberNo300000Maximum time to wait in milliseconds

Output:

FieldTypeDescription
__event__stringルーティングイベント (success/error)
resultany実行結果
execution_idstring実行ID
workflow_refstringワークフロー参照

Example: Example

yaml
workflow_ref: workflows/validate_order
execution_mode: inline
input_mapping: {"order_data": "${input.order}"}
output_mapping: {"validation_result": "result"}

Example: Example

yaml
workflow_ref: workflows/send_notifications
execution_mode: spawn

スイッチ

flow.switch

値のマッチングに基づく多路分岐

Parameters:

NameTypeRequiredDefaultDescription
expressionstringYes-Value to match against cases (supports variable reference)
casesarrayYes[{'id': 'case_1', 'value': 'case1', 'label': 'Case 1'}]List of case definitions

Output:

FieldTypeDescription
__event__stringルーティングイベント (case:value または default)
outputsobjectポート別出力値
matched_casestringマッチしたケース
valueanyマッチした値

Example: Example

yaml
expression: ${api_response.status}
cases: [{"id": "case-1", "value": "success", "label": "Success"}, {"id": "case-2", "value": "pending", "label": "Pending"}, {"id": "case-3", "value": "error", "label": "Error"}]

Example: Example

yaml
expression: ${input.type}
cases: [{"id": "img", "value": "image", "label": "Image"}, {"id": "vid", "value": "video", "label": "Video"}, {"id": "txt", "value": "text", "label": "Text"}]

スロットル

flow.throttle

最小間隔で実行速度を制限

Parameters:

NameTypeRequiredDefaultDescription
interval_msnumberYes-実行間の最小時間(ミリ秒)
leadingbooleanNoTrueリーディングエッジで実行(最初の呼び出しは即時通過)

Output:

FieldTypeDescription
__event__stringルーティング用イベント(実行済み/スロットル済み)
last_execution_msnumber最後に許可された実行のタイムスタンプ
calls_throttlednumber最後の実行以降にスロットルされた呼び出し回数
time_since_last_msnumber最後の実行から経過した時間(ミリ秒)
remaining_msnumber次の実行が許可されるまでの残り時間(ミリ秒)

Example: Example

yaml
interval_ms: 1000

Example: Example

yaml
interval_ms: 200
leading: true

Example: Example

yaml
interval_ms: 5000
leading: false

トリガー

flow.trigger

ワークフローエントリーポイント - 手動、Webhook、スケジュール、またはイベント

Parameters:

NameTypeRequiredDefaultDescription
trigger_typeselect (manual, webhook, schedule, event, mcp, polling)NomanualType of trigger event
webhook_pathstringNo-URL path for webhook trigger
schedulestringNo-Cron expression for scheduled trigger
event_namestringNo-Event name to listen for
tool_namestringNo-MCP tool name exposed to AI agents
tool_descriptionstringNo-Description shown to AI agents for this tool
poll_urlstringNo-API endpoint to poll for changes
poll_intervalnumberNo300How often to check for changes (minimum 60 seconds)
poll_methodselect (GET, POST)NoGETHTTP method for polling request
poll_headersobjectNo{}Custom headers for polling request (e.g. API keys)
poll_bodyobjectNo{}Request body for POST polling
dedup_keystringNo-JSON path to extract a unique value for deduplication
configobjectNo-Custom trigger config (for composites: LINE BOT, Telegram, Slack, etc.)
descriptionstringNo-Optional description text

Output:

FieldTypeDescription
__event__stringルーティングイベント (triggered/error)
trigger_dataobjectトリガーデータ
trigger_typestringトリガータイプ
triggered_atstringトリガー時刻

Example: Example

yaml
trigger_type: manual

Example: Example

yaml
trigger_type: webhook
webhook_path: /api/webhooks/order-created

Example: Example

yaml
trigger_type: schedule
schedule: 0 * * * *

Example: Example

yaml
trigger_type: mcp
tool_name: send-report
tool_description: Send a weekly summary report

Example: Example

yaml
trigger_type: polling
poll_url: https://api.example.com/items
poll_interval: 300
dedup_key: $.data[0].id

Released under the Apache 2.0 License.