Qyu: A distributed task execution system for complex workflows

Here at FindHotel we build many of our tools in-house, and whenever we think others could benefit from them, we release them to the open-source community. This article is about Qyu, a distributed task execution system we built for our advertising campaign building and marketing software, and how we use it to manage our internal workflows.

About two years ago, we were using Resque as a job scheduler for our Ruby services. The application was monolithic and memory-hungry, and jobs kept getting lost in Resque's Redis queue. Resque's workers died abruptly without reliable error reporting and had to be redeployed whenever we wanted to change the number of workers on a certain queue, and Resque was no longer supported by the open-source community. On top of all that, we had tens of services that were used at a variable rate, so we were paying more for infrastructure than we should have. We needed a solution that was reliable and autoscaling, and that would help us break our monolith into microservices.

Qyu is built in Ruby and depends on two basic pieces: a message queue and a state store. By default, as well as for testing purposes, Qyu ships with an in-memory message queue and state store which are definitely not suitable for production purposes. For production usage, we implemented two state stores based on ActiveRecord (for relational databases) and Redis, in addition to two message queues based on Amazon Simple Queue Service (SQS) and Redis. In our production environment, we use the ActiveRecord adapter with PostgreSQL along with the Amazon SQS adapter.

Architecture

State Store

The state store is the persistence layer of Qyu. It has 3 main models:

  • Workflow: Used to generate tasks under a job. It describes what a certain program flow looks like.
  • Job: A set of tasks running according to a certain workflow.
  • Task: A single building block of a job. It has its own payload and status.

The following is an example of a simple workflow:

descriptor = {
  'starts' => %w(
    print_hello
  ),
  'tasks' => {
    'print_hello' => {
      'queue' => 'print-hello'
    }
  }
}

This workflow instructs the program to start with a task called print_hello, which is enqueued in a queue called print-hello. A job using this workflow will have only one task enqueued in the queue.
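To make the descriptor's structure concrete, here is a minimal sketch in plain Ruby that reads the 'starts' list and looks up each task's queue. The initial_tasks helper is hypothetical and written only for illustration; it is not part of Qyu's API.

```ruby
# Hypothetical helper (not part of Qyu): lists the tasks a job would
# enqueue first, together with the queue each one is sent to.
def initial_tasks(descriptor)
  descriptor.fetch('starts').map do |task_name|
    task = descriptor.fetch('tasks').fetch(task_name)
    { name: task_name, queue: task.fetch('queue') }
  end
end

descriptor = {
  'starts' => %w(print_hello),
  'tasks' => {
    'print_hello' => { 'queue' => 'print-hello' }
  }
}

initial_tasks(descriptor).each do |task|
  puts "#{task[:name]} -> #{task[:queue]}" # prints "print_hello -> print-hello"
end
```

Using fetch instead of [] means a descriptor that names a task in 'starts' without defining it under 'tasks' fails loudly with a KeyError.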

Message Queue

The message queue component appends task IDs to their respective queues. It provides an organized and fault-tolerant method of dequeuing tasks and ensuring they run successfully before declaring them a success and removing them. An example of the message in a queue is {"task_id": 420}. A listening worker dequeues the message, gets the payload from the state store and starts processing the task.
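The interplay between the two components can be sketched with toy in-memory classes. ToyStateStore and ToyQueue below are hypothetical stand-ins written for this article, not Qyu's actual adapters; the sketch only assumes what is described above: the queue carries task IDs, the payload lives in the state store, and a message is removed only after the task has run successfully.

```ruby
require 'json'

# Hypothetical stand-in for the state store: keeps payload and status per task.
class ToyStateStore
  def initialize
    @tasks = {}
    @next_id = 1
  end

  def create_task(payload)
    id = @next_id
    @next_id += 1
    @tasks[id] = { payload: payload, status: 'queued' }
    id
  end

  def find(id)
    @tasks.fetch(id)
  end
end

# Hypothetical stand-in for the message queue: holds only task IDs.
class ToyQueue
  def initialize
    @messages = []
  end

  def enqueue(task_id)
    @messages << JSON.generate('task_id' => task_id)
  end

  # Yields the oldest message and removes it only if the block succeeds.
  # If the block raises, the message stays in the queue for a retry.
  def process_next
    return false if @messages.empty?
    yield JSON.parse(@messages.first)
    @messages.shift
    true
  end

  def size
    @messages.size
  end
end

store = ToyStateStore.new
queue = ToyQueue.new
queue.enqueue(store.create_task('times' => 2))

queue.process_next do |message|
  task = store.find(message['task_id'])
  task[:payload]['times'].times { puts 'Hello' }
  task[:status] = 'completed'
end
```

Keeping only the ID in the message keeps queue messages small and makes the state store the single source of truth for payload and status.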

Usage

Create your first workflow:

Qyu::Workflow.create(name: 'say-hello', descriptor: descriptor)

The above will create a workflow named say-hello. Now we need to use that workflow by creating a job that follows it. We are going to create a job with a sample payload as follows. Once you call start on the job, it will create the child tasks in the state store and enqueue their task IDs in Qyu's message queue.

job = Qyu::Job.create(workflow: 'say-hello', payload: { 'times' => 5 })
job.start

Now that we have a message in the queue, we need a worker to listen on that queue, consume the message and do something according to the specified payload. The following worker consumes the messages in the print-hello queue and prints "Hello" the number of times specified in the payload.

class SimpleWorker
  def initialize
    @worker = Qyu::Worker.new
  end

  def run
    # Consumes message from print-hello queue
    @worker.work('print-hello') do |task|
      task.payload['times'].times do |i|
        puts "#{i + 1}. Hello"
      end
    rescue StandardError => ex
      # do something
    end
  end
end

# run is an instance method, so instantiate the worker first
SimpleWorker.new.run

This worker will then pop a message from the queue and execute the code on it. The output of the above program is the following:

1. Hello
2. Hello
3. Hello
4. Hello
5. Hello

The worker read a parameter from the payload and used it. This is a very simple example of how Qyu can be used, but Qyu has other great features that the example above does not show. One of the most interesting is sync workers, or sync gates.

Sync Workers

In some use cases, you may need some tasks to finish before starting another specific task that depends on the output of those tasks. This is where Qyu's sync workers become useful. A sync worker can be started as follows:

w = Qyu::Workers::Sync.new
w.work('sync-adgroups')

To demonstrate the point, I will give one of our use cases as an example for the sync worker. At FindHotel we use Qyu to manage our campaign building software. We first build empty ad groups, populate those ad groups with ads and keywords, and then at the end we package those ad groups into campaigns. The services that generate ads and keywords need to wait for the service that generates ad groups. In the same fashion, the service that packages those ad groups into campaigns needs to wait for all of this. The following is our workflow descriptor for this component.

descriptor = {
  'starts' => [
    'ad:group:generate'
  ],
  'tasks' => {
    'ad:group:generate' => {
      'queue' => 'ad-group-generate',
      'starts' => [
        'ad:group:children:split'
      ]
    },
    'ad:group:children:split' => {
      'queue' => 'ad-group-children-split',
      'starts_manually' => ['ad:group:children:generate'],
      'starts_with_params' => {
        'ad:group:sync:gate' => {
          'nr_tasks' => {
            'count' => 'ad:group:children:generate'
          }
        }
      }
    },
    'ad:group:children:generate' => {
      'queue' => 'ad-group-children-generate'
    },
    'ad:group:sync:gate' => {
      'queue' => 'sync-adgroups',
      'waits_for' => {
        'ad:group:children:generate' => {
          'condition' => {
            'param' => 'nr_tasks',
            'function' => 'eq_completed'
          }
        }
      },
      'starts' => [
        'campaign:package'
      ]
    },
    'campaign:package' => {
      'queue' => 'campaign-package'
    }
  }
}

Notice the ad:group:sync:gate in the sync-adgroups queue, which waits for all ad:group:children:generate tasks to finish successfully before starting the dependent campaign:package task. Once the number of completed tasks equals the number of spawned tasks, campaign packaging starts. So whenever there is a need to optimize code runtime and flexibility, Qyu is useful to parallelize some parts of the workflow and get a single output in the end. You can deploy multiple workers in Docker containers and scale some of them up or down according to the load at any point in the workflow. For example, we have one Docker container for campaign-package, since it is a simple task that should not be parallelized, but we generate ads and keywords under ad groups via multiple Docker containers.
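The gate's condition can be pictured with a small plain-Ruby sketch. This is not Qyu's implementation of eq_completed; the gate_open? helper and the status strings are assumptions made for illustration. It only shows the idea of comparing the number of completed child tasks with the expected count passed as nr_tasks.

```ruby
# Hypothetical check (not Qyu's code): the gate opens once the number of
# completed child tasks equals the expected count (nr_tasks).
def gate_open?(task_statuses, nr_tasks)
  task_statuses.count { |status| status == 'completed' } == nr_tasks
end

statuses = %w(completed running completed)
puts gate_open?(statuses, 3) # prints "false": one task is still running

statuses[1] = 'completed'
puts gate_open?(statuses, 3) # prints "true": all three tasks completed
```

Because the comparison counts only completed tasks, a task that is still running or has failed keeps the gate closed, which matches the requirement that all child tasks finish successfully first.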


To conclude, Qyu is very useful to us and we love it. We released it to the open-source community in the hope that others will improve it further and that it will help other developers build distributed systems. Feel free to submit issues or PRs at https://github.com/QyuTeam/qyu.

Mohamed Osama

Software Engineer; Entrepreneur; Data analyst; Rails Contributor. linkedin.com/in/mohamedosamaa stackoverflow.com/cv/oss
