Skip to content

Flow Control

Branching, loops, parallelism, subflows, triggers, and error handling.

24 modules

ModuleDescription
Traitement par lotsTraiter les éléments par lots avec une taille configurable
BranchementBranchement conditionnel base sur l'evaluation d'expression
Point d'arretMettre en pause l'execution du workflow pour approbation ou entree humaine
DisjoncteurModèle de disjoncteur pour prévenir les pannes en cascade
ConteneurConteneur de sous-flux integre pour organiser des workflows complexes
DébouncerDébouncer l'exécution pour éviter les appels répétés rapides
FinNoeud de fin de workflow explicite
Gestionnaire d'erreursIntercepte et gère les erreurs des nœuds en amont
Déclencheur de workflow d'erreurPoint d'entrée pour les workflows d'erreur - déclenché lorsqu'un autre workflow échoue
Pour chaqueIterer sur une liste et executer des etapes pour chaque element
FourcheDiviser l'execution en branches paralleles
Aller aSaut inconditionnel vers une autre etape
Invoquer le flux de travailExécuter un fichier de flux de travail externe
JoindreAttendre que les branches paralleles se terminent
BoucleRepeter des etapes N fois en utilisant le routage par port de sortie
FusionnerFusionner plusieurs entrees en une seule sortie
ParallèleExécuter plusieurs tâches en parallèle avec différentes stratégies
Limite de DébitLimiter le débit d'exécution en utilisant un seau à jetons ou une fenêtre glissante
RéessayerRéessayer les opérations échouées avec un backoff configurable
DebutNoeud de debut de workflow explicite
Sous-fluxReferencer et executer un workflow externe
CommutateurBranchement multiple base sur la correspondance de valeur
LimiterLimiter le taux d'exécution avec un intervalle minimum
DeclencheurPoint d'entree du workflow - manuel, webhook, planifie ou evenement

Modules

Traitement par lots

flow.batch

Traiter les éléments par lots avec une taille configurable

Parameters:

NameTypeRequiredDefaultDescription
itemsarrayYes-Array of items to process. Can be numbers, strings, or objects.
batch_sizenumberYes10Nombre d'éléments par lot
delay_msnumberNo0Millisecondes à attendre entre les lots (pour limiter le débit)
continue_on_errorbooleanNoFalseContinuer à traiter les lots restants si un échoue
parallel_batchesnumberNo1Continuer à traiter les lots restants si un échoue

Output:

FieldTypeDescription
__event__stringNombre de lots à traiter en parallèle (1 pour séquentiel)
batcharrayÉvénement pour le routage (lot/terminé/erreur)
batch_indexnumberÉvénement pour le routage (lot/terminé/erreur)
total_batchesnumberÉléments du lot actuel
total_itemsnumberIndex du lot actuel (à partir de 0)
is_last_batchbooleanNombre total de lots
progressobjectNombre total d'éléments

Example: Example

yaml
items: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
batch_size: 10

Example: Example

yaml
items: ${input.records}
batch_size: 100
delay_ms: 1000

Example: Example

yaml
items: ${input.data}
batch_size: 50
parallel_batches: 3
continue_on_error: true

Branchement

flow.branch

Branchement conditionnel base sur l'evaluation d'expression

Parameters:

NameTypeRequiredDefaultDescription
conditionstringYes-Expression to evaluate (supports ==, !=, >, <, >=, <=, contains)

Output:

FieldTypeDescription
__event__stringEvenement de routage (true/false/error)
outputsobjectValeurs de sortie par port
resultbooleanResultat du branchement
conditionstringValeur de la condition
resolved_conditionstringResultat de l'evaluation de la condition

Example: Example

yaml
condition: ${search_step.count} > 0

Example: Example

yaml
condition: ${api_call.status} == success

Point d'arret

flow.breakpoint

Mettre en pause l'execution du workflow pour approbation ou entree humaine

Parameters:

NameTypeRequiredDefaultDescription
titlestringNoApproval RequiredTitle displayed to approvers
descriptionstringNo-Optional description text
timeout_secondsnumberNo0Maximum wait time (0 for no timeout)
required_approversarrayYes-Array of data items to process
approval_modeselect (single, all, majority, first)NosingleHow approvals are counted
custom_fieldsarrayYes-Array of data items to process
include_contextbooleanNoTrueWhether to include execution context
auto_approve_conditionstringNo-Text content to process

Output:

FieldTypeDescription
__event__stringEvenement de routage (approved/rejected/timeout)
breakpoint_idstringID du point d'arret
statusstringStatut
approved_byarrayApprouve par
rejected_byarrayRejete par
custom_inputsobjectValeurs d'entree personnalisees
commentsarrayCommentaires de revue
resolved_atstringHeure de resolution
wait_duration_msintegerDuree d'attente (ms)

Example: Example

yaml
title: Approve data export
description: Please review and approve the data export

Example: Example

yaml
title: Manager Approval Required
description: Large transaction requires manager approval
required_approvers: ["manager@example.com"]
timeout_seconds: 3600

Example: Example

yaml
title: Adjustment Required
custom_fields: [{"name": "reason", "label": "Reason", "type": "text", "required": true}, {"name": "amount", "label": "Amount", "type": "number", "required": true}]

Disjoncteur

flow.circuit_breaker

Modèle de disjoncteur pour prévenir les pannes en cascade

Parameters:

NameTypeRequiredDefaultDescription
failure_thresholdnumberYes5Nombre d'échecs avant d'ouvrir le disjoncteur
reset_timeout_msnumberNo60000Temps en millisecondes avant que le disjoncteur passe à mi-ouvert
half_open_maxnumberNo1Nombre maximum de requêtes autorisées en état mi-ouvert

Output:

FieldTypeDescription
__event__stringÉvénement pour le routage (autorisé/rejeté/mi-ouvert)
statestringÉtat du disjoncteur (fermé/ouvert/mi-ouvert)
failure_countnumberNombre d'échecs consécutifs
last_failure_time_msnumberHorodatage du dernier échec en millisecondes
time_until_half_open_msnumberMillisecondes avant que le disjoncteur passe à mi-ouvert

Example: Example

yaml
failure_threshold: 5
reset_timeout_ms: 60000

Example: Example

yaml
failure_threshold: 2
reset_timeout_ms: 10000
half_open_max: 1

Example: Example

yaml
failure_threshold: 20
reset_timeout_ms: 120000
half_open_max: 3

Conteneur

flow.container

Conteneur de sous-flux integre pour organiser des workflows complexes

Parameters:

NameTypeRequiredDefaultDescription
subflowobjectNo{'nodes': [], 'edges': []}Embedded workflow definition with nodes and edges
inherit_contextbooleanNoTrueWhether to inherit variables from parent workflow
isolated_variablesarrayYes-Array of data items to process
export_variablesarrayYes-Array of data items to process

Output:

FieldTypeDescription
__event__stringEvenement de routage (success/error)
outputsobjectValeurs de sortie par port
subflow_resultobjectResultat du sous-flux
exported_variablesobjectVariables exportees
node_countintegerNombre de noeuds
execution_time_msnumberTemps d'execution (ms)

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: true

Example: Example

yaml
subflow: {"nodes": [], "edges": []}
inherit_context: false

Débouncer

flow.debounce

Débouncer l'exécution pour éviter les appels répétés rapides

Parameters:

NameTypeRequiredDefaultDescription
delay_msnumberYes-Temps d'attente après le dernier appel avant l'exécution
leadingbooleanNoFalseExécuter sur le bord avant (le premier appel déclenche immédiatement)
trailingbooleanNoTrueExécuter sur le bord arrière (après expiration du délai)

Output:

FieldTypeDescription
__event__stringÉvénement pour le routage (exécuté/débouncé)
last_call_msnumberHorodatage du dernier appel en millisecondes
calls_debouncednumberNombre d'appels débouncés depuis la dernière exécution
time_since_last_msnumberTemps écoulé depuis le dernier appel en millisecondes
edgestringQuel bord a déclenché l'exécution (avant/arrière)

Example: Example

yaml
delay_ms: 500

Example: Example

yaml
delay_ms: 1000
leading: true
trailing: false

Example: Example

yaml
delay_ms: 2000
leading: true
trailing: true

Fin

flow.end

Noeud de fin de workflow explicite

Parameters:

NameTypeRequiredDefaultDescription
output_mappingobjectNo{}Map internal variables to workflow output
success_messagestringNo-Text content to process

Output:

FieldTypeDescription
__event__stringEvenement de routage (end)
ended_atstringHeure de fin
workflow_resultobjectResultat du workflow

Example: Example

yaml

Example: Example

yaml
output_mapping: {"result": "${process.output}", "status": "success"}

Gestionnaire d'erreurs

flow.error_handle

Intercepte et gère les erreurs des nœuds en amont

Parameters:

NameTypeRequiredDefaultDescription
actionstringYeslog_and_continueQue faire de l'erreur
include_tracebackbooleanNoTrueInclure la trace complète dans la sortie
error_code_mappingobjectNo{}Inclure la trace complète dans la sortie
fallback_valueanyNo-Mapper les codes d'erreur à des actions personnalisées

Output:

FieldTypeDescription
__event__stringValeur à utiliser lorsque l'erreur est supprimée
outputsobjectÉvénement pour le routage (géré/escalade)
error_infoobjectÉvénement pour le routage (géré/escalade)
action_takenstringAction entreprise

Example: Example

yaml
action: log_and_continue
include_traceback: true

Example: Example

yaml
action: suppress
fallback_value: {"status": "skipped", "reason": "upstream_error"}

Example: Example

yaml
action: transform
error_code_mapping: {"TIMEOUT": {"retry": true, "delay": 5000}, "NOT_FOUND": {"skip": true}}

Déclencheur de workflow d'erreur

flow.error_workflow_trigger

Point d'entrée pour les workflows d'erreur - déclenché lorsqu'un autre workflow échoue

Parameters:

NameTypeRequiredDefaultDescription
descriptionstringNo-Description of this error workflow

Output:

FieldTypeDescription
__event__stringDescription de ce workflow d'erreur
error_contextobjectÉvénement pour le routage (déclenché)
triggered_atstringHorodatage ISO lorsque le workflow d'erreur a été déclenché

Example: Example

yaml
description: Send Slack notification on workflow failure

Example: Example

yaml
description: Log all workflow errors to monitoring system

Pour chaque

flow.foreach

Iterer sur une liste et executer des etapes pour chaque element

Parameters:

NameTypeRequiredDefaultDescription
itemsstringYes-Liste d'elements a iterer (supporte la reference ${variable})
stepsarrayNo-Etapes a executer pour chaque element
item_varstringNoitemNom de variable pour l'element actuel
index_varstringNoindexNom de variable pour l'index actuel
output_modestringNocollectMode de collecte des resultats

Output:

FieldTypeDescription
__event__stringEvenement de routage (iterate/done)
__set_contextobjectDefinir le contexte
outputsobjectValeurs de sortie par port
iterationnumberIndex d'iteration actuel
statusstringStatut de l'operation
resultsarrayResultats collectes
countnumberNombre total d'elements

Example: Example

yaml
items: ${steps.csv.result.data}

Example: Example

yaml
items: ${search_results}
item_var: element
steps: [{"module": "element.text", "params": {"element_id": "${element}"}, "output": "text"}]

Fourche

flow.fork

Diviser l'execution en branches paralleles

Parameters:

NameTypeRequiredDefaultDescription
branch_countnumberNo2Number of parallel branches

Output:

FieldTypeDescription
__event__stringEvenement de routage (fork/error)
input_dataanyDonnees d'entree
branch_countintegerNombre de branches

Example: Example

yaml
branch_count: 2

Example: Example

yaml
branch_count: 3

Aller a

flow.goto

Saut inconditionnel vers une autre etape

Parameters:

NameTypeRequiredDefaultDescription
targetstringYes-Step ID to jump to
max_iterationsnumberNo100Maximum number of iterations (prevents infinite loops)

Output:

FieldTypeDescription
__event__stringEvenement de routage (goto)
targetstringEtape cible
iterationnumberNombre d'iterations

Example: Example

yaml
target: fetch_next_page
max_iterations: 10

Example: Example

yaml
target: cleanup_step

Invoquer le flux de travail

flow.invoke

Exécuter un fichier de flux de travail externe

Parameters:

NameTypeRequiredDefaultDescription
workflow_sourcestringYes-File path to workflow YAML or inline YAML content
workflow_paramsobjectYes-Parameters to pass to the invoked workflow
timeout_secondsnumberNo300Maximum execution time in seconds
output_mappingobjectNo{}Map internal variables to workflow output

Output:

FieldTypeDescription
__event__stringParamètres à passer au flux de travail invoqué
resultanyTemps d'exécution maximum en secondes
workflow_idstringÉvénement pour le routage (succès/erreur)
execution_time_msnumberRésultat de l'exécution du flux de travail

Example: Example

yaml
workflow_source: workflows/validate_order.yaml
workflow_params: {"order_id": "${input.order_id}"}
timeout_seconds: 60

Example: Example

yaml
workflow_source: workflows/process_data.yaml
workflow_params: {"data": "${input.data}"}
output_mapping: {"processed": "result.data"}

Joindre

flow.join

Attendre que les branches paralleles se terminent

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (all, any, first)NoallHow to handle multiple inputs
input_countnumberNo2Number of ports
timeoutnumberNo60000Maximum time to wait in milliseconds
cancel_pendingbooleanNoTrueCancel pending branches when using first strategy

Output:

FieldTypeDescription
__event__stringEvenement de routage (joined/timeout/error)
joined_dataarrayDonnees jointes
completed_countintegerNombre de branches terminees
strategystringStrategie de jonction

Example: Example

yaml
strategy: all
input_count: 2
timeout_ms: 30000

Example: Example

yaml
strategy: first
input_count: 3
cancel_pending: true

Boucle

flow.loop

Repeter des etapes N fois en utilisant le routage par port de sortie

Parameters:

NameTypeRequiredDefaultDescription
timesnumberYes1Nombre de repetitions
targetstringNo-Etape cible (obsolete)
stepsarrayNo-Etapes a executer pour chaque iteration
index_varstringNoindexNom de variable pour l'index actuel

Output:

FieldTypeDescription
__event__stringEvenement de routage (iterate/done)
outputsobjectValeurs de sortie par port
iterationnumberIteration actuelle
statusstringStatut de l'operation
resultsarrayResultats collectes
countnumberTotal des iterations

Example: Example

yaml
times: 3

Example: Example

yaml
times: 5
steps: [{"module": "browser.click", "params": {"selector": ".next"}}]

Fusionner

flow.merge

Fusionner plusieurs entrees en une seule sortie

Parameters:

NameTypeRequiredDefaultDescription
strategyselect (first, last, all)NoallHow to merge multiple inputs
input_countnumberNo2Number of ports

Output:

FieldTypeDescription
__event__stringEvenement de routage (merged/error)
merged_dataanyDonnees fusionnees
input_countintegerNombre d'entrees
strategystringStrategie de fusion

Example: Example

yaml
strategy: all
input_count: 3

Example: Example

yaml
strategy: first
input_count: 2

Parallèle

flow.parallel

Exécuter plusieurs tâches en parallèle avec différentes stratégies

Parameters:

NameTypeRequiredDefaultDescription
tasksarrayYes-Tableau de définitions de tâches à exécuter en parallèle
modestringNoallTableau de définitions de tâches à exécuter en parallèle
timeout_msnumberNo60000Maximum wait time in milliseconds
fail_fastbooleanNoTrueArrêter toutes les tâches à la première erreur (uniquement pour mode=all)
concurrency_limitnumberNo0Arrêter toutes les tâches à la première erreur (uniquement pour mode=all)

Output:

FieldTypeDescription
__event__stringNombre maximum de tâches simultanées (0 pour illimité)
resultsarrayÉvénement pour le routage (terminé/partiel/erreur)
completed_countnumberÉvénement pour le routage (terminé/partiel/erreur)
failed_countnumberRésultats de toutes les tâches
total_countnumberNombre de tâches complétées avec succès
modestringNombre de tâches échouées
duration_msnumberNombre total de tâches

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://api2.example.com"}}]
mode: all
timeout_ms: 30000

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://mirror1.example.com"}}, {"module": "http.get", "params": {"url": "https://mirror2.example.com"}}]
mode: race

Example: Example

yaml
tasks: [{"module": "http.get", "params": {"url": "https://api1.example.com"}}, {"module": "http.get", "params": {"url": "https://might-fail.example.com"}}]
mode: settle

Limite de Débit

flow.rate_limit

Limiter le débit d'exécution en utilisant un seau à jetons ou une fenêtre glissante

Parameters:

NameTypeRequiredDefaultDescription
max_requestsnumberYes-Nombre maximum de requêtes autorisées par fenêtre
window_msnumberNo60000Fenêtre temporelle en millisecondes
strategystringNotoken_bucketStratégie de limitation de débit (seau à jetons ou fenêtre glissante)
queue_overflowstringNowaitComportement lorsque la file d'attente est pleine (abandon ou erreur)

Output:

FieldTypeDescription
__event__stringÉvénement pour le routage (autorisé/limité)
tokens_remainingnumberJetons restants dans le seau
window_reset_msnumberMillisecondes avant la réinitialisation de la fenêtre
requests_in_windownumberNombre de requêtes dans la fenêtre actuelle
wait_msnumberMillisecondes à attendre avant la prochaine requête autorisée

Example: Example

yaml
max_requests: 100
window_ms: 60000
strategy: token_bucket

Example: Example

yaml
max_requests: 10
window_ms: 1000
strategy: fixed_window
queue_overflow: error

Example: Example

yaml
max_requests: 50
window_ms: 30000
strategy: sliding_window
queue_overflow: wait

Réessayer

flow.retry

Réessayer les opérations échouées avec un backoff configurable

Parameters:

NameTypeRequiredDefaultDescription
max_retriesnumberYes3Nombre maximum de tentatives de réessai
initial_delay_msnumberNo1000Délai initial avant le premier réessai en millisecondes
backoff_multipliernumberNo2.0Multiplicateur pour le backoff exponentiel
max_delay_msnumberNo30000Délai maximum entre les réessais en millisecondes
retry_on_errorsarrayNo[]Types d'erreurs à réessayer (vide signifie réessayer tout)

Output:

FieldTypeDescription
__event__stringÉvénement pour le routage (réessayer/réussi/échoué)
attemptnumberNuméro de tentative actuel
max_retriesnumberNombre maximum de réessais configurés
delay_msnumberDélai avant la prochaine réessai en millisecondes
total_elapsed_msnumberTemps total écoulé en millisecondes
last_errorobjectDernier message d'erreur

Example: Example

yaml
max_retries: 3

Example: Example

yaml
max_retries: 10
initial_delay_ms: 500
backoff_multiplier: 1.5
max_delay_ms: 10000

Example: Example

yaml
max_retries: 5
initial_delay_ms: 2000
retry_on_errors: ["TIMEOUT", "RATE_LIMIT", "429", "503"]

Debut

flow.start

Noeud de debut de workflow explicite

Output:

FieldTypeDescription
__event__stringEvenement de routage (start)
started_atstringHeure de debut
workflow_idstringID du workflow

Example: Example

yaml

Sous-flux

flow.subflow

Referencer et executer un workflow externe

Parameters:

NameTypeRequiredDefaultDescription
workflow_refstringYes-Text content to process
execution_modeselect (inline, spawn, async)NoinlineSelect an option
input_mappingobjectYes-Data object to process
output_mappingobjectNo{}Map internal variables to workflow output
timeoutnumberNo300000Maximum time to wait in milliseconds

Output:

FieldTypeDescription
__event__stringEvenement de routage (success/error)
resultanyResultat de l'execution
execution_idstringID d'execution
workflow_refstringReference du workflow

Example: Example

yaml
workflow_ref: workflows/validate_order
execution_mode: inline
input_mapping: {"order_data": "${input.order}"}
output_mapping: {"validation_result": "result"}

Example: Example

yaml
workflow_ref: workflows/send_notifications
execution_mode: spawn

Commutateur

flow.switch

Branchement multiple base sur la correspondance de valeur

Parameters:

NameTypeRequiredDefaultDescription
expressionstringYes-Value to match against cases (supports variable reference)
casesarrayYes[{'id': 'case_1', 'value': 'case1', 'label': 'Case 1'}]List of case definitions

Output:

FieldTypeDescription
__event__stringEvenement de routage (case:value ou default)
outputsobjectValeurs de sortie par port
matched_casestringCas correspondant
valueanyValeur correspondante

Example: Example

yaml
expression: ${api_response.status}
cases: [{"id": "case-1", "value": "success", "label": "Success"}, {"id": "case-2", "value": "pending", "label": "Pending"}, {"id": "case-3", "value": "error", "label": "Error"}]

Example: Example

yaml
expression: ${input.type}
cases: [{"id": "img", "value": "image", "label": "Image"}, {"id": "vid", "value": "video", "label": "Video"}, {"id": "txt", "value": "text", "label": "Text"}]

Limiter

flow.throttle

Limiter le taux d'exécution avec un intervalle minimum

Parameters:

NameTypeRequiredDefaultDescription
interval_msnumberYes-Temps minimum entre les exécutions en millisecondes
leadingbooleanNoTrueExécuter au bord d'attaque (premier appel passe immédiatement)

Output:

FieldTypeDescription
__event__stringÉvénement pour le routage (exécuté/limité)
last_execution_msnumberHorodatage de la dernière exécution autorisée
calls_throttlednumberNombre d'appels limités depuis la dernière exécution
time_since_last_msnumberTemps écoulé depuis la dernière exécution en millisecondes
remaining_msnumberMillisecondes restantes jusqu'à la prochaine exécution autorisée

Example: Example

yaml
interval_ms: 1000

Example: Example

yaml
interval_ms: 200
leading: true

Example: Example

yaml
interval_ms: 5000
leading: false

Declencheur

flow.trigger

Point d'entree du workflow - manuel, webhook, planifie ou evenement

Parameters:

NameTypeRequiredDefaultDescription
trigger_typeselect (manual, webhook, schedule, event, mcp, polling)NomanualType of trigger event
webhook_pathstringNo-URL path for webhook trigger
schedulestringNo-Cron expression for scheduled trigger
event_namestringNo-Event name to listen for
tool_namestringNo-MCP tool name exposed to AI agents
tool_descriptionstringNo-Description shown to AI agents for this tool
poll_urlstringNo-API endpoint to poll for changes
poll_intervalnumberNo300How often to check for changes (minimum 60 seconds)
poll_methodselect (GET, POST)NoGETHTTP method for polling request
poll_headersobjectNo{}Custom headers for polling request (e.g. API keys)
poll_bodyobjectNo{}Request body for POST polling
dedup_keystringNo-JSON path to extract a unique value for deduplication
configobjectNo-Custom trigger config (for composites: LINE BOT, Telegram, Slack, etc.)
descriptionstringNo-Optional description text

Output:

FieldTypeDescription
__event__stringEvenement de routage (triggered/error)
trigger_dataobjectDonnees du declencheur
trigger_typestringType de declencheur
triggered_atstringHeure de declenchement

Example: Example

yaml
trigger_type: manual

Example: Example

yaml
trigger_type: webhook
webhook_path: /api/webhooks/order-created

Example: Example

yaml
trigger_type: schedule
schedule: 0 * * * *

Example: Example

yaml
trigger_type: mcp
tool_name: send-report
tool_description: Send a weekly summary report

Example: Example

yaml
trigger_type: polling
poll_url: https://api.example.com/items
poll_interval: 300
dedup_key: $.data[0].id

Released under the Apache 2.0 License.