Netflix Conductor DO_WHILE loop implementation with task retry logic

Paras Bansal
5 min read · Jun 30, 2022


Netflix Conductor is an open source orchestration tool from Netflix for orchestrating workflows that span microservices. I have used Airflow as well, and compared with it Netflix Conductor has several notable features:

  1. Significantly faster and more lightweight than Airflow
  2. A distributed architecture like Airflow's, but with client libraries in several languages, so workers can be implemented in Java, Python, Node.js and C#
  3. Lets you choose your own database persistence layer and queuing service
  4. Open source, like Airflow

There are two types of tasks:

  1. System Tasks: executed on the Conductor server
  2. Worker Tasks: executed by custom workers (you have to register the worker task first)

In this post I’ll show how to run a loop with the DO_WHILE operator around an HTTP system task.

REST API: I’ll be using the REST API provided by https://dummy.restapiexample.com/

  1. https://dummy.restapiexample.com/api/v1/employees: returns a LIST of all the employees
  2. https://dummy.restapiexample.com/api/v1/employee/1: returns the details of the one employee whose ID is provided

So our goal is to get all the employees (as a LIST) and then loop over them to get the details of each one.

RETRY: Netflix Conductor can retry a task when it fails or times out, but for that, task definitions need to be created separately. The Netflix Conductor documentation explains how to do this.

For this application, I have created two separate task definitions:

[
  {
    "name": "get_employees",
    "taskReferenceName": "get_employees",
    "retryCount": 10,
    "retryLogic": "FIXED",
    "retryDelaySeconds": 2,
    "ownerEmail": "example@email.com"
  },
  {
    "name": "get_name",
    "taskReferenceName": "get_name",
    "retryCount": 10,
    "retryLogic": "FIXED",
    "retryDelaySeconds": 2,
    "ownerEmail": "example@email.com"
  }
]

You can create them through Swagger or with a curl command against the /api/metadata/taskdefs API.
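As a rough sketch of the API route, the same two definitions could be posted from Python using only the standard library. The server address `http://localhost:8080/api` and the helper names (`make_task_def`, `build_taskdef_request`, `register_task_defs`) are assumptions for illustration, not part of Conductor; adjust them to your deployment.

```python
import json
import urllib.request

# Assumption: a Conductor server reachable locally on port 8080.
CONDUCTOR_API = "http://localhost:8080/api"


def make_task_def(name, retry_count=10, retry_logic="FIXED", delay_seconds=2):
    """Build one task definition with the retry settings used in this post."""
    return {
        "name": name,
        "taskReferenceName": name,
        "retryCount": retry_count,
        "retryLogic": retry_logic,
        "retryDelaySeconds": delay_seconds,
        "ownerEmail": "example@email.com",
    }


def build_taskdef_request(task_defs, api_base=CONDUCTOR_API):
    """Prepare (without sending) the POST request that registers task definitions."""
    return urllib.request.Request(
        url=f"{api_base}/metadata/taskdefs",
        data=json.dumps(task_defs).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register_task_defs(task_defs, api_base=CONDUCTOR_API):
    """Send the request; this needs a running Conductor server."""
    request = build_taskdef_request(task_defs, api_base)
    with urllib.request.urlopen(request) as response:
        return response.status
```

Calling `register_task_defs([make_task_def("get_employees"), make_task_def("get_name")])` would then submit both definitions in one request.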

Both task definitions add retry logic: up to 10 retries with a fixed delay of 2 seconds. If you want exponential backoff, set retryLogic to EXPONENTIAL_BACKOFF.
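To get a feel for what the two settings mean in waiting time, here is a small sketch that lists the delay before each retry. The function name `retry_delays` is hypothetical, and the exponential formula (the delay doubling on each attempt, starting from retryDelaySeconds) is an assumption on my part; check the Conductor documentation for the exact formula your version uses.

```python
def retry_delays(retry_count, retry_delay_seconds, retry_logic="FIXED"):
    """Return the wait in seconds before each retry attempt."""
    if retry_logic == "FIXED":
        # Same delay before every retry.
        return [retry_delay_seconds] * retry_count
    if retry_logic == "EXPONENTIAL_BACKOFF":
        # Assumption: delay = retryDelaySeconds * 2^attempt, attempt starting at 0.
        return [retry_delay_seconds * 2 ** attempt for attempt in range(retry_count)]
    raise ValueError(f"unsupported retryLogic: {retry_logic}")


fixed = retry_delays(10, 2)
backoff = retry_delays(4, 2, "EXPONENTIAL_BACKOFF")
print(sum(fixed), backoff)
```

With the settings from this post, the fixed policy waits at most 20 seconds in total across its 10 retries, while under the assumed backoff formula the total wait grows much faster.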

Do note that the task “name” in your workflow must match the “name” of the task definition, so that Conductor can apply these extra properties when running the tasks.
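A name mismatch is easy to miss, so a quick local check can help before uploading anything. The sketch below is only an illustration: the helper names are hypothetical, and the set of task types skipped (DO_WHILE and INLINE) simply reflects the two tasks in this post's workflow that have no task definition.

```python
# Task types that, in this post, run without a registered task definition.
TYPES_WITHOUT_TASK_DEF = {"DO_WHILE", "INLINE"}


def collect_task_names(tasks):
    """Collect the names of tasks expected to have a task definition,
    descending into the loopOver list of any DO_WHILE task."""
    names = set()
    for task in tasks:
        if task.get("type") not in TYPES_WITHOUT_TASK_DEF:
            names.add(task["name"])
        names |= collect_task_names(task.get("loopOver", []))
    return names


def missing_task_defs(workflow_def, task_defs):
    """Return the workflow task names that have no matching task definition."""
    defined = {task_def["name"] for task_def in task_defs}
    return sorted(collect_task_names(workflow_def["tasks"]) - defined)


# Trimmed-down version of the workflow used in this post.
workflow = {
    "tasks": [
        {"name": "get_employees", "type": "HTTP"},
        {
            "name": "get_names_print_names",
            "type": "DO_WHILE",
            "loopOver": [
                {"name": "get_item_at_index", "type": "INLINE"},
                {"name": "get_name", "type": "HTTP"},
            ],
        },
    ]
}
task_defs = [{"name": "get_employees"}, {"name": "get_name"}]
print(missing_task_defs(workflow, task_defs))
```

An empty result means every HTTP task in the workflow has a definition with the same name, so the retry settings should be picked up.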

Next is the workflow definition. On the Netflix Conductor UI definitions page, click on new workflow definition to create one:

Once the editor opens, add the workflow definition below and save it.

{
  "name": "Do_While_Workflow",
  "description": "Workflow to demo DO_WHILE",
  "version": 1,
  "tasks": [
    {
      "name": "get_employees",
      "taskReferenceName": "get_employees",
      "inputParameters": {
        "http_request": {
          "uri": "${workflow.input.baseUrl}/employees",
          "method": "GET",
          "contentType": "application/json",
          "readTimeOut": 5000
        }
      },
      "type": "HTTP"
    },
    {
      "name": "get_names_print_names",
      "taskReferenceName": "get_names_print_names",
      "inputParameters": {
        "employee_count": "${get_employees.output.response.body.data.length()}"
      },
      "type": "DO_WHILE",
      "loopCondition": "$.get_names_print_names['iteration'] < $.employee_count",
      "loopOver": [
        {
          "name": "get_item_at_index",
          "taskReferenceName": "get_item_at_index",
          "inputParameters": {
            "items": "${get_employees.output.response.body.data}",
            "iterator": "${get_names_print_names.output.iteration}",
            "evaluatorType": "javascript",
            "expression": "$.items.get(($.iterator || 1) -1)"
          },
          "type": "INLINE"
        },
        {
          "name": "get_name",
          "taskReferenceName": "get_name",
          "inputParameters": {
            "http_request": {
              "uri": "${workflow.input.baseUrl}/employee/${get_item_at_index.output.result.id}",
              "method": "GET",
              "contentType": "application/json",
              "readTimeOut": 5000
            }
          },
          "type": "HTTP"
        }
      ]
    }
  ],
  "inputParameters": [],
  "outputParameters": {},
  "schemaVersion": 2,
  "restartable": true,
  "ownerEmail": "example@email.com"
}

Workflow steps:

  1. Get all the employees first. The API returns an ArrayList under get_employees.output.response.body.data
  2. In the DO_WHILE inputParameters, the size of that ArrayList is calculated with the length() function
  3. DO_WHILE saves the loop count in an output parameter, “iteration”. You can refer to it as taskName['iteration'] or taskName.output.iteration
  4. The DO_WHILE loopCondition is simply a JavaScript expression: `$.get_names_print_names['iteration'] < $.employee_count`. This keeps the loop running while the iteration number is less than the count
  5. Finally comes the DO_WHILE loopOver, where you specify the tasks you want to loop over
  6. Here the first task fetches the data element for the current iteration, using an INLINE operator that executes a javascript expression: "expression": "$.items.get(($.iterator || 1) -1)"
  7. Note the syntax here. $.iterator gives 1 for the first iteration, but $.items is a zero-indexed array, so I subtract 1 to get $.items.get(0), which is my first element
  8. Once you have the item, the next step extracts the id from the employee object and uses it for the HTTP call
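The steps above can be traced outside Conductor with a small plain-Python simulation. This is only a sketch of the control flow, not how Conductor runs it internally: the function names are hypothetical, the employee list is stand-in data, and `get_name` is a stub that replaces the real HTTP call.

```python
def loop_condition(iteration, employee_count):
    """Mirror of: $.get_names_print_names['iteration'] < $.employee_count"""
    return iteration < employee_count


def item_at_index(items, iterator):
    """Mirror of the INLINE expression: $.items.get(($.iterator || 1) - 1)"""
    return items[(iterator or 1) - 1]


def run_do_while(employees, get_name):
    """Run the loop body once, then repeat while the loop condition holds."""
    iteration = 0
    details = []
    while True:
        iteration += 1  # 1-based counter, like "iteration" in the workflow
        employee = item_at_index(employees, iteration)
        details.append(get_name(employee["id"]))
        if not loop_condition(iteration, len(employees)):
            break
    return details


# Stand-in data and a stub in place of the get_name HTTP task.
employees = [{"id": 1}, {"id": 2}, {"id": 3}]
print(run_do_while(employees, lambda employee_id: f"employee-{employee_id}"))
# prints ['employee-1', 'employee-2', 'employee-3']
```

Because the body runs before the condition is checked, the loop visits every element exactly once for a non-empty list; in this sketch an empty list would fail on the first lookup.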

Once you click save, it creates the workflow definition and shows the workflow as below:

Execution: click on “Workbench” at the top of the UI and run your workflow as below:

Provide your input parameters (in my case baseUrl) and hit the play button. The UI shows a shortcut on the right side to the execution; click on it to open it:
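If you prefer to start the workflow without the UI, a request along these lines should work against Conductor's workflow start API. Treat it as a sketch: the server address and the helper names are assumptions, and the baseUrl value is inferred from the two endpoint URLs listed earlier in this post.

```python
import json
import urllib.request

# Assumption: a Conductor server reachable locally on port 8080.
CONDUCTOR_API = "http://localhost:8080/api"


def build_start_request(workflow_name, workflow_input, api_base=CONDUCTOR_API):
    """Prepare (without sending) the POST request that starts a workflow.
    The JSON body becomes the workflow input, i.e. ${workflow.input.*}."""
    return urllib.request.Request(
        url=f"{api_base}/workflow/{workflow_name}",
        data=json.dumps(workflow_input).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def start_workflow(request):
    """Send the request; the response body is expected to be the workflow ID."""
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")


request = build_start_request(
    "Do_While_Workflow",
    {"baseUrl": "https://dummy.restapiexample.com/api/v1"},
)
print(request.get_method(), request.full_url)
```

Passing `request` to `start_workflow` sends it, which requires a running server with the workflow definition already saved.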

To explain this window a little: you can see the get_name API was called 49 times. If you click on Task List, you can see that although there were many failures, Conductor kept retrying until each task succeeded (at most 10 retries in this case).

As you can see here, get_employees was retried 2 times and get_name_1 was retried once before it was marked completed.

You can also check the input and output of each task by its attempt, as below:

So get_name_1 succeeded on the 2nd attempt (the initial one was Attempt 0).

That’s it folks. Using the method above, you can implement a loop in Netflix Conductor. Please leave your feedback and comments.

Written by Paras Bansal

Senior Architect, Engineer by heart