Skip to content

Flow Control

Branching, loops, parallelism, subflows, triggers, and error handling.

24 modules

ModuleDescription
Proceso por LotesProcesa elementos en lotes con tamaño configurable
RamaRamificacion condicional basada en evaluacion de expresion
Punto de interrupcionPausar ejecucion de flujo de trabajo para aprobacion humana o entrada
DisyuntorPatrón de disyuntor para prevenir fallos en cascada
ContenedorContenedor de subflujo embebido para organizar flujos de trabajo complejos
RetardoEjecutar con retardo para prevenir llamadas repetidas rápidas
FinNodo de fin de flujo de trabajo explicito
Manejador de ErroresCaptura y maneja errores de nodos anteriores
Disparador de Flujo de ErrorPunto de entrada para flujos de error - activado cuando otro flujo falla
Para cadaIterar sobre una lista y ejecutar pasos para cada elemento
BifurcarDividir ejecucion en ramas paralelas
Ir aSalto incondicional a otro paso
Invocar flujo de trabajoEjecutar un archivo de flujo de trabajo externo
UnirEsperar a que se completen las ramas paralelas
BucleRepetir pasos N veces usando enrutamiento de puerto de salida
CombinarCombinar multiples entradas en una sola salida
ParaleloEjecuta múltiples tareas en paralelo con diferentes estrategias
Límite de TasaLimitar la tasa de ejecución usando cubo de tokens o ventana deslizante
ReintentarReintentar operaciones fallidas con retroceso configurable
InicioNodo de inicio de flujo de trabajo explicito
SubflujoReferenciar y ejecutar un flujo de trabajo externo
CambiarRamificacion multiple basada en coincidencia de valores
RegularRegular la tasa de ejecución con un intervalo mínimo
DisparadorPunto de entrada de flujo de trabajo - manual, webhook, programado o evento

Modules

Proceso por Lotes

flow.batch

Procesa elementos en lotes con tamaño configurable

Parameters:

NameTypeRequiredDefaultDescription
itemsarrayYes-Array of items to process. Can be numbers, strings, or objects.
batch_sizenumberYes10Número de elementos por lote
delay_msnumberNo0Milisegundos de espera entre lotes (para limitar la tasa)
continue_on_errorbooleanNoFalseContinuar procesando los lotes restantes si uno falla
parallel_batchesnumberNo1Continuar procesando los lotes restantes si uno falla

Output:

FieldTypeDescription
__event__stringNúmero de lotes a procesar en paralelo (1 para secuencial)
batcharrayEvento para enrutamiento (lote/completado/error)
batch_indexnumberEvento para enrutamiento (lote/completado/error)
total_batchesnumberElementos del lote actual
total_itemsnumberÍndice del lote actual (basado en 0)
is_last_batchbooleanNúmero total de lotes
progressobjectNúmero total de elementos

Example: Example

yaml
items: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
batch_size: 10

Example: Example

yaml
items: ${input.records}
batch_size: 100
delay_ms: 1000

Example: Example

yaml
items: ${input.data}
batch_size: 50
parallel_batches: 3
continue_on_error: true

Rama

flow.branch

Ramificacion condicional basada en evaluacion de expresion

Parameters:

NameTypeRequiredDefaultDescription
conditionstringYes-Expression to evaluate (supports ==, !=, >, <, >=, <=, contains)

Output:

FieldTypeDescription
__event__stringEvento de enrutamiento (true/false/error)
outputsobjectValores de salida por puerto
resultbooleanResultado de rama
conditionstringValor de condicion
resolved_conditionstringResultado de evaluacion de condicion

Example: Example

yaml
condition: ${search_step.count} > 0

Example: Example

yaml
condition: ${api_call.status} == success

Punto de interrupcion

flow.breakpoint

Pausar ejecucion de flujo de trabajo para aprobacion humana o entrada

Parameters:

NameTypeRequiredDefaultDescription
titlestringNoApproval RequiredTitle displayed to approvers
descriptionstringNo-Optional description text
timeout_secondsnumberNo0Maximum wait time (0 for no timeout)
required_approversarrayYes-Array of data items to process
approval_modeselect (single, all, majority, first)NosingleHow approvals are counted
custom_fieldsarrayYes-Array of data items to process
include_contextbooleanNoTrueWhether to include execution context
auto_approve_conditionstringNo-Text content to process

Output:

FieldTypeDescription
__event__stringEvento de enrutamiento (approved/rejected/timeout)
breakpoint_idstringID del punto de interrupcion
statusstringEstado
approved_byarrayAprobado por
rejected_byarrayRechazado por
custom_inputsobjectValores de entrada personalizados
commentsarrayComentarios de revision
resolved_atstringHora de resolucion
wait_duration_msintegerDuracion de espera (ms)

Example: Example

yaml
title: Approve data export
description: Please review and approve the data export

Example: Example

yaml
title: Manager Approval Required
description: Large transaction requires manager approval
required_approvers: ["manager@example.com"]
timeout_seconds: 3600

Example: Example

yaml
title: Adjustment Required
custom_fields: [{"name": "reason", "label": "Reason", "type": "text", "required": true}, {"name": "amount", "label": "Amount", "type": "number", "required": true}]

Disyuntor

flow.circuit_breaker

Patrón de disyuntor para prevenir fallos en cascada

Parameters:

NameTypeRequiredDefaultDescription
failure_thresholdnumberYes5Número de fallos antes de abrir el circuito
reset_timeout_msnumberNo60000Tiempo en milisegundos antes de que el circuito pase a medio abierto
half_open_maxnumberNo1Máximo de solicitudes permitidas en estado medio abierto

Output:

FieldTypeDescription
__event__stringEvento para enrutamiento (permitido/rechazado/medio abierto)
statestringEstado del circuito (cerrado/abierto/medio abierto)
failure_countnumberNúmero de fallos consecutivos
last_failure_time_msnumberMarca de tiempo del último fallo en milisegundos
time_until_half_open_msnumberMilisegundos hasta que el circuito pase a medio abierto

Example: Example

yaml
failure_threshold: 5
reset_timeout_ms: 60000

Example: Example

yaml
failure_threshold: 2
reset_timeout_ms: 10000
half_open_max: 1

Example: Example

yaml
failure_threshold: 20
reset_timeout_ms: 120000
half_open_max: 3

Contenedor

flow.container

Contenedor de subflujo embebido para organizar flujos de trabajo complejos

Parameters:

NameTypeRequiredDefaultDescription
subflowobjectNo{'nodes': [], 'edges': []}Embedded workflow definition with nodes and edges
inherit_contextbooleanNoTrueWhether to inherit variables from parent workflow
isolated_variablesarrayYes-Array of data items to process
export_variablesarrayYes-Array of data items to process

Output:

FieldTypeDescription
__event__stringEvento de enrutamiento (success/error)
outputsobjectValores de salida por puerto
subflow_resultobjectResultado de subflujo
exported_variablesobjectVariables exportadas
node_countintegerCantidad de nodos
execution_time_msnumberTiempo de ejecucion (ms)

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: true

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: false

Retardo

flow.debounce

Ejecutar con retardo para prevenir llamadas repetidas rápidas

Parameters:

NameTypeRequiredDefaultDescription
delay_msnumberYes-Tiempo de espera después de la última llamada antes de ejecutar
leadingbooleanNoFalseEjecutar en el borde inicial (la primera llamada activa inmediatamente)
trailingbooleanNoTrueEjecutar en el borde final (después de que expire el retardo)

Output:

FieldTypeDescription
__event__stringEvento para enrutamiento (ejecutado/con retardo)
last_call_msnumberMarca de tiempo de la última llamada en milisegundos
calls_debouncednumberNúmero de llamadas con retardo desde la última ejecución
time_since_last_msnumberTiempo transcurrido desde la última llamada en milisegundos
edgestringQué borde activó la ejecución (inicial/final)

Example: Example

yaml
delay_ms: 500

Example: Example

yaml
delay_ms: 1000
leading: true
trailing: false

Example: Example

yaml
delay_ms: 2000
leading: true
trailing: true

Fin

flow.end

Nodo de fin de flujo de trabajo explicito

Parameters:

NameTypeRequiredDefaultDescription
output_mappingobjectNo{}Map internal variables to workflow output
success_messagestringNo-Text content to process

Output:

FieldTypeDescription
__event__stringEvento de enrutamiento (end)
ended_atstringHora de fin
workflow_resultobjectResultado del flujo de trabajo

Example: Example

yaml

Example: Example

yaml
output_mapping: {"result": "${process.output}", "status": "success"}

Manejador de Errores

flow.error_handle

Captura y maneja errores de nodos anteriores

Parameters:

NameTypeRequiredDefaultDescription
actionstringYeslog_and_continueQué hacer con el error
include_tracebackbooleanNoTrueIncluir traza completa en la salida
error_code_mappingobjectNo{}Incluir traza completa en la salida
fallback_valueanyNo-Mapear códigos de error a acciones personalizadas

Output:

FieldTypeDescription
__event__stringValor a usar cuando el error es suprimido
outputsobjectEvento para enrutamiento (manejado/escalar)
error_infoobjectEvento para enrutamiento (manejado/escalar)
action_takenstringQué acción se tomó

Example: Example

yaml
action: log_and_continue
include_traceback: true

Example: Example

yaml
action: suppress
fallback_value: {"status": "skipped", "reason": "upstream_error"}

Example: Example

yaml
action: transform
error_code_mapping: {"TIMEOUT": {"retry": true, "delay": 5000}, "NOT_FOUND": {"skip": true}}

Disparador de Flujo de Error

flow.error_workflow_trigger

Punto de entrada para flujos de error - activado cuando otro flujo falla

Parameters:

NameTypeRequiredDefaultDescription
descriptionstringNo-Description of this error workflow

Output:

FieldTypeDescription
__event__stringDescripción de este flujo de error
error_contextobjectEvento para enrutamiento (disparado)
triggered_atstringMarca de tiempo ISO cuando el flujo de error fue activado

Example: Example

yaml
description: Send Slack notification on workflow failure

Example: Example

yaml
description: Log all workflow errors to monitoring system

Para cada

flow.foreach

Iterar sobre una lista y ejecutar pasos para cada elemento

Parameters:

NameTypeRequiredDefaultDescription
itemsstringYes-Lista de elementos para iterar (soporta referencia ${variable})
stepsarrayNo-Pasos a ejecutar para cada elemento
item_varstringNoitemNombre de variable para elemento actual
index_varstringNoindexNombre de variable para indice actual
output_modestringNocollectModo de recoleccion de resultados

Output:

FieldTypeDescription
__event__stringEvento de enrutamiento (iterate/done)
__set_contextobjectEstablecer contexto
outputsobjectValores de salida por puerto
iterationnumberIndice de iteracion actual
statusstringEstado de la operacion
resultsarrayResultados recolectados
countnumberCantidad total de elementos

Example: Example

yaml
items: ${steps.csv.result.data}

Example: Example

yaml
items: ${search_results}
item_var: element
steps: [{"module": "element.text", "params": {"element_id": "${element}"}, "output": "text"}]

Bifurcar

flow.fork

Dividir ejecucion en ramas paralelas

Parameters:

NameTypeRequiredDefaultDescription
branch_countnumberNo2Number of parallel branches

Output:

FieldTypeDescription
__event__stringEvento de enrutamiento (fork/error)
input_dataanyDatos de entrada
branch_countintegerCantidad de ramas

Example: Example

yaml
branch_count: 2

Example: Example

yaml
branch_count: 3

Ir a

flow.goto

Salto incondicional a otro paso

Parameters:

NameTypeRequiredDefaultDescription
targetstringYes-Step ID to jump to
max_iterationsnumberNo100Maximum number of iterations (prevents infinite loops)

Output:

FieldTypeDescription
__event__stringEvento de enrutamiento (goto)
targetstringPaso destino
iterationnumberConteo de iteraciones

Example: Example

yaml
target: fetch_next_page
max_iterations: 10

Example: Example

yaml
target: cleanup_step

Invocar flujo de trabajo

flow.invoke

Ejecutar un archivo de flujo de trabajo externo

Parameters:

NameTypeRequiredDefaultDescription
workflow_sourcestringYes-File path to workflow YAML or inline YAML content
workflow_paramsobjectYes-Parameters to pass to the invoked workflow
timeout_secondsnumberNo300Maximum execution time in seconds
output_mappingobjectNo{}Map internal variables to workflow output

Output:

FieldTypeDescription
__event__stringParámetros para pasar al flujo de trabajo invocado
resultanyTiempo máximo de ejecución en segundos
workflow_idstringEvento para enrutamiento (éxito/error)
execution_time_msnumberResultado de ejecución del flujo de trabajo

Example: Example

yaml
workflow_source: workflows/validate_order.yaml
workflow_params: {"order_id": "${input.order_id}"}
timeout_seconds: 60

Example: Example

yaml
workflow_source: workflows/process_data.yaml
workflow_params: {"data": "${input.data}"}
output_mapping: {"processed": "result.data"}

Unir

flow.join

Esperar a que se completen las ramas paralelas

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (all, any, first)NoallHow to handle multiple inputs
input_countnumberNo2Number of ports
timeoutnumberNo60000Maximum time to wait in milliseconds
cancel_pendingbooleanNoTrueCancel pending branches when using first strategy

Output:

FieldTypeDescription
__event__stringEvento de enrutamiento (joined/timeout/error)
joined_dataarrayDatos unidos
completed_countintegerCantidad de ramas completadas
strategystringEstrategia de union

Example: Example

yaml
strategy: all
input_count: 2
timeout_ms: 30000

Example: Example

yaml
strategy: first
input_count: 3
cancel_pending: true

Bucle

flow.loop

Repetir pasos N veces usando enrutamiento de puerto de salida

Parameters:

NameTypeRequiredDefaultDescription
timesnumberYes1Numero de repeticiones
targetstringNo-Paso destino (obsoleto)
stepsarrayNo-Pasos a ejecutar para cada iteracion
index_varstringNoindexNombre de variable para indice actual

Output:

FieldTypeDescription
__event__stringEvento de enrutamiento (iterate/done)
outputsobjectValores de salida por puerto
iterationnumberIteracion actual
statusstringEstado de la operacion
resultsarrayResultados recolectados
countnumberTotal de iteraciones

Example: Example

yaml
times: 3

Example: Example

yaml
times: 5
steps: [{"module": "browser.click", "params": {"selector": ".next"}}]

Combinar

flow.merge

Combinar multiples entradas en una sola salida

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (first, last, all)NoallHow to merge multiple inputs
input_countnumberNo2Number of ports

Output:

FieldTypeDescription
__event__stringEvento de enrutamiento (merged/error)
merged_dataanyDatos combinados
input_countintegerCantidad de entradas
strategystringEstrategia de combinacion

Example: Example

yaml
strategy: all
input_count: 3

Example: Example

yaml
strategy: first
input_count: 2

Paralelo

flow.parallel

Ejecuta múltiples tareas en paralelo con diferentes estrategias

Parameters:

NameTypeRequiredDefaultDescription
tasksarrayYes-Arreglo de definiciones de tareas para ejecutar en paralelo
modestringNoallArreglo de definiciones de tareas para ejecutar en paralelo
timeout_msnumberNo60000Maximum wait time in milliseconds
fail_fastbooleanNoTrueDetener todas las tareas en el primer fallo (solo para modo=all)
concurrency_limitnumberNo0Detener todas las tareas en el primer fallo (solo para modo=all)

Output:

FieldTypeDescription
__event__stringNúmero máximo de tareas concurrentes (0 para ilimitado)
resultsarrayEvento para enrutamiento (completado/parcial/error)
completed_countnumberEvento para enrutamiento (completado/parcial/error)
failed_countnumberResultados de todas las tareas
total_countnumberNúmero de tareas completadas con éxito
modestringNúmero de tareas fallidas
duration_msnumberNúmero total de tareas

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://api2.example.com"}}]
mode: all
timeout_ms: 30000

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://mirror1.example.com"}}, {"module": "http.get", "params": {"url": "https://mirror2.example.com"}}]
mode: race

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://might-fail.example.com"}}]
mode: settle

Límite de Tasa

flow.rate_limit

Limitar la tasa de ejecución usando cubo de tokens o ventana deslizante

Parameters:

NameTypeRequiredDefaultDescription
max_requestsnumberYes-Número máximo de solicitudes permitidas por ventana
window_msnumberNo60000Ventana de tiempo en milisegundos
strategystringNotoken_bucketEstrategia de limitación de tasa (cubo de tokens o ventana deslizante)
queue_overflowstringNowaitComportamiento cuando la cola está llena (descartar o error)

Output:

FieldTypeDescription
__event__stringEvento para enrutamiento (permitido/limitado)
tokens_remainingnumberTokens restantes en el cubo
window_reset_msnumberMilisegundos hasta que la ventana se reinicie
requests_in_windownumberNúmero de solicitudes en la ventana actual
wait_msnumberMilisegundos de espera antes de la siguiente solicitud permitida

Example: Example

yaml
max_requests: 100
window_ms: 60000
strategy: token_bucket

Example: Example

yaml
max_requests: 10
window_ms: 1000
strategy: fixed_window
queue_overflow: error

Example: Example

yaml
max_requests: 50
window_ms: 30000
strategy: sliding_window
queue_overflow: wait

Reintentar

flow.retry

Reintentar operaciones fallidas con retroceso configurable

Parameters:

NameTypeRequiredDefaultDescription
max_retriesnumberYes3Número máximo de intentos de reintento
initial_delay_msnumberNo1000Retraso inicial antes del primer reintento en milisegundos
backoff_multipliernumberNo2.0Multiplicador para retroceso exponencial
max_delay_msnumberNo30000Retraso máximo entre reintentos en milisegundos
retry_on_errorsarrayNo[]Tipos de error para reintentar (vacío significa reintentar todos)

Output:

FieldTypeDescription
__event__stringEvento para enrutamiento (reintento/éxito/fallido)
attemptnumberNúmero de intento actual
max_retriesnumberNúmero máximo de reintentos configurados
delay_msnumberRetraso antes del próximo reintento en milisegundos
total_elapsed_msnumberTiempo total transcurrido en milisegundos
last_errorobjectÚltimo mensaje de error

Example: Example

yaml
max_retries: 3

Example: Example

yaml
max_retries: 10
initial_delay_ms: 500
backoff_multiplier: 1.5
max_delay_ms: 10000

Example: Example

yaml
max_retries: 5
initial_delay_ms: 2000
retry_on_errors: ["TIMEOUT", "RATE_LIMIT", "429", "503"]

Inicio

flow.start

Nodo de inicio de flujo de trabajo explicito

Output:

FieldTypeDescription
__event__stringEvento de enrutamiento (start)
started_atstringHora de inicio
workflow_idstringID del flujo de trabajo

Example: Example

yaml

Subflujo

flow.subflow

Referenciar y ejecutar un flujo de trabajo externo

Parameters:

NameTypeRequiredDefaultDescription
workflow_refstringYes-Text content to process
execution_modeselect (inline, spawn, async)NoinlineSelect an option
input_mappingobjectYes-Data object to process
output_mappingobjectNo{}Map internal variables to workflow output
timeoutnumberNo300000Maximum time to wait in milliseconds

Output:

FieldTypeDescription
__event__stringEvento de enrutamiento (success/error)
resultanyResultado de ejecucion
execution_idstringID de ejecucion
workflow_refstringReferencia de flujo de trabajo

Example: Example

yaml
workflow_ref: workflows/validate_order
execution_mode: inline
input_mapping: {"order_data": "${input.order}"}
output_mapping: {"validation_result": "result"}

Example: Example

yaml
workflow_ref: workflows/send_notifications
execution_mode: spawn

Cambiar

flow.switch

Ramificacion multiple basada en coincidencia de valores

Parameters:

NameTypeRequiredDefaultDescription
expressionstringYes-Value to match against cases (supports variable reference)
casesarrayYes[{'id': 'case_1', 'value': 'case1', 'label': 'Case 1'}]List of case definitions

Output:

FieldTypeDescription
__event__stringEvento de enrutamiento (case:value o default)
outputsobjectValores de salida por puerto
matched_casestringCaso coincidente
valueanyValor coincidente

Example: Example

yaml
expression: ${api_response.status}
cases: [{"id": "case-1", "value": "success", "label": "Success"}, {"id": "case-2", "value": "pending", "label": "Pending"}, {"id": "case-3", "value": "error", "label": "Error"}]

Example: Example

yaml
expression: ${input.type}
cases: [{"id": "img", "value": "image", "label": "Image"}, {"id": "vid", "value": "video", "label": "Video"}, {"id": "txt", "value": "text", "label": "Text"}]

Regular

flow.throttle

Regular la tasa de ejecución con un intervalo mínimo

Parameters:

NameTypeRequiredDefaultDescription
interval_msnumberYes-Tiempo mínimo entre ejecuciones en milisegundos
leadingbooleanNoTrueEjecutar en el borde inicial (la primera llamada pasa inmediatamente)

Output:

FieldTypeDescription
__event__stringEvento para enrutamiento (ejecutado/regulado)
last_execution_msnumberMarca de tiempo de la última ejecución permitida
calls_throttlednumberNúmero de llamadas reguladas desde la última ejecución
time_since_last_msnumberTiempo transcurrido desde la última ejecución en milisegundos
remaining_msnumberMilisegundos restantes hasta que se permita la próxima ejecución

Example: Example

yaml
interval_ms: 1000

Example: Example

yaml
interval_ms: 200
leading: true

Example: Example

yaml
interval_ms: 5000
leading: false

Disparador

flow.trigger

Punto de entrada de flujo de trabajo - manual, webhook, programado o evento

Parameters:

NameTypeRequiredDefaultDescription
trigger_typeselect (manual, webhook, schedule, event, mcp, polling)NomanualType of trigger event
webhook_pathstringNo-URL path for webhook trigger
schedulestringNo-Cron expression for scheduled trigger
event_namestringNo-Event name to listen for
tool_namestringNo-MCP tool name exposed to AI agents
tool_descriptionstringNo-Description shown to AI agents for this tool
poll_urlstringNo-API endpoint to poll for changes
poll_intervalnumberNo300How often to check for changes (minimum 60 seconds)
poll_methodselect (GET, POST)NoGETHTTP method for polling request
poll_headersobjectNo{}Custom headers for polling request (e.g. API keys)
poll_bodyobjectNo{}Request body for POST polling
dedup_keystringNo-JSON path to extract a unique value for deduplication
configobjectNo-Custom trigger config (for composites: LINE BOT, Telegram, Slack, etc.)
descriptionstringNo-Optional description text

Output:

FieldTypeDescription
__event__stringEvento de enrutamiento (triggered/error)
trigger_dataobjectDatos del disparador
trigger_typestringTipo de disparador
triggered_atstringHora de disparo

Example: Example

yaml
trigger_type: manual

Example: Example

yaml
trigger_type: webhook
webhook_path: /api/webhooks/order-created

Example: Example

yaml
trigger_type: schedule
schedule: 0 * * * *

Example: Example

yaml
trigger_type: mcp
tool_name: send-report
tool_description: Send a weekly summary report

Example: Example

yaml
trigger_type: polling
poll_url: https://api.example.com/items
poll_interval: 300
dedup_key: $.data[0].id

Released under the Apache 2.0 License.