Skip to content

Flow Control

Branching, loops, parallelism, subflows, triggers, and error handling.

24 modules

ModuleDescription
배치 처리설정 가능한 크기로 항목을 배치 처리합니다
분기표현식 평가에 기반한 조건 분기
중단점인간 승인 또는 입력을 위해 워크플로 실행 일시 정지
회로 차단기연쇄 실패를 방지하기 위한 회로 차단기 패턴
컨테이너복잡한 워크플로 구성을 위한 내장 서브플로 컨테이너
디바운스빠른 반복 호출을 방지하기 위한 디바운스 실행
종료명시적 워크플로 종료 노드
오류 처리기상위 노드에서 발생한 오류를 잡아 처리합니다
오류 워크플로우 트리거다른 워크플로우가 실패할 때 트리거되는 오류 워크플로우의 진입점
각각에 대해목록을 반복하며 각 항목에 대해 단계 실행
포크실행을 병렬 분기로 분할
이동다른 단계로 무조건 점프
워크플로 호출외부 워크플로 파일 실행
조인병렬 분기 완료 대기
루프출력 포트 라우팅을 사용하여 N번 단계 반복
병합여러 입력을 단일 출력으로 병합
병렬다양한 전략으로 여러 작업을 병렬로 실행합니다
속도 제한토큰 버킷 또는 슬라이딩 윈도우를 사용한 속도 제한 실행
재시도구성 가능한 백오프로 실패한 작업 재시도
시작명시적 워크플로 시작 노드
서브플로외부 워크플로 참조 및 실행
스위치값 매칭에 기반한 다중 분기
속도 제한최소 간격으로 실행 속도 제한
트리거워크플로 진입점 - 수동, 웹훅, 스케줄 또는 이벤트

Modules

배치 처리

flow.batch

설정 가능한 크기로 항목을 배치 처리합니다

Parameters:

NameTypeRequiredDefaultDescription
itemsarrayYes-Array of items to process. Can be numbers, strings, or objects.
batch_sizenumberYes10배치당 항목 수
delay_msnumberNo0배치 간 대기 시간 (속도 제한용)
continue_on_errorbooleanNoFalse하나가 실패해도 남은 배치를 계속 처리
parallel_batchesnumberNo1하나가 실패해도 남은 배치를 계속 처리

Output:

FieldTypeDescription
__event__string병렬로 처리할 배치 수 (순차적 처리는 1)
batcharray라우팅 이벤트 (배치/완료/오류)
batch_indexnumber라우팅 이벤트 (배치/완료/오류)
total_batchesnumber현재 배치 항목
total_itemsnumber현재 배치 인덱스 (0부터 시작)
is_last_batchboolean총 배치 수
progressobject총 항목 수

Example: Example

yaml
items: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
batch_size: 10

Example: Example

yaml
items: ${input.records}
batch_size: 100
delay_ms: 1000

Example: Example

yaml
items: ${input.data}
batch_size: 50
parallel_batches: 3
continue_on_error: true

분기

flow.branch

표현식 평가에 기반한 조건 분기

Parameters:

NameTypeRequiredDefaultDescription
conditionstringYes-Expression to evaluate (supports ==, !=, >, <, >=, <=, contains)

Output:

FieldTypeDescription
__event__string라우팅 이벤트 (true/false/error)
outputsobject포트별 출력 값
resultboolean분기 결과
conditionstring조건 값
resolved_conditionstring조건 평가 결과

Example: Example

yaml
condition: ${search_step.count} > 0

Example: Example

yaml
condition: ${api_call.status} == success

중단점

flow.breakpoint

인간 승인 또는 입력을 위해 워크플로 실행 일시 정지

Parameters:

NameTypeRequiredDefaultDescription
titlestringNoApproval RequiredTitle displayed to approvers
descriptionstringNo-Optional description text
timeout_secondsnumberNo0Maximum wait time (0 for no timeout)
required_approversarrayYes-Array of data items to process
approval_modeselect (single, all, majority, first)NosingleHow approvals are counted
custom_fieldsarrayYes-Array of data items to process
include_contextbooleanNoTrueWhether to include execution context
auto_approve_conditionstringNo-Text content to process

Output:

FieldTypeDescription
__event__string라우팅 이벤트 (approved/rejected/timeout)
breakpoint_idstring중단점 ID
statusstring상태
approved_byarray승인자
rejected_byarray거부자
custom_inputsobject사용자 정의 입력 값
commentsarray검토 코멘트
resolved_atstring해결 시간
wait_duration_msinteger대기 시간 (ms)

Example: Example

yaml
title: Approve data export
description: Please review and approve the data export

Example: Example

yaml
title: Manager Approval Required
description: Large transaction requires manager approval
required_approvers: ["manager@example.com"]
timeout_seconds: 3600

Example: Example

yaml
title: Adjustment Required
custom_fields: [{"name": "reason", "label": "Reason", "type": "text", "required": true}, {"name": "amount", "label": "Amount", "type": "number", "required": true}]

회로 차단기

flow.circuit_breaker

연쇄 실패를 방지하기 위한 회로 차단기 패턴

Parameters:

NameTypeRequiredDefaultDescription
failure_thresholdnumberYes5회로가 열리기 전 실패 횟수
reset_timeout_msnumberNo60000회로가 반개방으로 전환되기 전의 시간 (밀리초)
half_open_maxnumberNo1반개방 상태에서 허용되는 최대 요청 수

Output:

FieldTypeDescription
__event__string라우팅을 위한 이벤트 (허용/거부/반개방)
statestring회로 상태 (닫힘/열림/반개방)
failure_countnumber연속 실패 횟수
last_failure_time_msnumber마지막 실패의 타임스탬프 (밀리초)
time_until_half_open_msnumber회로가 반개방으로 전환될 때까지의 밀리초

Example: Example

yaml
failure_threshold: 5
reset_timeout_ms: 60000

Example: Example

yaml
failure_threshold: 2
reset_timeout_ms: 10000
half_open_max: 1

Example: Example

yaml
failure_threshold: 20
reset_timeout_ms: 120000
half_open_max: 3

컨테이너

flow.container

복잡한 워크플로 구성을 위한 내장 서브플로 컨테이너

Parameters:

NameTypeRequiredDefaultDescription
subflowobjectNo{'nodes': [], 'edges': []}Embedded workflow definition with nodes and edges
inherit_contextbooleanNoTrueWhether to inherit variables from parent workflow
isolated_variablesarrayYes-Array of data items to process
export_variablesarrayYes-Array of data items to process

Output:

FieldTypeDescription
__event__string라우팅 이벤트 (success/error)
outputsobject포트별 출력 값
subflow_resultobject서브플로 결과
exported_variablesobject내보낸 변수
node_countinteger노드 수
execution_time_msnumber실행 시간 (ms)

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: true

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: false

디바운스

flow.debounce

빠른 반복 호출을 방지하기 위한 디바운스 실행

Parameters:

NameTypeRequiredDefaultDescription
delay_msnumberYes-마지막 호출 후 실행까지의 대기 시간
leadingbooleanNoFalse선행 엣지에서 실행 (첫 호출 즉시 트리거)
trailingbooleanNoTrue후행 엣지에서 실행 (지연 만료 후)

Output:

FieldTypeDescription
__event__string라우팅을 위한 이벤트 (실행됨/디바운스됨)
last_call_msnumber마지막 호출의 타임스탬프 (밀리초)
calls_debouncednumber마지막 실행 이후 디바운스된 호출 수
time_since_last_msnumber마지막 호출 이후 경과 시간 (밀리초)
edgestring어느 엣지가 실행을 트리거했는지 (선행/후행)

Example: Example

yaml
delay_ms: 500

Example: Example

yaml
delay_ms: 1000
leading: true
trailing: false

Example: Example

yaml
delay_ms: 2000
leading: true
trailing: true

종료

flow.end

명시적 워크플로 종료 노드

Parameters:

NameTypeRequiredDefaultDescription
output_mappingobjectNo{}Map internal variables to workflow output
success_messagestringNo-Text content to process

Output:

FieldTypeDescription
__event__string라우팅 이벤트 (end)
ended_atstring종료 시간
workflow_resultobject워크플로 결과

Example: Example

yaml

Example: Example

yaml
output_mapping: {"result": "${process.output}", "status": "success"}

오류 처리기

flow.error_handle

상위 노드에서 발생한 오류를 잡아 처리합니다

Parameters:

NameTypeRequiredDefaultDescription
actionstringYeslog_and_continue오류에 대해 할 작업
include_tracebackbooleanNoTrue출력에 전체 스택 추적 포함
error_code_mappingobjectNo{}출력에 전체 스택 추적 포함
fallback_valueanyNo-오류 코드를 사용자 정의 작업에 매핑

Output:

FieldTypeDescription
__event__string오류가 억제될 때 사용할 값
outputsobject라우팅 이벤트 (처리됨/에스컬레이트)
error_infoobject라우팅 이벤트 (처리됨/에스컬레이트)
action_takenstring취해진 조치

Example: Example

yaml
action: log_and_continue
include_traceback: true

Example: Example

yaml
action: suppress
fallback_value: {"status": "skipped", "reason": "upstream_error"}

Example: Example

yaml
action: transform
error_code_mapping: {"TIMEOUT": {"retry": true, "delay": 5000}, "NOT_FOUND": {"skip": true}}

오류 워크플로우 트리거

flow.error_workflow_trigger

다른 워크플로우가 실패할 때 트리거되는 오류 워크플로우의 진입점

Parameters:

NameTypeRequiredDefaultDescription
descriptionstringNo-Description of this error workflow

Output:

FieldTypeDescription
__event__string이 오류 워크플로우의 설명
error_contextobject라우팅 이벤트 (트리거됨)
triggered_atstring오류 워크플로우가 트리거된 ISO 타임스탬프

Example: Example

yaml
description: Send Slack notification on workflow failure

Example: Example

yaml
description: Log all workflow errors to monitoring system

각각에 대해

flow.foreach

목록을 반복하며 각 항목에 대해 단계 실행

Parameters:

NameTypeRequiredDefaultDescription
itemsstringYes-반복할 항목 목록 (${variable} 참조 지원)
stepsarrayNo-각 항목에 대해 실행할 단계
item_varstringNoitem현재 항목의 변수명
index_varstringNoindex현재 인덱스의 변수명
output_modestringNocollect결과 수집 모드

Output:

FieldTypeDescription
__event__string라우팅 이벤트 (iterate/done)
__set_contextobject컨텍스트 설정
outputsobject포트별 출력 값
iterationnumber현재 반복 인덱스
statusstring작업 상태
resultsarray수집된 결과
countnumber총 항목 수

Example: Example

yaml
items: ${steps.csv.result.data}

Example: Example

yaml
items: ${search_results}
item_var: element
steps: [{"module": "element.text", "params": {"element_id": "${element}"}, "output": "text"}]

포크

flow.fork

실행을 병렬 분기로 분할

Parameters:

NameTypeRequiredDefaultDescription
branch_countnumberNo2Number of parallel branches

Output:

FieldTypeDescription
__event__string라우팅 이벤트 (fork/error)
input_dataany입력 데이터
branch_countinteger분기 수

Example: Example

yaml
branch_count: 2

Example: Example

yaml
branch_count: 3

이동

flow.goto

다른 단계로 무조건 점프

Parameters:

NameTypeRequiredDefaultDescription
targetstringYes-Step ID to jump to
max_iterationsnumberNo100Maximum number of iterations (prevents infinite loops)

Output:

FieldTypeDescription
__event__string라우팅 이벤트 (goto)
targetstring대상 단계
iterationnumber반복 횟수

Example: Example

yaml
target: fetch_next_page
max_iterations: 10

Example: Example

yaml
target: cleanup_step

워크플로 호출

flow.invoke

외부 워크플로 파일 실행

Parameters:

NameTypeRequiredDefaultDescription
workflow_sourcestringYes-File path to workflow YAML or inline YAML content
workflow_paramsobjectYes-Parameters to pass to the invoked workflow
timeout_secondsnumberNo300Maximum execution time in seconds
output_mappingobjectNo{}Map internal variables to workflow output

Output:

FieldTypeDescription
__event__string호출된 워크플로에 전달할 매개변수
resultany최대 실행 시간(초)
workflow_idstring라우팅 이벤트(성공/오류)
execution_time_msnumber워크플로 실행 결과

Example: Example

yaml
workflow_source: workflows/validate_order.yaml
workflow_params: {"order_id": "${input.order_id}"}
timeout_seconds: 60

Example: Example

yaml
workflow_source: workflows/process_data.yaml
workflow_params: {"data": "${input.data}"}
output_mapping: {"processed": "result.data"}

조인

flow.join

병렬 분기 완료 대기

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (all, any, first)NoallHow to handle multiple inputs
input_countnumberNo2Number of ports
timeoutnumberNo60000Maximum time to wait in milliseconds
cancel_pendingbooleanNoTrueCancel pending branches when using first strategy

Output:

FieldTypeDescription
__event__string라우팅 이벤트 (joined/timeout/error)
joined_dataarray조인된 데이터
completed_countinteger완료된 분기 수
strategystring조인 전략

Example: Example

yaml
strategy: all
input_count: 2
timeout_ms: 30000

Example: Example

yaml
strategy: first
input_count: 3
cancel_pending: true

루프

flow.loop

출력 포트 라우팅을 사용하여 N번 단계 반복

Parameters:

NameTypeRequiredDefaultDescription
timesnumberYes1반복 횟수
targetstringNo-대상 단계 (더 이상 사용되지 않음)
stepsarrayNo-각 반복에 대해 실행할 단계
index_varstringNoindex현재 인덱스의 변수명

Output:

FieldTypeDescription
__event__string라우팅 이벤트 (iterate/done)
outputsobject포트별 출력 값
iterationnumber현재 반복
statusstring작업 상태
resultsarray수집된 결과
countnumber총 반복 횟수

Example: Example

yaml
times: 3

Example: Example

yaml
times: 5
steps: [{"module": "browser.click", "params": {"selector": ".next"}}]

병합

flow.merge

여러 입력을 단일 출력으로 병합

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (first, last, all)NoallHow to merge multiple inputs
input_countnumberNo2Number of ports

Output:

FieldTypeDescription
__event__string라우팅 이벤트 (merged/error)
merged_dataany병합된 데이터
input_countinteger입력 수
strategystring병합 전략

Example: Example

yaml
strategy: all
input_count: 3

Example: Example

yaml
strategy: first
input_count: 2

병렬

flow.parallel

다양한 전략으로 여러 작업을 병렬로 실행합니다

Parameters:

NameTypeRequiredDefaultDescription
tasksarrayYes-병렬로 실행할 작업 정의 배열
modestringNoall병렬로 실행할 작업 정의 배열
timeout_msnumberNo60000Maximum wait time in milliseconds
fail_fastbooleanNoTrue첫 번째 실패 시 모든 작업 중지 (mode=all에만 해당)
concurrency_limitnumberNo0첫 번째 실패 시 모든 작업 중지 (mode=all에만 해당)

Output:

FieldTypeDescription
__event__string최대 동시 작업 수 (무제한은 0)
resultsarray라우팅 이벤트 (완료/부분/오류)
completed_countnumber라우팅 이벤트 (완료/부분/오류)
failed_countnumber모든 작업의 결과
total_countnumber성공적으로 완료된 작업 수
modestring실패한 작업 수
duration_msnumber총 작업 수

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://api2.example.com"}}]
mode: all
timeout_ms: 30000

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://mirror1.example.com"}}, {"module": "http.get", "params": {"url": "https://mirror2.example.com"}}]
mode: race

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://might-fail.example.com"}}]
mode: settle

속도 제한

flow.rate_limit

토큰 버킷 또는 슬라이딩 윈도우를 사용한 속도 제한 실행

Parameters:

NameTypeRequiredDefaultDescription
max_requestsnumberYes-윈도우당 허용되는 최대 요청 수
window_msnumberNo60000시간 윈도우 (밀리초)
strategystringNotoken_bucket속도 제한 전략 (토큰 버킷 또는 슬라이딩 윈도우)
queue_overflowstringNowait큐가 가득 찼을 때의 동작 (드롭 또는 오류)

Output:

FieldTypeDescription
__event__string라우팅을 위한 이벤트 (허용/제한)
tokens_remainingnumber버킷에 남아있는 토큰 수
window_reset_msnumber윈도우가 재설정될 때까지의 밀리초
requests_in_windownumber현재 윈도우 내 요청 수
wait_msnumber다음 허용 요청까지 대기해야 하는 밀리초

Example: Example

yaml
max_requests: 100
window_ms: 60000
strategy: token_bucket

Example: Example

yaml
max_requests: 10
window_ms: 1000
strategy: fixed_window
queue_overflow: error

Example: Example

yaml
max_requests: 50
window_ms: 30000
strategy: sliding_window
queue_overflow: wait

재시도

flow.retry

구성 가능한 백오프로 실패한 작업 재시도

Parameters:

NameTypeRequiredDefaultDescription
max_retriesnumberYes3최대 재시도 시도 횟수
initial_delay_msnumberNo1000첫 번째 재시도 전 초기 지연 시간 (밀리초)
backoff_multipliernumberNo2.0지수 백오프를 위한 배수
max_delay_msnumberNo30000재시도 간 최대 지연 시간 (밀리초)
retry_on_errorsarrayNo[]재시도할 오류 유형 (비어 있으면 모두 재시도)

Output:

FieldTypeDescription
__event__string라우팅을 위한 이벤트 (재시도/성공/실패)
attemptnumber현재 시도 횟수
max_retriesnumber구성된 최대 재시도 횟수
delay_msnumber다음 재시도 전 지연 시간 (밀리초)
total_elapsed_msnumber총 경과 시간 (밀리초)
last_errorobject마지막 오류 메시지

Example: Example

yaml
max_retries: 3

Example: Example

yaml
max_retries: 10
initial_delay_ms: 500
backoff_multiplier: 1.5
max_delay_ms: 10000

Example: Example

yaml
max_retries: 5
initial_delay_ms: 2000
retry_on_errors: ["TIMEOUT", "RATE_LIMIT", "429", "503"]

시작

flow.start

명시적 워크플로 시작 노드

Output:

FieldTypeDescription
__event__string라우팅 이벤트 (start)
started_atstring시작 시간
workflow_idstring워크플로 ID

Example: Example

yaml

서브플로

flow.subflow

외부 워크플로 참조 및 실행

Parameters:

NameTypeRequiredDefaultDescription
workflow_refstringYes-Text content to process
execution_modeselect (inline, spawn, async)NoinlineSelect an option
input_mappingobjectYes-Data object to process
output_mappingobjectNo{}Map internal variables to workflow output
timeoutnumberNo300000Maximum time to wait in milliseconds

Output:

FieldTypeDescription
__event__string라우팅 이벤트 (success/error)
resultany실행 결과
execution_idstring실행 ID
workflow_refstring워크플로 참조

Example: Example

yaml
workflow_ref: workflows/validate_order
execution_mode: inline
input_mapping: {"order_data": "${input.order}"}
output_mapping: {"validation_result": "result"}

Example: Example

yaml
workflow_ref: workflows/send_notifications
execution_mode: spawn

스위치

flow.switch

값 매칭에 기반한 다중 분기

Parameters:

NameTypeRequiredDefaultDescription
expressionstringYes-Value to match against cases (supports variable reference)
casesarrayYes[{'id': 'case_1', 'value': 'case1', 'label': 'Case 1'}]List of case definitions

Output:

FieldTypeDescription
__event__string라우팅 이벤트 (case:value 또는 default)
outputsobject포트별 출력 값
matched_casestring일치한 케이스
valueany일치한 값

Example: Example

yaml
expression: ${api_response.status}
cases: [{"id": "case-1", "value": "success", "label": "Success"}, {"id": "case-2", "value": "pending", "label": "Pending"}, {"id": "case-3", "value": "error", "label": "Error"}]

Example: Example

yaml
expression: ${input.type}
cases: [{"id": "img", "value": "image", "label": "Image"}, {"id": "vid", "value": "video", "label": "Video"}, {"id": "txt", "value": "text", "label": "Text"}]

속도 제한

flow.throttle

최소 간격으로 실행 속도 제한

Parameters:

NameTypeRequiredDefaultDescription
interval_msnumberYes-실행 간 최소 시간 (밀리초)
leadingbooleanNoTrue선행 엣지에서 실행 (첫 호출 즉시 통과)

Output:

FieldTypeDescription
__event__string라우팅을 위한 이벤트 (실행/제한)
last_execution_msnumber마지막 허용된 실행의 타임스탬프
calls_throttlednumber마지막 실행 이후 제한된 호출 수
time_since_last_msnumber마지막 실행 이후 경과 시간 (밀리초)
remaining_msnumber다음 실행이 허용되기까지 남은 밀리초

Example: Example

yaml
interval_ms: 1000

Example: Example

yaml
interval_ms: 200
leading: true

Example: Example

yaml
interval_ms: 5000
leading: false

트리거

flow.trigger

워크플로 진입점 - 수동, 웹훅, 스케줄 또는 이벤트

Parameters:

NameTypeRequiredDefaultDescription
trigger_typeselect (manual, webhook, schedule, event, mcp, polling)NomanualType of trigger event
webhook_pathstringNo-URL path for webhook trigger
schedulestringNo-Cron expression for scheduled trigger
event_namestringNo-Event name to listen for
tool_namestringNo-MCP tool name exposed to AI agents
tool_descriptionstringNo-Description shown to AI agents for this tool
poll_urlstringNo-API endpoint to poll for changes
poll_intervalnumberNo300How often to check for changes (minimum 60 seconds)
poll_methodselect (GET, POST)NoGETHTTP method for polling request
poll_headersobjectNo{}Custom headers for polling request (e.g. API keys)
poll_bodyobjectNo{}Request body for POST polling
dedup_keystringNo-JSON path to extract a unique value for deduplication
configobjectNo-Custom trigger config (for composites: LINE BOT, Telegram, Slack, etc.)
descriptionstringNo-Optional description text

Output:

FieldTypeDescription
__event__string라우팅 이벤트 (triggered/error)
trigger_dataobject트리거 데이터
trigger_typestring트리거 유형
triggered_atstring트리거 시간

Example: Example

yaml
trigger_type: manual

Example: Example

yaml
trigger_type: webhook
webhook_path: /api/webhooks/order-created

Example: Example

yaml
trigger_type: schedule
schedule: 0 * * * *

Example: Example

yaml
trigger_type: mcp
tool_name: send-report
tool_description: Send a weekly summary report

Example: Example

yaml
trigger_type: polling
poll_url: https://api.example.com/items
poll_interval: 300
dedup_key: $.data[0].id

Released under the Apache 2.0 License.