Skip to content

Flow Control

Branching, loops, parallelism, subflows, triggers, and error handling.

24 modules

ModuleDescription
Processo em LoteProcessar itens em lotes com tamanho configurável
RamificacaoRamificacao condicional baseada em avaliacao de expressao
Ponto de ParadaPausar execucao do workflow para aprovacao ou entrada humana
DisjuntorPadrão de disjuntor para prevenir falhas em cascata
ContainerContainer de subfluxo incorporado para organizar workflows complexos
DebounceDebounce para evitar chamadas repetidas rápidas
FimNo explicito de fim de workflow
Manipulador de ErroCaptura e lida com erros de nós anteriores
Gatilho de Fluxo de Trabalho de ErroPonto de entrada para fluxos de trabalho de erro - acionado quando outro fluxo de trabalho falha
Para CadaIterar sobre lista e executar passos para cada item
BifurcacaoDividir execucao em ramificacoes paralelas
Ir ParaSalto incondicional para outro passo
Invoke WorkflowExecute an external workflow file
JuncaoAguardar ramificacoes paralelas completarem
LoopRepetir passos N vezes usando roteamento de porta de saida
MesclarMesclar multiplas entradas em uma unica saida
ParaleloExecutar várias tarefas em paralelo com diferentes estratégias
Limite de TaxaLimitar taxa de execução usando token bucket ou janela deslizante
Tentar NovamenteTentar novamente operações falhas com recuo configurável
InicioNo explicito de inicio de workflow
SubfluxoReferenciar e executar workflow externo
SwitchRamificacao multipla baseada em correspondencia de valor
Controlar TaxaControlar a taxa de execução com intervalo mínimo
GatilhoPonto de entrada do workflow - manual, webhook, agendamento ou evento

Modules

Processo em Lote

flow.batch

Processar itens em lotes com tamanho configurável

Parameters:

NameTypeRequiredDefaultDescription
itemsarrayYes-Array of items to process. Can be numbers, strings, or objects.
batch_sizenumberYes10Número de itens por lote
delay_msnumberNo0Milissegundos para esperar entre lotes (para limitar a taxa)
continue_on_errorbooleanNoFalseContinuar processando os lotes restantes se um falhar
parallel_batchesnumberNo1Continuar processando os lotes restantes se um falhar

Output:

FieldTypeDescription
__event__stringNúmero de lotes para processar em paralelo (1 para sequencial)
batcharrayEvento para roteamento (lote/concluído/erro)
batch_indexnumberEvento para roteamento (lote/concluído/erro)
total_batchesnumberItens do lote atual
total_itemsnumberÍndice do lote atual (baseado em 0)
is_last_batchbooleanNúmero total de lotes
progressobjectNúmero total de itens

Example: Example

yaml
items: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
batch_size: 10

Example: Example

yaml
items: ${input.records}
batch_size: 100
delay_ms: 1000

Example: Example

yaml
items: ${input.data}
batch_size: 50
parallel_batches: 3
continue_on_error: true

Ramificacao

flow.branch

Ramificacao condicional baseada em avaliacao de expressao

Parameters:

NameTypeRequiredDefaultDescription
conditionstringYes-Expression to evaluate (supports ==, !=, >, <, >=, <=, contains)

Output:

FieldTypeDescription
__event__stringEvento de roteamento (true/false/error)
outputsobjectValores de saida por porta
resultbooleanResultado da ramificacao
conditionstringValor da condicao
resolved_conditionstringResultado da avaliacao da condicao

Example: Example

yaml
condition: ${search_step.count} > 0

Example: Example

yaml
condition: ${api_call.status} == success

Ponto de Parada

flow.breakpoint

Pausar execucao do workflow para aprovacao ou entrada humana

Parameters:

NameTypeRequiredDefaultDescription
titlestringNoApproval RequiredTitle displayed to approvers
descriptionstringNo-Optional description text
timeout_secondsnumberNo0Maximum wait time (0 for no timeout)
required_approversarrayYes-Array of data items to process
approval_modeselect (single, all, majority, first)NosingleHow approvals are counted
custom_fieldsarrayYes-Array of data items to process
include_contextbooleanNoTrueWhether to include execution context
auto_approve_conditionstringNo-Text content to process

Output:

FieldTypeDescription
__event__stringEvento de roteamento (approved/rejected/timeout)
breakpoint_idstringID do ponto de parada
statusstringStatus
approved_byarrayAprovado por
rejected_byarrayRejeitado por
custom_inputsobjectValores de entrada personalizados
commentsarrayComentarios da revisao
resolved_atstringHora de resolucao
wait_duration_msintegerDuracao de espera (ms)

Example: Example

yaml
title: Approve data export
description: Please review and approve the data export

Example: Example

yaml
title: Manager Approval Required
description: Large transaction requires manager approval
required_approvers: ["manager@example.com"]
timeout_seconds: 3600

Example: Example

yaml
title: Adjustment Required
custom_fields: [{"name": "reason", "label": "Reason", "type": "text", "required": true}, {"name": "amount", "label": "Amount", "type": "number", "required": true}]

Disjuntor

flow.circuit_breaker

Padrão de disjuntor para prevenir falhas em cascata

Parameters:

NameTypeRequiredDefaultDescription
failure_thresholdnumberYes5Número de falhas antes de abrir o circuito
reset_timeout_msnumberNo60000Tempo em milissegundos antes do circuito transicionar para meio-aberto
half_open_maxnumberNo1Máximo de requisições permitidas no estado meio-aberto

Output:

FieldTypeDescription
__event__stringEvento para roteamento (permitido/rejeitado/meio-aberto)
statestringEstado do circuito (fechado/aberto/meio-aberto)
failure_countnumberNúmero de falhas consecutivas
last_failure_time_msnumberTimestamp da última falha em milissegundos
time_until_half_open_msnumberMilissegundos até o circuito transicionar para meio-aberto

Example: Example

yaml
failure_threshold: 5
reset_timeout_ms: 60000

Example: Example

yaml
failure_threshold: 2
reset_timeout_ms: 10000
half_open_max: 1

Example: Example

yaml
failure_threshold: 20
reset_timeout_ms: 120000
half_open_max: 3

Container

flow.container

Container de subfluxo incorporado para organizar workflows complexos

Parameters:

NameTypeRequiredDefaultDescription
subflowobjectNo{'nodes': [], 'edges': []}Embedded workflow definition with nodes and edges
inherit_contextbooleanNoTrueWhether to inherit variables from parent workflow
isolated_variablesarrayYes-Array of data items to process
export_variablesarrayYes-Array of data items to process

Output:

FieldTypeDescription
__event__stringEvento de roteamento (success/error)
outputsobjectValores de saida por porta
subflow_resultobjectResultado do subfluxo
exported_variablesobjectVariaveis exportadas
node_countintegerContagem de nos
execution_time_msnumberTempo de execucao (ms)

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: true

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: false

Debounce

flow.debounce

Debounce para evitar chamadas repetidas rápidas

Parameters:

NameTypeRequiredDefaultDescription
delay_msnumberYes-Tempo de espera após a última chamada antes de executar
leadingbooleanNoFalseExecutar na borda inicial (primeira chamada aciona imediatamente)
trailingbooleanNoTrueExecutar na borda final (após o atraso expirar)

Output:

FieldTypeDescription
__event__stringEvento para roteamento (executado/debounced)
last_call_msnumberTimestamp da última chamada em milissegundos
calls_debouncednumberNúmero de chamadas debounced desde a última execução
time_since_last_msnumberTempo decorrido desde a última chamada em milissegundos
edgestringQual borda acionou a execução (inicial/final)

Example: Example

yaml
delay_ms: 500

Example: Example

yaml
delay_ms: 1000
leading: true
trailing: false

Example: Example

yaml
delay_ms: 2000
leading: true
trailing: true

Fim

flow.end

No explicito de fim de workflow

Parameters:

NameTypeRequiredDefaultDescription
output_mappingobjectNo{}Map internal variables to workflow output
success_messagestringNo-Text content to process

Output:

FieldTypeDescription
__event__stringEvento de roteamento (end)
ended_atstringHora de termino
workflow_resultobjectResultado do workflow

Example: Example

yaml

Example: Example

yaml
output_mapping: {"result": "${process.output}", "status": "success"}

Manipulador de Erro

flow.error_handle

Captura e lida com erros de nós anteriores

Parameters:

NameTypeRequiredDefaultDescription
actionstringYeslog_and_continueO que fazer com o erro
include_tracebackbooleanNoTrueIncluir rastreamento completo na saída
error_code_mappingobjectNo{}Incluir rastreamento completo na saída
fallback_valueanyNo-Mapear códigos de erro para ações personalizadas

Output:

FieldTypeDescription
__event__stringValor a ser usado quando o erro é suprimido
outputsobjectEvento para roteamento (manipulado/escalar)
error_infoobjectEvento para roteamento (manipulado/escalar)
action_takenstringAção tomada

Example: Example

yaml
action: log_and_continue
include_traceback: true

Example: Example

yaml
action: suppress
fallback_value: {"status": "skipped", "reason": "upstream_error"}

Example: Example

yaml
action: transform
error_code_mapping: {"TIMEOUT": {"retry": true, "delay": 5000}, "NOT_FOUND": {"skip": true}}

Gatilho de Fluxo de Trabalho de Erro

flow.error_workflow_trigger

Ponto de entrada para fluxos de trabalho de erro - acionado quando outro fluxo de trabalho falha

Parameters:

NameTypeRequiredDefaultDescription
descriptionstringNo-Description of this error workflow

Output:

FieldTypeDescription
__event__stringDescrição deste fluxo de trabalho de erro
error_contextobjectEvento para roteamento (acionado)
triggered_atstringTimestamp ISO quando o fluxo de trabalho de erro foi acionado

Example: Example

yaml
description: Send Slack notification on workflow failure

Example: Example

yaml
description: Log all workflow errors to monitoring system

Para Cada

flow.foreach

Iterar sobre lista e executar passos para cada item

Parameters:

NameTypeRequiredDefaultDescription
itemsstringYes-Lista de itens para iterar (suporta referencia ${variable})
stepsarrayNo-Passos para executar para cada item
item_varstringNoitemNome da variavel para item atual
index_varstringNoindexNome da variavel para indice atual
output_modestringNocollectModo de coleta de resultados

Output:

FieldTypeDescription
__event__stringEvento de roteamento (iterate/done)
__set_contextobjectDefinir contexto
outputsobjectValores de saida por porta
iterationnumberIndice de iteracao atual
statusstringStatus da operacao
resultsarrayResultados coletados
countnumberContagem total de itens

Example: Example

yaml
items: ${steps.csv.result.data}

Example: Example

yaml
items: ${search_results}
item_var: element
steps: [{"module": "element.text", "params": {"element_id": "${element}"}, "output": "text"}]

Bifurcacao

flow.fork

Dividir execucao em ramificacoes paralelas

Parameters:

NameTypeRequiredDefaultDescription
branch_countnumberNo2Number of parallel branches

Output:

FieldTypeDescription
__event__stringEvento de roteamento (fork/error)
input_dataanyDados de entrada
branch_countintegerContagem de ramificacoes

Example: Example

yaml
branch_count: 2

Example: Example

yaml
branch_count: 3

Ir Para

flow.goto

Salto incondicional para outro passo

Parameters:

NameTypeRequiredDefaultDescription
targetstringYes-Step ID to jump to
max_iterationsnumberNo100Maximum number of iterations (prevents infinite loops)

Output:

FieldTypeDescription
__event__stringEvento de roteamento (goto)
targetstringPasso alvo
iterationnumberContagem de iteracoes

Example: Example

yaml
target: fetch_next_page
max_iterations: 10

Example: Example

yaml
target: cleanup_step

Invoke Workflow

flow.invoke

Execute an external workflow file

Parameters:

NameTypeRequiredDefaultDescription
workflow_sourcestringYes-File path to workflow YAML or inline YAML content
workflow_paramsobjectYes-Parameters to pass to the invoked workflow
timeout_secondsnumberNo300Maximum execution time in seconds
output_mappingobjectNo{}Map internal variables to workflow output

Output:

FieldTypeDescription
__event__stringParameters to pass to the invoked workflow
resultanyMaximum execution time in seconds
workflow_idstringEvent for routing (success/error)
execution_time_msnumberWorkflow execution result

Example: Example

yaml
workflow_source: workflows/validate_order.yaml
workflow_params: {"order_id": "${input.order_id}"}
timeout_seconds: 60

Example: Example

yaml
workflow_source: workflows/process_data.yaml
workflow_params: {"data": "${input.data}"}
output_mapping: {"processed": "result.data"}

Juncao

flow.join

Aguardar ramificacoes paralelas completarem

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (all, any, first)NoallHow to handle multiple inputs
input_countnumberNo2Number of ports
timeoutnumberNo60000Maximum time to wait in milliseconds
cancel_pendingbooleanNoTrueCancel pending branches when using first strategy

Output:

FieldTypeDescription
__event__stringEvento de roteamento (joined/timeout/error)
joined_dataarrayDados unidos
completed_countintegerContagem de ramificacoes completadas
strategystringEstrategia de juncao

Example: Example

yaml
strategy: all
input_count: 2
timeout_ms: 30000

Example: Example

yaml
strategy: first
input_count: 3
cancel_pending: true

Loop

flow.loop

Repetir passos N vezes usando roteamento de porta de saida

Parameters:

NameTypeRequiredDefaultDescription
timesnumberYes1Numero de repeticoes
targetstringNo-Passo alvo (obsoleto)
stepsarrayNo-Passos para executar para cada iteracao
index_varstringNoindexNome da variavel para indice atual

Output:

FieldTypeDescription
__event__stringEvento de roteamento (iterate/done)
outputsobjectValores de saida por porta
iterationnumberIteracao atual
statusstringStatus da operacao
resultsarrayResultados coletados
countnumberTotal de iteracoes

Example: Example

yaml
times: 3

Example: Example

yaml
times: 5
steps: [{"module": "browser.click", "params": {"selector": ".next"}}]

Mesclar

flow.merge

Mesclar multiplas entradas em uma unica saida

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (first, last, all)NoallHow to merge multiple inputs
input_countnumberNo2Number of ports

Output:

FieldTypeDescription
__event__stringEvento de roteamento (merged/error)
merged_dataanyDados mesclados
input_countintegerContagem de entradas
strategystringEstrategia de mesclagem

Example: Example

yaml
strategy: all
input_count: 3

Example: Example

yaml
strategy: first
input_count: 2

Paralelo

flow.parallel

Executar várias tarefas em paralelo com diferentes estratégias

Parameters:

NameTypeRequiredDefaultDescription
tasksarrayYes-Array de definições de tarefas para executar em paralelo
modestringNoallArray de definições de tarefas para executar em paralelo
timeout_msnumberNo60000Maximum wait time in milliseconds
fail_fastbooleanNoTrueParar todas as tarefas na primeira falha (somente para modo=all)
concurrency_limitnumberNo0Parar todas as tarefas na primeira falha (somente para modo=all)

Output:

FieldTypeDescription
__event__stringNúmero máximo de tarefas concorrentes (0 para ilimitado)
resultsarrayEvento para roteamento (concluído/parcial/erro)
completed_countnumberEvento para roteamento (concluído/parcial/erro)
failed_countnumberResultados de todas as tarefas
total_countnumberNúmero de tarefas concluídas com sucesso
modestringNúmero de tarefas falhas
duration_msnumberNúmero total de tarefas

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://api2.example.com"}}]
mode: all
timeout_ms: 30000

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://mirror1.example.com"}}, {"module": "http.get", "params": {"url": "https://mirror2.example.com"}}]
mode: race

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://might-fail.example.com"}}]
mode: settle

Limite de Taxa

flow.rate_limit

Limitar taxa de execução usando token bucket ou janela deslizante

Parameters:

NameTypeRequiredDefaultDescription
max_requestsnumberYes-Número máximo de requisições permitidas por janela
window_msnumberNo60000Janela de tempo em milissegundos
strategystringNotoken_bucketEstratégia de limitação de taxa (token_bucket ou janela_deslizante)
queue_overflowstringNowaitComportamento quando a fila está cheia (descartar ou erro)

Output:

FieldTypeDescription
__event__stringEvento para roteamento (permitido/limitado)
tokens_remainingnumberTokens restantes no bucket
window_reset_msnumberMilissegundos até a janela reiniciar
requests_in_windownumberNúmero de requisições na janela atual
wait_msnumberMilissegundos para esperar antes da próxima requisição permitida

Example: Example

yaml
max_requests: 100
window_ms: 60000
strategy: token_bucket

Example: Example

yaml
max_requests: 10
window_ms: 1000
strategy: fixed_window
queue_overflow: error

Example: Example

yaml
max_requests: 50
window_ms: 30000
strategy: sliding_window
queue_overflow: wait

Tentar Novamente

flow.retry

Tentar novamente operações falhas com recuo configurável

Parameters:

NameTypeRequiredDefaultDescription
max_retriesnumberYes3Número máximo de tentativas
initial_delay_msnumberNo1000Atraso inicial antes da primeira tentativa em milissegundos
backoff_multipliernumberNo2.0Multiplicador para recuo exponencial
max_delay_msnumberNo30000Atraso máximo entre tentativas em milissegundos
retry_on_errorsarrayNo[]Tipos de erro para tentar novamente (vazio significa tentar todos)

Output:

FieldTypeDescription
__event__stringEvento para roteamento (tentar novamente/sucesso/falha)
attemptnumberNúmero atual da tentativa
max_retriesnumberNúmero máximo de tentativas configuradas
delay_msnumberAtraso antes da próxima tentativa em milissegundos
total_elapsed_msnumberTempo total decorrido em milissegundos
last_errorobjectÚltima mensagem de erro

Example: Example

yaml
max_retries: 3

Example: Example

yaml
max_retries: 10
initial_delay_ms: 500
backoff_multiplier: 1.5
max_delay_ms: 10000

Example: Example

yaml
max_retries: 5
initial_delay_ms: 2000
retry_on_errors: ["TIMEOUT", "RATE_LIMIT", "429", "503"]

Inicio

flow.start

No explicito de inicio de workflow

Output:

FieldTypeDescription
__event__stringEvento de roteamento (start)
started_atstringHora de inicio
workflow_idstringID do workflow

Example: Example

yaml

Subfluxo

flow.subflow

Referenciar e executar workflow externo

Parameters:

NameTypeRequiredDefaultDescription
workflow_refstringYes-Text content to process
execution_modeselect (inline, spawn, async)NoinlineSelect an option
input_mappingobjectYes-Data object to process
output_mappingobjectNo{}Map internal variables to workflow output
timeoutnumberNo300000Maximum time to wait in milliseconds

Output:

FieldTypeDescription
__event__stringEvento de roteamento (success/error)
resultanyResultado da execucao
execution_idstringID de execucao
workflow_refstringReferencia do workflow

Example: Example

yaml
workflow_ref: workflows/validate_order
execution_mode: inline
input_mapping: {"order_data": "${input.order}"}
output_mapping: {"validation_result": "result"}

Example: Example

yaml
workflow_ref: workflows/send_notifications
execution_mode: spawn

Switch

flow.switch

Ramificacao multipla baseada em correspondencia de valor

Parameters:

NameTypeRequiredDefaultDescription
expressionstringYes-Value to match against cases (supports variable reference)
casesarrayYes[{'id': 'case_1', 'value': 'case1', 'label': 'Case 1'}]List of case definitions

Output:

FieldTypeDescription
__event__stringEvento de roteamento (case:value ou default)
outputsobjectValores de saida por porta
matched_casestringCaso correspondido
valueanyValor correspondido

Example: Example

yaml
expression: ${api_response.status}
cases: [{"id": "case-1", "value": "success", "label": "Success"}, {"id": "case-2", "value": "pending", "label": "Pending"}, {"id": "case-3", "value": "error", "label": "Error"}]

Example: Example

yaml
expression: ${input.type}
cases: [{"id": "img", "value": "image", "label": "Image"}, {"id": "vid", "value": "video", "label": "Video"}, {"id": "txt", "value": "text", "label": "Text"}]

Controlar Taxa

flow.throttle

Controlar a taxa de execução com intervalo mínimo

Parameters:

NameTypeRequiredDefaultDescription
interval_msnumberYes-Tempo mínimo entre execuções em milissegundos
leadingbooleanNoTrueExecutar na borda inicial (primeira chamada passa imediatamente)

Output:

FieldTypeDescription
__event__stringEvento para roteamento (executado/controlado)
last_execution_msnumberTimestamp da última execução permitida
calls_throttlednumberNúmero de chamadas controladas desde a última execução
time_since_last_msnumberTempo decorrido desde a última execução em milissegundos
remaining_msnumberMilissegundos restantes até a próxima execução permitida

Example: Example

yaml
interval_ms: 1000

Example: Example

yaml
interval_ms: 200
leading: true

Example: Example

yaml
interval_ms: 5000
leading: false

Gatilho

flow.trigger

Ponto de entrada do workflow - manual, webhook, agendamento ou evento

Parameters:

NameTypeRequiredDefaultDescription
trigger_typeselect (manual, webhook, schedule, event, mcp, polling)NomanualType of trigger event
webhook_pathstringNo-URL path for webhook trigger
schedulestringNo-Cron expression for scheduled trigger
event_namestringNo-Event name to listen for
tool_namestringNo-MCP tool name exposed to AI agents
tool_descriptionstringNo-Description shown to AI agents for this tool
poll_urlstringNo-API endpoint to poll for changes
poll_intervalnumberNo300How often to check for changes (minimum 60 seconds)
poll_methodselect (GET, POST)NoGETHTTP method for polling request
poll_headersobjectNo{}Custom headers for polling request (e.g. API keys)
poll_bodyobjectNo{}Request body for POST polling
dedup_keystringNo-JSON path to extract a unique value for deduplication
configobjectNo-Custom trigger config (for composites: LINE BOT, Telegram, Slack, etc.)
descriptionstringNo-Optional description text

Output:

FieldTypeDescription
__event__stringEvento de roteamento (triggered/error)
trigger_dataobjectDados do gatilho
trigger_typestringTipo de gatilho
triggered_atstringHora de disparo

Example: Example

yaml
trigger_type: manual

Example: Example

yaml
trigger_type: webhook
webhook_path: /api/webhooks/order-created

Example: Example

yaml
trigger_type: schedule
schedule: 0 * * * *

Example: Example

yaml
trigger_type: mcp
tool_name: send-report
tool_description: Send a weekly summary report

Example: Example

yaml
trigger_type: polling
poll_url: https://api.example.com/items
poll_interval: 300
dedup_key: $.data[0].id

Released under the Apache 2.0 License.