Skip to content

Flow Control

Branching, loops, parallelism, subflows, triggers, and error handling.

24 modules

ModuleDescription
Elaborazione LottiElabora gli elementi in lotti con dimensioni configurabili
DiramazioneRamificazione condizionale basata su valutazione espressione
Punto di interruzioneMetti in pausa esecuzione workflow per approvazione o input umano
InterruttoreSchema di interruttore per prevenire guasti a cascata
ContenitoreContainer subflow embedded per organizzare workflow complessi
AntirimbalzoEsecuzione debounce per prevenire chiamate ripetute rapide
FineNodo fine workflow esplicito
Gestore ErroriCattura e gestisce errori dai nodi a monte
Attivazione Flusso di Lavoro ErrorePunto di ingresso per flussi di lavoro di errore - attivato quando un altro flusso di lavoro fallisce
Per OgniItera su una lista ed esegui passaggi per ogni elemento
BiforcazioneDividi esecuzione in branch paralleli
Vai aSalto incondizionato a un altro passaggio
Invoca flusso di lavoroEsegui un file di flusso di lavoro esterno
UnisciAttendi completamento branch paralleli
CicloRipeti passaggi N volte usando routing porta output
UnisciUnisci input multipli in output singolo
ParalleloEsegui più attività in parallelo con diverse strategie
Limite di VelocitàLimita l'esecuzione usando il token bucket o la finestra mobile
RiprovaRiprova le operazioni fallite con backoff configurabile
InizioNodo inizio workflow esplicito
SottoflussoRiferisci ed esegui workflow esterno
InterruttoreRamificazione multipla basata su corrispondenza valore
LimitaLimita la frequenza di esecuzione con intervallo minimo
InnescoPunto ingresso workflow - manuale, webhook, pianificato o evento

Modules

Elaborazione Lotti

flow.batch

Elabora gli elementi in lotti con dimensioni configurabili

Parameters:

NameTypeRequiredDefaultDescription
itemsarrayYes-Array of items to process. Can be numbers, strings, or objects.
batch_sizenumberYes10Numero di elementi per lotto
delay_msnumberNo0Millisecondi di attesa tra i lotti (per limitazione del tasso)
continue_on_errorbooleanNoFalseContinua a elaborare i lotti rimanenti se uno fallisce
parallel_batchesnumberNo1Continua a elaborare i lotti rimanenti se uno fallisce

Output:

FieldTypeDescription
__event__stringNumero di lotti da elaborare in parallelo (1 per sequenziale)
batcharrayEvento per instradamento (lotto/completato/errore)
batch_indexnumberEvento per instradamento (lotto/completato/errore)
total_batchesnumberElementi del lotto corrente
total_itemsnumberIndice del lotto corrente (basato su 0)
is_last_batchbooleanNumero totale di lotti
progressobjectNumero totale di elementi

Example: Example

yaml
items: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
batch_size: 10

Example: Example

yaml
items: ${input.records}
batch_size: 100
delay_ms: 1000

Example: Example

yaml
items: ${input.data}
batch_size: 50
parallel_batches: 3
continue_on_error: true

Diramazione

flow.branch

Ramificazione condizionale basata su valutazione espressione

Parameters:

NameTypeRequiredDefaultDescription
conditionstringYes-Expression to evaluate (supports ==, !=, >, <, >=, <=, contains)

Output:

FieldTypeDescription
__event__stringEvento routing (true/false/error)
outputsobjectValori output per porta
resultbooleanRisultato branch
conditionstringValore condizione
resolved_conditionstringRisultato valutazione condizione

Example: Example

yaml
condition: ${search_step.count} > 0

Example: Example

yaml
condition: ${api_call.status} == success

Punto di interruzione

flow.breakpoint

Metti in pausa esecuzione workflow per approvazione o input umano

Parameters:

NameTypeRequiredDefaultDescription
titlestringNoApproval RequiredTitle displayed to approvers
descriptionstringNo-Optional description text
timeout_secondsnumberNo0Maximum wait time (0 for no timeout)
required_approversarrayYes-Array of data items to process
approval_modeselect (single, all, majority, first)NosingleHow approvals are counted
custom_fieldsarrayYes-Array of data items to process
include_contextbooleanNoTrueWhether to include execution context
auto_approve_conditionstringNo-Text content to process

Output:

FieldTypeDescription
__event__stringEvento routing (approved/rejected/timeout)
breakpoint_idstringID Breakpoint
statusstringStato
approved_byarrayApprovato da
rejected_byarrayRifiutato da
custom_inputsobjectValori input personalizzati
commentsarrayCommenti revisione
resolved_atstringOra risoluzione
wait_duration_msintegerDurata attesa (ms)

Example: Example

yaml
title: Approve data export
description: Please review and approve the data export

Example: Example

yaml
title: Manager Approval Required
description: Large transaction requires manager approval
required_approvers: ["manager@example.com"]
timeout_seconds: 3600

Example: Example

yaml
title: Adjustment Required
custom_fields: [{"name": "reason", "label": "Reason", "type": "text", "required": true}, {"name": "amount", "label": "Amount", "type": "number", "required": true}]

Interruttore

flow.circuit_breaker

Schema di interruttore per prevenire guasti a cascata

Parameters:

NameTypeRequiredDefaultDescription
failure_thresholdnumberYes5Numero di guasti prima di aprire il circuito
reset_timeout_msnumberNo60000Tempo in millisecondi prima che il circuito passi a semiaperto
half_open_maxnumberNo1Numero massimo di richieste consentite in stato semiaperto

Output:

FieldTypeDescription
__event__stringEvento per il routing (consentito/rifiutato/semiaperto)
statestringStato del circuito (chiuso/aperto/semiaperto)
failure_countnumberNumero di guasti consecutivi
last_failure_time_msnumberTimestamp dell'ultimo guasto in millisecondi
time_until_half_open_msnumberMillisecondi fino al passaggio del circuito a semiaperto

Example: Example

yaml
failure_threshold: 5
reset_timeout_ms: 60000

Example: Example

yaml
failure_threshold: 2
reset_timeout_ms: 10000
half_open_max: 1

Example: Example

yaml
failure_threshold: 20
reset_timeout_ms: 120000
half_open_max: 3

Contenitore

flow.container

Container subflow embedded per organizzare workflow complessi

Parameters:

NameTypeRequiredDefaultDescription
subflowobjectNo{'nodes': [], 'edges': []}Embedded workflow definition with nodes and edges
inherit_contextbooleanNoTrueWhether to inherit variables from parent workflow
isolated_variablesarrayYes-Array of data items to process
export_variablesarrayYes-Array of data items to process

Output:

FieldTypeDescription
__event__stringEvento routing (success/error)
outputsobjectValori output per porta
subflow_resultobjectRisultato subflow
exported_variablesobjectVariabili esportate
node_countintegerConteggio nodi
execution_time_msnumberTempo esecuzione (ms)

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: true

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: false

Antirimbalzo

flow.debounce

Esecuzione debounce per prevenire chiamate ripetute rapide

Parameters:

NameTypeRequiredDefaultDescription
delay_msnumberYes-Tempo di attesa dopo l'ultima chiamata prima dell'esecuzione
leadingbooleanNoFalseEsegui sul bordo iniziale (la prima chiamata attiva immediatamente)
trailingbooleanNoTrueEsegui sul bordo finale (dopo la scadenza del ritardo)

Output:

FieldTypeDescription
__event__stringEvento per il routing (eseguito/debounced)
last_call_msnumberTimestamp dell'ultima chiamata in millisecondi
calls_debouncednumberNumero di chiamate debounce dall'ultima esecuzione
time_since_last_msnumberTempo trascorso dall'ultima chiamata in millisecondi
edgestringQuale bordo ha attivato l'esecuzione (iniziale/finale)

Example: Example

yaml
delay_ms: 500

Example: Example

yaml
delay_ms: 1000
leading: true
trailing: false

Example: Example

yaml
delay_ms: 2000
leading: true
trailing: true

Fine

flow.end

Nodo fine workflow esplicito

Parameters:

NameTypeRequiredDefaultDescription
output_mappingobjectNo{}Map internal variables to workflow output
success_messagestringNo-Text content to process

Output:

FieldTypeDescription
__event__stringEvento routing (end)
ended_atstringOra fine
workflow_resultobjectRisultato workflow

Example: Example

yaml

Example: Example

yaml
output_mapping: {"result": "${process.output}", "status": "success"}

Gestore Errori

flow.error_handle

Cattura e gestisce errori dai nodi a monte

Parameters:

NameTypeRequiredDefaultDescription
actionstringYeslog_and_continueCosa fare con l'errore
include_tracebackbooleanNoTrueIncludi il traceback completo nell'output
error_code_mappingobjectNo{}Includi il traceback completo nell'output
fallback_valueanyNo-Mappa i codici di errore a azioni personalizzate

Output:

FieldTypeDescription
__event__stringValore da usare quando l'errore è soppresso
outputsobjectEvento per instradamento (gestito/escalation)
error_infoobjectEvento per instradamento (gestito/escalation)
action_takenstringAzione intrapresa

Example: Example

yaml
action: log_and_continue
include_traceback: true

Example: Example

yaml
action: suppress
fallback_value: {"status": "skipped", "reason": "upstream_error"}

Example: Example

yaml
action: transform
error_code_mapping: {"TIMEOUT": {"retry": true, "delay": 5000}, "NOT_FOUND": {"skip": true}}

Attivazione Flusso di Lavoro Errore

flow.error_workflow_trigger

Punto di ingresso per flussi di lavoro di errore - attivato quando un altro flusso di lavoro fallisce

Parameters:

NameTypeRequiredDefaultDescription
descriptionstringNo-Description of this error workflow

Output:

FieldTypeDescription
__event__stringDescrizione di questo flusso di lavoro di errore
error_contextobjectEvento per instradamento (attivato)
triggered_atstringTimestamp ISO quando il flusso di lavoro di errore è stato attivato

Example: Example

yaml
description: Send Slack notification on workflow failure

Example: Example

yaml
description: Log all workflow errors to monitoring system

Per Ogni

flow.foreach

Itera su una lista ed esegui passaggi per ogni elemento

Parameters:

NameTypeRequiredDefaultDescription
itemsstringYes-Lista di elementi su cui iterare (supporta riferimento ${variable})
stepsarrayNo-Passaggi da eseguire per ogni elemento
item_varstringNoitemNome variabile per elemento corrente
index_varstringNoindexNome variabile per indice corrente
output_modestringNocollectModalita raccolta risultati

Output:

FieldTypeDescription
__event__stringEvento routing (iterate/done)
__set_contextobjectImposta contesto
outputsobjectValori output per porta
iterationnumberIndice iterazione corrente
statusstringStato operazione
resultsarrayRisultati raccolti
countnumberConteggio totale elementi

Example: Example

yaml
items: ${steps.csv.result.data}

Example: Example

yaml
items: ${search_results}
item_var: element
steps: [{"module": "element.text", "params": {"element_id": "${element}"}, "output": "text"}]

Biforcazione

flow.fork

Dividi esecuzione in branch paralleli

Parameters:

NameTypeRequiredDefaultDescription
branch_countnumberNo2Number of parallel branches

Output:

FieldTypeDescription
__event__stringEvento routing (fork/error)
input_dataanyDati input
branch_countintegerConteggio branch

Example: Example

yaml
branch_count: 2

Example: Example

yaml
branch_count: 3

Vai a

flow.goto

Salto incondizionato a un altro passaggio

Parameters:

NameTypeRequiredDefaultDescription
targetstringYes-Step ID to jump to
max_iterationsnumberNo100Maximum number of iterations (prevents infinite loops)

Output:

FieldTypeDescription
__event__stringEvento routing (goto)
targetstringPassaggio target
iterationnumberConteggio iterazioni

Example: Example

yaml
target: fetch_next_page
max_iterations: 10

Example: Example

yaml
target: cleanup_step

Invoca flusso di lavoro

flow.invoke

Esegui un file di flusso di lavoro esterno

Parameters:

NameTypeRequiredDefaultDescription
workflow_sourcestringYes-File path to workflow YAML or inline YAML content
workflow_paramsobjectYes-Parameters to pass to the invoked workflow
timeout_secondsnumberNo300Maximum execution time in seconds
output_mappingobjectNo{}Map internal variables to workflow output

Output:

FieldTypeDescription
__event__stringParametri da passare al flusso di lavoro invocato
resultanyTempo massimo di esecuzione in secondi
workflow_idstringEvento per il routing (successo/errore)
execution_time_msnumberRisultato dell'esecuzione del flusso di lavoro

Example: Example

yaml
workflow_source: workflows/validate_order.yaml
workflow_params: {"order_id": "${input.order_id}"}
timeout_seconds: 60

Example: Example

yaml
workflow_source: workflows/process_data.yaml
workflow_params: {"data": "${input.data}"}
output_mapping: {"processed": "result.data"}

Unisci

flow.join

Attendi completamento branch paralleli

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (all, any, first)NoallHow to handle multiple inputs
input_countnumberNo2Number of ports
timeoutnumberNo60000Maximum time to wait in milliseconds
cancel_pendingbooleanNoTrueCancel pending branches when using first strategy

Output:

FieldTypeDescription
__event__stringEvento routing (joined/timeout/error)
joined_dataarrayDati uniti
completed_countintegerConteggio branch completati
strategystringStrategia join

Example: Example

yaml
strategy: all
input_count: 2
timeout_ms: 30000

Example: Example

yaml
strategy: first
input_count: 3
cancel_pending: true

Ciclo

flow.loop

Ripeti passaggi N volte usando routing porta output

Parameters:

NameTypeRequiredDefaultDescription
timesnumberYes1Numero di ripetizioni
targetstringNo-Passaggio target (deprecato)
stepsarrayNo-Passaggi da eseguire per ogni iterazione
index_varstringNoindexNome variabile per indice corrente

Output:

FieldTypeDescription
__event__stringEvento routing (iterate/done)
outputsobjectValori output per porta
iterationnumberIterazione corrente
statusstringStato operazione
resultsarrayRisultati raccolti
countnumberIterazioni totali

Example: Example

yaml
times: 3

Example: Example

yaml
times: 5
steps: [{"module": "browser.click", "params": {"selector": ".next"}}]

Unisci

flow.merge

Unisci input multipli in output singolo

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (first, last, all)NoallHow to merge multiple inputs
input_countnumberNo2Number of ports

Output:

FieldTypeDescription
__event__stringEvento routing (merged/error)
merged_dataanyDati uniti
input_countintegerConteggio input
strategystringStrategia merge

Example: Example

yaml
strategy: all
input_count: 3

Example: Example

yaml
strategy: first
input_count: 2

Parallelo

flow.parallel

Esegui più attività in parallelo con diverse strategie

Parameters:

NameTypeRequiredDefaultDescription
tasksarrayYes-Array di definizioni di attività da eseguire in parallelo
modestringNoallArray di definizioni di attività da eseguire in parallelo
timeout_msnumberNo60000Maximum wait time in milliseconds
fail_fastbooleanNoTrueInterrompe tutte le attività al primo fallimento (solo per modalità=all)
concurrency_limitnumberNo0Interrompe tutte le attività al primo fallimento (solo per modalità=all)

Output:

FieldTypeDescription
__event__stringNumero massimo di attività concorrenti (0 per illimitato)
resultsarrayEvento per il routing (completato/parziale/errore)
completed_countnumberEvento per instradamento (completato/parziale/errore)
failed_countnumberRisultati da tutte le attività
total_countnumberNumero di attività completate con successo
modestringNumero di attività fallite
duration_msnumberNumero totale di attività

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://api2.example.com"}}]
mode: all
timeout_ms: 30000

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://mirror1.example.com"}}, {"module": "http.get", "params": {"url": "https://mirror2.example.com"}}]
mode: race

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://might-fail.example.com"}}]
mode: settle

Limite di Velocità

flow.rate_limit

Limita l'esecuzione usando il token bucket o la finestra mobile

Parameters:

NameTypeRequiredDefaultDescription
max_requestsnumberYes-Numero massimo di richieste consentite per finestra
window_msnumberNo60000Finestra temporale in millisecondi
strategystringNotoken_bucketStrategia di limitazione della velocità (token_bucket o finestra_mobile)
queue_overflowstringNowaitComportamento quando la coda è piena (scarta o errore)

Output:

FieldTypeDescription
__event__stringEvento per il routing (consentito/limitato)
tokens_remainingnumberToken rimanenti nel bucket
window_reset_msnumberMillisecondi fino al reset della finestra
requests_in_windownumberNumero di richieste nella finestra corrente
wait_msnumberMillisecondi di attesa prima della prossima richiesta consentita

Example: Example

yaml
max_requests: 100
window_ms: 60000
strategy: token_bucket

Example: Example

yaml
max_requests: 10
window_ms: 1000
strategy: fixed_window
queue_overflow: error

Example: Example

yaml
max_requests: 50
window_ms: 30000
strategy: sliding_window
queue_overflow: wait

Riprova

flow.retry

Riprova le operazioni fallite con backoff configurabile

Parameters:

NameTypeRequiredDefaultDescription
max_retriesnumberYes3Numero massimo di tentativi di riprova
initial_delay_msnumberNo1000Ritardo iniziale prima del primo tentativo in millisecondi
backoff_multipliernumberNo2.0Moltiplicatore per backoff esponenziale
max_delay_msnumberNo30000Ritardo massimo tra i tentativi in millisecondi
retry_on_errorsarrayNo[]Tipi di errore su cui riprovare (vuoto significa riprovare su tutti)

Output:

FieldTypeDescription
__event__stringEvento per instradamento (riprova/successo/fallito)
attemptnumberNumero di tentativo corrente
max_retriesnumberNumero massimo di tentativi configurati
delay_msnumberRitardo prima del prossimo tentativo in millisecondi
total_elapsed_msnumberTempo totale trascorso in millisecondi
last_errorobjectUltimo messaggio di errore

Example: Example

yaml
max_retries: 3

Example: Example

yaml
max_retries: 10
initial_delay_ms: 500
backoff_multiplier: 1.5
max_delay_ms: 10000

Example: Example

yaml
max_retries: 5
initial_delay_ms: 2000
retry_on_errors: ["TIMEOUT", "RATE_LIMIT", "429", "503"]

Inizio

flow.start

Nodo inizio workflow esplicito

Output:

FieldTypeDescription
__event__stringEvento routing (start)
started_atstringOra inizio
workflow_idstringID Workflow

Example: Example

yaml

Sottoflusso

flow.subflow

Riferisci ed esegui workflow esterno

Parameters:

NameTypeRequiredDefaultDescription
workflow_refstringYes-Text content to process
execution_modeselect (inline, spawn, async)NoinlineSelect an option
input_mappingobjectYes-Data object to process
output_mappingobjectNo{}Map internal variables to workflow output
timeoutnumberNo300000Maximum time to wait in milliseconds

Output:

FieldTypeDescription
__event__stringEvento routing (success/error)
resultanyRisultato esecuzione
execution_idstringID Esecuzione
workflow_refstringRiferimento workflow

Example: Example

yaml
workflow_ref: workflows/validate_order
execution_mode: inline
input_mapping: {"order_data": "${input.order}"}
output_mapping: {"validation_result": "result"}

Example: Example

yaml
workflow_ref: workflows/send_notifications
execution_mode: spawn

Interruttore

flow.switch

Ramificazione multipla basata su corrispondenza valore

Parameters:

NameTypeRequiredDefaultDescription
expressionstringYes-Value to match against cases (supports variable reference)
casesarrayYes[{'id': 'case_1', 'value': 'case1', 'label': 'Case 1'}]List of case definitions

Output:

FieldTypeDescription
__event__stringEvento routing (case:valore o default)
outputsobjectValori output per porta
matched_casestringCaso corrispondente
valueanyValore corrispondente

Example: Example

yaml
expression: ${api_response.status}
cases: [{"id": "case-1", "value": "success", "label": "Success"}, {"id": "case-2", "value": "pending", "label": "Pending"}, {"id": "case-3", "value": "error", "label": "Error"}]

Example: Example

yaml
expression: ${input.type}
cases: [{"id": "img", "value": "image", "label": "Image"}, {"id": "vid", "value": "video", "label": "Video"}, {"id": "txt", "value": "text", "label": "Text"}]

Limita

flow.throttle

Limita la frequenza di esecuzione con intervallo minimo

Parameters:

NameTypeRequiredDefaultDescription
interval_msnumberYes-Tempo minimo tra le esecuzioni in millisecondi
leadingbooleanNoTrueEsegui sul bordo iniziale (la prima chiamata passa immediatamente)

Output:

FieldTypeDescription
__event__stringEvento per instradamento (eseguito/limitato)
last_execution_msnumberTimestamp dell'ultima esecuzione consentita
calls_throttlednumberNumero di chiamate limitate dall'ultima esecuzione
time_since_last_msnumberTempo trascorso dall'ultima esecuzione in millisecondi
remaining_msnumberMillisecondi rimanenti fino alla prossima esecuzione consentita

Example: Example

yaml
interval_ms: 1000

Example: Example

yaml
interval_ms: 200
leading: true

Example: Example

yaml
interval_ms: 5000
leading: false

Innesco

flow.trigger

Punto ingresso workflow - manuale, webhook, pianificato o evento

Parameters:

NameTypeRequiredDefaultDescription
trigger_typeselect (manual, webhook, schedule, event, mcp, polling)NomanualType of trigger event
webhook_pathstringNo-URL path for webhook trigger
schedulestringNo-Cron expression for scheduled trigger
event_namestringNo-Event name to listen for
tool_namestringNo-MCP tool name exposed to AI agents
tool_descriptionstringNo-Description shown to AI agents for this tool
poll_urlstringNo-API endpoint to poll for changes
poll_intervalnumberNo300How often to check for changes (minimum 60 seconds)
poll_methodselect (GET, POST)NoGETHTTP method for polling request
poll_headersobjectNo{}Custom headers for polling request (e.g. API keys)
poll_bodyobjectNo{}Request body for POST polling
dedup_keystringNo-JSON path to extract a unique value for deduplication
configobjectNo-Custom trigger config (for composites: LINE BOT, Telegram, Slack, etc.)
descriptionstringNo-Optional description text

Output:

FieldTypeDescription
__event__stringEvento routing (triggered/error)
trigger_dataobjectDati trigger
trigger_typestringTipo trigger
triggered_atstringOra attivazione

Example: Example

yaml
trigger_type: manual

Example: Example

yaml
trigger_type: webhook
webhook_path: /api/webhooks/order-created

Example: Example

yaml
trigger_type: schedule
schedule: 0 * * * *

Example: Example

yaml
trigger_type: mcp
tool_name: send-report
tool_description: Send a weekly summary report

Example: Example

yaml
trigger_type: polling
poll_url: https://api.example.com/items
poll_interval: 300
dedup_key: $.data[0].id

Released under the Apache 2.0 License.