Skip to content

Flow Control

Branching, loops, parallelism, subflows, triggers, and error handling.

24 modules

ModuleDescription
Proses BatchProses item dalam batch dengan ukuran yang dapat dikonfigurasi
CabangPercabangan bersyarat berdasarkan evaluasi ekspresi
BreakpointJeda eksekusi workflow untuk persetujuan atau input manusia
Pemutus SirkuitPola pemutus sirkuit untuk mencegah kegagalan berantai
ContainerContainer subflow tertanam untuk mengorganisir workflow kompleks
DebounceMenunda eksekusi untuk mencegah panggilan berulang cepat
AkhirNode akhir workflow eksplisit
Penanganan ErrorMenangkap dan menangani error dari node hulu
Pemicu Alur Kerja ErrorTitik masuk untuk alur kerja error - dipicu saat alur kerja lain gagal
Untuk SetiapIterasi daftar dan eksekusi langkah untuk setiap item
ForkPisahkan eksekusi ke cabang paralel
Pergi KeLompat tanpa syarat ke langkah lain
Panggil Alur KerjaJalankan file alur kerja eksternal
GabungTunggu cabang paralel selesai
LoopUlangi langkah N kali menggunakan routing port output
GabungGabungkan beberapa input menjadi satu output
ParalelEksekusi beberapa tugas secara paralel dengan strategi berbeda
Batas LajuBatas laju eksekusi menggunakan token bucket atau jendela geser
Coba UlangCoba ulang operasi yang gagal dengan jeda yang dapat diatur
MulaiNode awal workflow eksplisit
SubflowReferensi dan eksekusi workflow eksternal
SaklarPercabangan multi-arah berdasarkan pencocokan nilai
BatasiBatasi laju eksekusi dengan interval minimum
PemicuTitik masuk workflow - manual, webhook, jadwal, atau event

Modules

Proses Batch

flow.batch

Proses item dalam batch dengan ukuran yang dapat dikonfigurasi

Parameters:

NameTypeRequiredDefaultDescription
itemsarrayYes-Array of items to process. Can be numbers, strings, or objects.
batch_sizenumberYes10Jumlah item per batch
delay_msnumberNo0Milidetik untuk menunggu antar batch (untuk pembatasan laju)
continue_on_errorbooleanNoFalseLanjutkan memproses batch yang tersisa jika ada yang gagal
parallel_batchesnumberNo1Lanjutkan memproses batch yang tersisa jika ada yang gagal

Output:

FieldTypeDescription
__event__stringJumlah batch yang diproses secara paralel (1 untuk berurutan)
batcharrayEvent untuk pengaturan rute (batch/selesai/error)
batch_indexnumberEvent untuk pengaturan rute (batch/selesai/error)
total_batchesnumberItem batch saat ini
total_itemsnumberIndeks batch saat ini (berbasis 0)
is_last_batchbooleanJumlah total batch
progressobjectJumlah total item

Example: Example

yaml
items: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
batch_size: 10

Example: Example

yaml
items: ${input.records}
batch_size: 100
delay_ms: 1000

Example: Example

yaml
items: ${input.data}
batch_size: 50
parallel_batches: 3
continue_on_error: true

Cabang

flow.branch

Percabangan bersyarat berdasarkan evaluasi ekspresi

Parameters:

NameTypeRequiredDefaultDescription
conditionstringYes-Expression to evaluate (supports ==, !=, >, <, >=, <=, contains)

Output:

FieldTypeDescription
__event__stringEvent routing (true/false/error)
outputsobjectNilai output berdasarkan port
resultbooleanHasil cabang
conditionstringNilai kondisi
resolved_conditionstringHasil evaluasi kondisi

Example: Example

yaml
condition: ${search_step.count} > 0

Example: Example

yaml
condition: ${api_call.status} == success

Breakpoint

flow.breakpoint

Jeda eksekusi workflow untuk persetujuan atau input manusia

Parameters:

NameTypeRequiredDefaultDescription
titlestringNoApproval RequiredTitle displayed to approvers
descriptionstringNo-Optional description text
timeout_secondsnumberNo0Maximum wait time (0 for no timeout)
required_approversarrayYes-Array of data items to process
approval_modeselect (single, all, majority, first)NosingleHow approvals are counted
custom_fieldsarrayYes-Array of data items to process
include_contextbooleanNoTrueWhether to include execution context
auto_approve_conditionstringNo-Text content to process

Output:

FieldTypeDescription
__event__stringEvent routing (approved/rejected/timeout)
breakpoint_idstringID Breakpoint
statusstringStatus
approved_byarrayDisetujui oleh
rejected_byarrayDitolak oleh
custom_inputsobjectNilai input kustom
commentsarrayKomentar tinjauan
resolved_atstringWaktu resolusi
wait_duration_msintegerDurasi tunggu (ms)

Example: Example

yaml
title: Approve data export
description: Please review and approve the data export

Example: Example

yaml
title: Manager Approval Required
description: Large transaction requires manager approval
required_approvers: ["manager@example.com"]
timeout_seconds: 3600

Example: Example

yaml
title: Adjustment Required
custom_fields: [{"name": "reason", "label": "Reason", "type": "text", "required": true}, {"name": "amount", "label": "Amount", "type": "number", "required": true}]

Pemutus Sirkuit

flow.circuit_breaker

Pola pemutus sirkuit untuk mencegah kegagalan berantai

Parameters:

NameTypeRequiredDefaultDescription
failure_thresholdnumberYes5Jumlah kegagalan sebelum membuka sirkuit
reset_timeout_msnumberNo60000Waktu dalam milidetik sebelum sirkuit beralih ke setengah terbuka
half_open_maxnumberNo1Permintaan maksimum yang diizinkan dalam status setengah terbuka

Output:

FieldTypeDescription
__event__stringAcara untuk pengalihan (diizinkan/ditolak/setengah terbuka)
statestringStatus sirkuit (tertutup/terbuka/setengah terbuka)
failure_countnumberJumlah kegagalan berturut-turut
last_failure_time_msnumberStempel waktu kegagalan terakhir dalam milidetik
time_until_half_open_msnumberMilidetik hingga sirkuit beralih ke setengah terbuka

Example: Example

yaml
failure_threshold: 5
reset_timeout_ms: 60000

Example: Example

yaml
failure_threshold: 2
reset_timeout_ms: 10000
half_open_max: 1

Example: Example

yaml
failure_threshold: 20
reset_timeout_ms: 120000
half_open_max: 3

Container

flow.container

Container subflow tertanam untuk mengorganisir workflow kompleks

Parameters:

NameTypeRequiredDefaultDescription
subflowobjectNo{'nodes': [], 'edges': []}Embedded workflow definition with nodes and edges
inherit_contextbooleanNoTrueWhether to inherit variables from parent workflow
isolated_variablesarrayYes-Array of data items to process
export_variablesarrayYes-Array of data items to process

Output:

FieldTypeDescription
__event__stringEvent routing (success/error)
outputsobjectNilai output berdasarkan port
subflow_resultobjectHasil subflow
exported_variablesobjectVariabel yang diekspor
node_countintegerJumlah node
execution_time_msnumberWaktu eksekusi (ms)

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: true

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: false

Debounce

flow.debounce

Menunda eksekusi untuk mencegah panggilan berulang cepat

Parameters:

NameTypeRequiredDefaultDescription
delay_msnumberYes-Waktu tunggu setelah panggilan terakhir sebelum eksekusi
leadingbooleanNoFalseEksekusi pada tepi awal (panggilan pertama memicu segera)
trailingbooleanNoTrueEksekusi pada tepi akhir (setelah penundaan berakhir)

Output:

FieldTypeDescription
__event__stringAcara untuk pengalihan (dieksekusi/didebounce)
last_call_msnumberStempel waktu panggilan terakhir dalam milidetik
calls_debouncednumberJumlah panggilan yang didebounce sejak eksekusi terakhir
time_since_last_msnumberWaktu yang berlalu sejak panggilan terakhir dalam milidetik
edgestringTepi mana yang memicu eksekusi (awal/akhir)

Example: Example

yaml
delay_ms: 500

Example: Example

yaml
delay_ms: 1000
leading: true
trailing: false

Example: Example

yaml
delay_ms: 2000
leading: true
trailing: true

Akhir

flow.end

Node akhir workflow eksplisit

Parameters:

NameTypeRequiredDefaultDescription
output_mappingobjectNo{}Map internal variables to workflow output
success_messagestringNo-Text content to process

Output:

FieldTypeDescription
__event__stringEvent routing (end)
ended_atstringWaktu akhir
workflow_resultobjectHasil workflow

Example: Example

yaml

Example: Example

yaml
output_mapping: {"result": "${process.output}", "status": "success"}

Penanganan Error

flow.error_handle

Menangkap dan menangani error dari node hulu

Parameters:

NameTypeRequiredDefaultDescription
actionstringYeslog_and_continueApa yang harus dilakukan dengan error
include_tracebackbooleanNoTrueSertakan jejak tumpukan lengkap dalam output
error_code_mappingobjectNo{}Sertakan jejak tumpukan lengkap dalam output
fallback_valueanyNo-Peta kode error ke tindakan khusus

Output:

FieldTypeDescription
__event__stringNilai yang digunakan saat error ditekan
outputsobjectEvent untuk pengaturan rute (ditangani/eskalasi)
error_infoobjectEvent untuk pengaturan rute (ditangani/eskalasi)
action_takenstringTindakan yang diambil

Example: Example

yaml
action: log_and_continue
include_traceback: true

Example: Example

yaml
action: suppress
fallback_value: {"status": "skipped", "reason": "upstream_error"}

Example: Example

yaml
action: transform
error_code_mapping: {"TIMEOUT": {"retry": true, "delay": 5000}, "NOT_FOUND": {"skip": true}}

Pemicu Alur Kerja Error

flow.error_workflow_trigger

Titik masuk untuk alur kerja error - dipicu saat alur kerja lain gagal

Parameters:

NameTypeRequiredDefaultDescription
descriptionstringNo-Description of this error workflow

Output:

FieldTypeDescription
__event__stringDeskripsi alur kerja error ini
error_contextobjectEvent untuk pengaturan rute (dipicu)
triggered_atstringTimestamp ISO saat alur kerja error dipicu

Example: Example

yaml
description: Send Slack notification on workflow failure

Example: Example

yaml
description: Log all workflow errors to monitoring system

Untuk Setiap

flow.foreach

Iterasi daftar dan eksekusi langkah untuk setiap item

Parameters:

NameTypeRequiredDefaultDescription
itemsstringYes-Daftar item untuk diiterasi (mendukung referensi ${variabel})
stepsarrayNo-Langkah untuk dieksekusi untuk setiap item
item_varstringNoitemNama variabel untuk item saat ini
index_varstringNoindexNama variabel untuk indeks saat ini
output_modestringNocollectMode pengumpulan hasil

Output:

FieldTypeDescription
__event__stringEvent routing (iterate/done)
__set_contextobjectAtur konteks
outputsobjectNilai output berdasarkan port
iterationnumberIndeks iterasi saat ini
statusstringStatus operasi
resultsarrayHasil yang dikumpulkan
countnumberTotal jumlah item

Example: Example

yaml
items: ${steps.csv.result.data}

Example: Example

yaml
items: ${search_results}
item_var: element
steps: [{"module": "element.text", "params": {"element_id": "${element}"}, "output": "text"}]

Fork

flow.fork

Pisahkan eksekusi ke cabang paralel

Parameters:

NameTypeRequiredDefaultDescription
branch_countnumberNo2Number of parallel branches

Output:

FieldTypeDescription
__event__stringEvent routing (fork/error)
input_dataanyData input
branch_countintegerJumlah cabang

Example: Example

yaml
branch_count: 2

Example: Example

yaml
branch_count: 3

Pergi Ke

flow.goto

Lompat tanpa syarat ke langkah lain

Parameters:

NameTypeRequiredDefaultDescription
targetstringYes-Step ID to jump to
max_iterationsnumberNo100Maximum number of iterations (prevents infinite loops)

Output:

FieldTypeDescription
__event__stringEvent routing (goto)
targetstringLangkah target
iterationnumberJumlah iterasi

Example: Example

yaml
target: fetch_next_page
max_iterations: 10

Example: Example

yaml
target: cleanup_step

Panggil Alur Kerja

flow.invoke

Jalankan file alur kerja eksternal

Parameters:

NameTypeRequiredDefaultDescription
workflow_sourcestringYes-File path to workflow YAML or inline YAML content
workflow_paramsobjectYes-Parameters to pass to the invoked workflow
timeout_secondsnumberNo300Maximum execution time in seconds
output_mappingobjectNo{}Map internal variables to workflow output

Output:

FieldTypeDescription
__event__stringParameter untuk diteruskan ke alur kerja yang dipanggil
resultanyWaktu eksekusi maksimum dalam detik
workflow_idstringEvent untuk routing (berhasil/gagal)
execution_time_msnumberHasil eksekusi alur kerja

Example: Example

yaml
workflow_source: workflows/validate_order.yaml
workflow_params: {"order_id": "${input.order_id}"}
timeout_seconds: 60

Example: Example

yaml
workflow_source: workflows/process_data.yaml
workflow_params: {"data": "${input.data}"}
output_mapping: {"processed": "result.data"}

Gabung

flow.join

Tunggu cabang paralel selesai

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (all, any, first)NoallHow to handle multiple inputs
input_countnumberNo2Number of ports
timeoutnumberNo60000Maximum time to wait in milliseconds
cancel_pendingbooleanNoTrueCancel pending branches when using first strategy

Output:

FieldTypeDescription
__event__stringEvent routing (joined/timeout/error)
joined_dataarrayData yang digabungkan
completed_countintegerJumlah cabang selesai
strategystringStrategi join

Example: Example

yaml
strategy: all
input_count: 2
timeout_ms: 30000

Example: Example

yaml
strategy: first
input_count: 3
cancel_pending: true

Loop

flow.loop

Ulangi langkah N kali menggunakan routing port output

Parameters:

NameTypeRequiredDefaultDescription
timesnumberYes1Jumlah pengulangan
targetstringNo-Langkah target (deprecated)
stepsarrayNo-Langkah untuk dieksekusi untuk setiap iterasi
index_varstringNoindexNama variabel untuk indeks saat ini

Output:

FieldTypeDescription
__event__stringEvent routing (iterate/done)
outputsobjectNilai output berdasarkan port
iterationnumberIterasi saat ini
statusstringStatus operasi
resultsarrayHasil yang dikumpulkan
countnumberTotal iterasi

Example: Example

yaml
times: 3

Example: Example

yaml
times: 5
steps: [{"module": "browser.click", "params": {"selector": ".next"}}]

Gabung

flow.merge

Gabungkan beberapa input menjadi satu output

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (first, last, all)NoallHow to merge multiple inputs
input_countnumberNo2Number of ports

Output:

FieldTypeDescription
__event__stringEvent routing (merged/error)
merged_dataanyData yang digabungkan
input_countintegerJumlah input
strategystringStrategi merge

Example: Example

yaml
strategy: all
input_count: 3

Example: Example

yaml
strategy: first
input_count: 2

Paralel

flow.parallel

Eksekusi beberapa tugas secara paralel dengan strategi berbeda

Parameters:

NameTypeRequiredDefaultDescription
tasksarrayYes-Array definisi tugas untuk dieksekusi secara paralel
modestringNoallArray definisi tugas untuk dieksekusi secara paralel
timeout_msnumberNo60000Maximum wait time in milliseconds
fail_fastbooleanNoTrueHentikan semua tugas pada kegagalan pertama (hanya untuk mode=all)
concurrency_limitnumberNo0Hentikan semua tugas pada kegagalan pertama (hanya untuk mode=all)

Output:

FieldTypeDescription
__event__stringJumlah maksimum tugas bersamaan (0 untuk tidak terbatas)
resultsarrayEvent untuk pengalihan (selesai/parsial/error)
completed_countnumberEvent untuk pengaturan rute (selesai/parsial/error)
failed_countnumberHasil dari semua tugas
total_countnumberJumlah tugas yang berhasil diselesaikan
modestringJumlah tugas yang gagal
duration_msnumberJumlah total tugas

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://api2.example.com"}}]
mode: all
timeout_ms: 30000

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://mirror1.example.com"}}, {"module": "http.get", "params": {"url": "https://mirror2.example.com"}}]
mode: race

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://might-fail.example.com"}}]
mode: settle

Batas Laju

flow.rate_limit

Batas laju eksekusi menggunakan token bucket atau jendela geser

Parameters:

NameTypeRequiredDefaultDescription
max_requestsnumberYes-Jumlah maksimum permintaan yang diizinkan per jendela
window_msnumberNo60000Jendela waktu dalam milidetik
strategystringNotoken_bucketStrategi pembatasan laju (token_bucket atau sliding_window)
queue_overflowstringNowaitPerilaku saat antrian penuh (buang atau kesalahan)

Output:

FieldTypeDescription
__event__stringAcara untuk pengalihan (diizinkan/dibatasi)
tokens_remainingnumberToken yang tersisa dalam bucket
window_reset_msnumberMilidetik hingga jendela direset
requests_in_windownumberJumlah permintaan dalam jendela saat ini
wait_msnumberMilidetik untuk menunggu sebelum permintaan berikutnya diizinkan

Example: Example

yaml
max_requests: 100
window_ms: 60000
strategy: token_bucket

Example: Example

yaml
max_requests: 10
window_ms: 1000
strategy: fixed_window
queue_overflow: error

Example: Example

yaml
max_requests: 50
window_ms: 30000
strategy: sliding_window
queue_overflow: wait

Coba Ulang

flow.retry

Coba ulang operasi yang gagal dengan jeda yang dapat diatur

Parameters:

NameTypeRequiredDefaultDescription
max_retriesnumberYes3Jumlah maksimum percobaan ulang
initial_delay_msnumberNo1000Jeda awal sebelum percobaan ulang pertama dalam milidetik
backoff_multipliernumberNo2.0Pengali untuk jeda eksponensial
max_delay_msnumberNo30000Jeda maksimum antara percobaan ulang dalam milidetik
retry_on_errorsarrayNo[]Jenis kesalahan untuk dicoba ulang (kosong berarti coba ulang semua)

Output:

FieldTypeDescription
__event__stringEvent untuk routing (coba ulang/sukses/gagal)
attemptnumberNomor percobaan saat ini
max_retriesnumberJumlah maksimum percobaan ulang yang diatur
delay_msnumberJeda sebelum coba ulang berikutnya dalam milidetik
total_elapsed_msnumberTotal waktu yang berlalu dalam milidetik
last_errorobjectPesan kesalahan terakhir

Example: Example

yaml
max_retries: 3

Example: Example

yaml
max_retries: 10
initial_delay_ms: 500
backoff_multiplier: 1.5
max_delay_ms: 10000

Example: Example

yaml
max_retries: 5
initial_delay_ms: 2000
retry_on_errors: ["TIMEOUT", "RATE_LIMIT", "429", "503"]

Mulai

flow.start

Node awal workflow eksplisit

Output:

FieldTypeDescription
__event__stringEvent routing (start)
started_atstringWaktu mulai
workflow_idstringID Workflow

Example: Example

yaml

Subflow

flow.subflow

Referensi dan eksekusi workflow eksternal

Parameters:

NameTypeRequiredDefaultDescription
workflow_refstringYes-Text content to process
execution_modeselect (inline, spawn, async)NoinlineSelect an option
input_mappingobjectYes-Data object to process
output_mappingobjectNo{}Map internal variables to workflow output
timeoutnumberNo300000Maximum time to wait in milliseconds

Output:

FieldTypeDescription
__event__stringEvent routing (success/error)
resultanyHasil eksekusi
execution_idstringID Eksekusi
workflow_refstringReferensi workflow

Example: Example

yaml
workflow_ref: workflows/validate_order
execution_mode: inline
input_mapping: {"order_data": "${input.order}"}
output_mapping: {"validation_result": "result"}

Example: Example

yaml
workflow_ref: workflows/send_notifications
execution_mode: spawn

Saklar

flow.switch

Percabangan multi-arah berdasarkan pencocokan nilai

Parameters:

NameTypeRequiredDefaultDescription
expressionstringYes-Value to match against cases (supports variable reference)
casesarrayYes[{'id': 'case_1', 'value': 'case1', 'label': 'Case 1'}]List of case definitions

Output:

FieldTypeDescription
__event__stringEvent routing (case:value atau default)
outputsobjectNilai output berdasarkan port
matched_casestringCase yang cocok
valueanyNilai yang cocok

Example: Example

yaml
expression: ${api_response.status}
cases: [{"id": "case-1", "value": "success", "label": "Success"}, {"id": "case-2", "value": "pending", "label": "Pending"}, {"id": "case-3", "value": "error", "label": "Error"}]

Example: Example

yaml
expression: ${input.type}
cases: [{"id": "img", "value": "image", "label": "Image"}, {"id": "vid", "value": "video", "label": "Video"}, {"id": "txt", "value": "text", "label": "Text"}]

Batasi

flow.throttle

Batasi laju eksekusi dengan interval minimum

Parameters:

NameTypeRequiredDefaultDescription
interval_msnumberYes-Waktu minimum antara eksekusi dalam milidetik
leadingbooleanNoTrueEksekusi pada tepi depan (panggilan pertama langsung lolos)

Output:

FieldTypeDescription
__event__stringEvent untuk routing (dieksekusi/dibatasi)
last_execution_msnumberCap waktu eksekusi terakhir yang diizinkan
calls_throttlednumberJumlah panggilan yang dibatasi sejak eksekusi terakhir
time_since_last_msnumberWaktu yang berlalu sejak eksekusi terakhir dalam milidetik
remaining_msnumberMilidetik tersisa hingga eksekusi berikutnya diizinkan

Example: Example

yaml
interval_ms: 1000

Example: Example

yaml
interval_ms: 200
leading: true

Example: Example

yaml
interval_ms: 5000
leading: false

Pemicu

flow.trigger

Titik masuk workflow - manual, webhook, jadwal, atau event

Parameters:

NameTypeRequiredDefaultDescription
trigger_typeselect (manual, webhook, schedule, event, mcp, polling)NomanualType of trigger event
webhook_pathstringNo-URL path for webhook trigger
schedulestringNo-Cron expression for scheduled trigger
event_namestringNo-Event name to listen for
tool_namestringNo-MCP tool name exposed to AI agents
tool_descriptionstringNo-Description shown to AI agents for this tool
poll_urlstringNo-API endpoint to poll for changes
poll_intervalnumberNo300How often to check for changes (minimum 60 seconds)
poll_methodselect (GET, POST)NoGETHTTP method for polling request
poll_headersobjectNo{}Custom headers for polling request (e.g. API keys)
poll_bodyobjectNo{}Request body for POST polling
dedup_keystringNo-JSON path to extract a unique value for deduplication
configobjectNo-Custom trigger config (for composites: LINE BOT, Telegram, Slack, etc.)
descriptionstringNo-Optional description text

Output:

FieldTypeDescription
__event__stringEvent routing (triggered/error)
trigger_dataobjectData trigger
trigger_typestringJenis trigger
triggered_atstringWaktu trigger

Example: Example

yaml
trigger_type: manual

Example: Example

yaml
trigger_type: webhook
webhook_path: /api/webhooks/order-created

Example: Example

yaml
trigger_type: schedule
schedule: 0 * * * *

Example: Example

yaml
trigger_type: mcp
tool_name: send-report
tool_description: Send a weekly summary report

Example: Example

yaml
trigger_type: polling
poll_url: https://api.example.com/items
poll_interval: 300
dedup_key: $.data[0].id

Released under the Apache 2.0 License.