processi

about processes and engines

ruote 2.1 released

Just released ruote 2.1.1. That requires a long, boring, explanatory blog post, especially since I had been promising a ruote 2.0.0 release and am now jumping to 2.1.1.

My prose isn’t that great; the transitions from one paragraph to the next are often poor. Let’s try anyway.

Ruote is an open source ruby workflow engine. Workflow as in “business process”. Sorry, no photo workflow or beats per minute.

0.9, 2.0, 2.1

In May 2009, I had the feeling that ruote 0.9 was a bit swollen and very baroque in some aspects; some early design decisions had forced me into juggling and it felt painful. At that point I was working with languages other than Ruby and had written the beginnings of a ruote clone in Lua.

Fresh ideas are tempting; after a while I got back to Ruby and started a ruote 2.0 rewrite.

I was fortunate enough to benefit from the feedback of a startup company that pushed ruote into terrain it was not meant to travel. Whereas I had built ruote and the systems that came before it for small organizations, trusting integrators to divide the work among multiple engines, these people started stuffing as much work as possible into one engine, with extra wide concurrent steps.

Those experiments and deployments brought back invaluable lessons and matured the 2.0 line very quickly, but in October I was seriously tempted to venture into a second rewrite.

Ruote 2.0 was a complete rewrite, ruote 2.1 is only a rewrite of the core. The test suite is mostly the same and working from the 2.0 test suite allowed me to build a 2.1 very quickly.

things learnt

So what triggered the 2.1 rewrite ?

At first, a feeling of unease with the way certain things were implemented in 2.0. Some of them were inherited from the previous ruote 0.9, while others came from the rewrite; in both cases, I had made bad decisions.

Second, those load reports. The 2.0 workqueue is only a refinement of the 0.9 one. The workqueue is where all the tasks for the expressions that make up the process instances are gathered. It’s a transient workqueue, processed very quickly, internal to the engine.

Ruote, being a workflow engine, is meant to coordinate work among participants. Handing work to a participant occurs when the process execution hits a ‘participant expression’. This usually implies some IO operations since the ‘real’ participant is something like a worklist (a workitem table in some DB), a process listening somewhere else and reachable via a message queue (XMPP, AMQP, Stomp, …).

As IO operations depend on things outside of the engine, the dispatch operation was usually done in a new thread, so that, meanwhile, the engine’s workqueue could go on with its work (and the other process instances).

When farming out work to ‘real’ participants with wide concurrency, the engine spawns a thread for each dispatch and it gets crowded. The engine is vulnerable at that point : right in the middle of dispatching, with lots of tasks still in the transient workqueue, is the best time for things to go wrong, for the engine to crash, and for processes to stall.

Third ? Is there a third ? Ah yes, I started playing with CouchDB (thanks Kenneth), and many of its ideas are seductive and inspiring.

rubies

My passion for workflow engines comes from my [naive] hopes of reducing human labour, reducing total work by reducing/removing the coordination chores, the friction. Tasks left should be automated or left to humans. Left to humans… the bottleneck is in the human participants… Then there is this half-conviction that a workflow engine doesn’t need to be that fast, since most of the time, it sits waiting for the human-powered services to reply (or to time out).

Also, I had settled on Ruby, a beautiful language with not so fast interpreters (though this is changing, on many fronts, with so many people liking Ruby and giving lots of their time to make it better). The value of a workflow engine lies at its edge, where coordination meets execution, in the participants. That’s where Ruby shines (once more), since it makes it easy to write custom participants very quickly and to leverage all the crazy gems out there.

(I say Ruby shines once more, because I think it shines as well as the implementation language for the engine itself. It feels like the boundary between pseudo-code and code vanished)

Slow people, slow me, slow rubies.

storage and workers

A few lines ago I mentioned CouchDB, that database made “out of the web”. Writing things to disk is a necessity for a workflow engine. It’s an interpreter, but its processes are meant to last (longer than the uptime of a CPU).

One of CouchDB’s strengths comes from its eventual consistency, and it made me wonder if, for ruote, it wouldn’t be worth sacrificing a bit of short-term performance for medium-term scalability / robustness.

A frequent complaint about Ruby is its global interpreter lock and its green threads, making it not so sexy in a 2+ core world. Threads are easy to play with from Ruby, but they’re quite expensive (some patches address that) and if you want to get past the global lock, there seems to be only JRuby for now.

Workqueue, queue ? Worker. Why not multiple workers ? One [Ruby] process per worker, that would be ideal ! Let them share a storage for Expressions, Errors, Schedules, Configuration and… Messages.

Ruote 2.1 has a persisted workqueue. For the items placed in the queue, I use the term “message”; it covers “tasks” (sent for execution by expressions) and “events” (intra-engine notifications).

An interesting consequence : a persisted workqueue can be shared by multiple workers, workers not in the same Ruby process.

A message ordering the launch of a ruote process looks like :

  {"type": "msgs",
   "_id": "1984-2151883888-1261900864.59903",
   "_rev": 0,
   "action": "launch",
   "wfid": "20091227-fujemepo",
   "tree":
     ["define", {"name": "test"}, [
       ["sequence", {}, [
         ["participant", {"ref": "alice"}, []],
         ["participant", {"ref": "bob"}, []]]]]],
   "workitem": {"fields": {}},
   "variables": {},
   "put_at": "2009/12/27 08:01:04.599054 UTC"}

Note the process definition tree passed in to the ‘tree’ part of the message, the empty workitem, the freshly minted process instance id in ‘wfid’, and yes, this is JSON. The process, named ‘test’ is simply passing work from alice to bob (in a sequence).
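Since the tree is plain nested arrays of (name, attributes, children), it is easy to walk from any language. Here is a small, hypothetical helper (not part of ruote) that extracts the participant names from such a tree :

```ruby
require 'json'

# the 'tree' part of the launch message above
tree_json = <<~JSON
  ["define", {"name": "test"}, [
    ["sequence", {}, [
      ["participant", {"ref": "alice"}, []],
      ["participant", {"ref": "bob"}, []]]]]]
JSON

# recursively collect the 'ref' attribute of every participant expression
def participant_refs(tree, acc = [])
  name, attributes, children = tree
  acc << attributes['ref'] if name == 'participant'
  children.each { |child| participant_refs(child, acc) }
  acc
end

tree = JSON.parse(tree_json)
puts participant_refs(tree).inspect
  # => ["alice", "bob"]
```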

What about the worker that has to handle this message ? It would like :

  require 'ruote'
  require 'ruote/storage/fs_storage'

  storage = Ruote::FsStorage.new('ruote_work')
  worker = Ruote::Worker.new(storage)

  worker.run
    # current thread is now running worker

Whereas the 2.0 workqueue was transient, 2.1 has an external workqueue exploitable by 1+ workers. Simply set the workers to share the same storage. 1+ workers collaborating for the execution of business process instances.
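The “1+ workers sharing one storage” idea can be modelled in a few lines of plain Ruby. This is a toy in-memory model, not ruote’s actual storage API : a thread-safe queue stands in for the persisted workqueue, and each message is popped, hence handled, by exactly one worker.

```ruby
# toy shared "storage" : real ruote storages persist messages
# (files, CouchDB); this in-memory queue only illustrates the sharing
storage = Queue.new
10.times { |i| storage << { 'action' => 'apply', 'fei' => i } }

handled = Queue.new

# three workers sharing the same storage
workers = 3.times.map do
  Thread.new do
    loop do
      msg = (storage.pop(true) rescue break)  # non-blocking pop; stop when empty
      handled << msg['fei']
    end
  end
end
workers.each(&:join)

puts handled.size  # => 10
```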

Apart from tasks that need to be handled ASAP, some tasks are scheduled for later handling. A workflow engine thus needs some scheduling capabilities; there are activity timeouts, deliberate sleep periods, recurring tasks… The 0.9 and 2.0 engines were using rufus-scheduler for this, with the scheduler living in its own thread. The 2.1 worker integrates the scheduler in the same thread that handles the messages.
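That single-thread integration can be sketched as a toy worker whose one loop both handles messages and fires due schedules. The names and structures below are made up for illustration; this is not ruote’s worker code.

```ruby
# a toy model of a worker whose single loop handles messages
# and fires due schedules (as the 2.1 worker does)
class ToyWorker
  def initialize
    @messages = []   # pending messages
    @schedules = []  # [ due_time, message ] pairs
    @handled = []
  end

  attr_reader :handled

  def put(msg); @messages << msg; end
  def schedule(at, msg); @schedules << [ at, msg ]; end

  # one iteration of the worker loop
  def step(now = Time.now)
    # fire schedules whose time has come, as regular messages
    due, @schedules = @schedules.partition { |at, _| at <= now }
    due.each { |_, msg| @messages << msg }
    # then handle one pending message
    msg = @messages.shift
    @handled << msg if msg
  end
end

w = ToyWorker.new
w.put('task')
w.schedule(Time.now - 1, 'timeout')  # already due
2.times { w.step }
puts w.handled.inspect  # => ["task", "timeout"]
```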

(Since I wrote rufus-scheduler (with the help of many many people), it was easy for me to write/integrate a scheduler specifically for/in ruote)

A storage is meant as a pivot between workers, containing all the run data of an engine, ‘engine’ as a system (a storage + worker pair at its smallest). Workers collaborate to get things done; each task (message) is handled by only one worker, and expression implementations are collision resilient.

The result is a system where each expression is an actor. When a message comes, the target expression is re-hydrated from the storage by the worker and is handed the message. The expression emits further messages and is then de-hydrated or discarded.
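That re-hydrate / apply / de-hydrate cycle can be illustrated with a toy hash-based storage. All names here are hypothetical, not ruote’s internals :

```ruby
# a toy storage mapping expression ids to expression state
storage = {
  'exp_0' => { 'name' => 'sequence', 'child' => 0 }
}

def handle(storage, msg)
  state = storage[msg['fei']]          # re-hydrate the target expression
  case msg['action']
  when 'reply'                         # a child replied, move to the next one
    state['child'] += 1
    storage[msg['fei']] = state        # de-hydrate (persist the new state)
  when 'cancel'
    storage.delete(msg['fei'])         # discard the expression
  end
end

handle(storage, { 'fei' => 'exp_0', 'action' => 'reply' })
puts storage['exp_0']['child']  # => 1
```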

Thus the code of the worker is rather small; there’s nothing fancy there.

There will be some daemon-packaging for the worker, thanks to daemon-kit.

storage, workers and engine

We have a storage and workers, but what about the engine itself ? Is there still a need for an Engine class as an entry point ?

I think so, yes, if only to make 2.1 familiar to people used to ruote pre-2.1. At some point I wanted to rename the Engine class of 0.9 / 2.0 to ‘Dashboard’ and force the ‘engine’ term to designate the storage + workers + dashboards system. But well, let’s stick with ‘Engine’.

Throughout the versions the engine class shrank. There is now no execution power in it, just the methods needed to start work, cancel work and query about work.

Here is an engine, directly tapping a storage :

  require 'ruote'
  require 'ruote/storage/fs_storage'

  storage = Ruote::FsStorage.new('ruote_work')
  engine = Ruote::Engine.new(storage)
  
  puts "currently running process instances :"
  engine.processes.each do |ps|
    puts "- #{ps.wfid}"
  end

No worker here; it’s elsewhere. That may prove useful when wiring the engine into, say, a web application. The workers may be working in dedicated processes while the web application gives the commands, by instantiating and using an Engine.

Launching a process looks like :

  pdef = Ruote.process_definition :name => 'test' do
    concurrence do
      alpha
      bravo
    end
  end

  p pdef
    # ["define", {"name"=>"test"}, [
    #   ["concurrence", {}, [
    #     ["alpha", {}, []],
    #     ["bravo", {}, []]]]]]

  wfid = engine.launch(pdef)

The execution will be done by the workers wired to the storage, somewhere else.

It’s OK to wrap a worker within an engine :

  require 'ruote'
  require 'ruote/storage/fs_storage'

  storage = Ruote::FsStorage.new('ruote_work')
  worker = Ruote::Worker.new(storage)
  engine = Ruote::Engine.new(worker)
    # the engine starts the worker in its own thread
  
  puts "currently running process instances :"
  engine.processes.each do |ps|
    puts "- #{ps.wfid}"
  end

This is the pattern for a traditional, one-worker ruote engine. Except that it’s OK to add further workers later.

In fact, as long as there is one worker and it can reach the storage, the process instances will resume.

storage implementations

There are currently 3 storage implementations.

The first one is memory-only, transient. It’s meant for testing or for temporary workflow engines.

The second one is file based. It’s using rufus-cloche. All the workflow information is stored in a three level JSON file hierarchy. Workers on the same system can easily share the same storage. It uses file locks to prevent overlap.

The third one is Apache CouchDB based. It’s quite slow, but we’re hoping that optimizations on the client (worker) side, like using em-http-request, will make it faster. Speed is decent anyway and, on the plus side, you get an engine that spans multiple hosts and Futon as a low-level tool for tinkering with the engine data.

Those three initial storage implementations are quite naive. Future, mixed-implementation storages are possible. Why not a storage where CouchDB is used for storing expressions while messages are handled by RabbitMQ ? (Note Erlang underlying both.)
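The file-based flavour can be sketched in plain Ruby. This toy store is in the spirit of rufus-cloche (a type/ directory hierarchy of JSON documents, file locks against overlapping writers) but simplified and with made-up names; it is not rufus-cloche’s actual API :

```ruby
require 'json'
require 'tmpdir'
require 'fileutils'

# a toy file-based document store : one directory per document type,
# one JSON file per document, an exclusive lock around each write
class ToyCloche
  def initialize(dir)
    @dir = dir
  end

  def put(doc)
    path = File.join(@dir, doc['type'])
    FileUtils.mkdir_p(path)
    File.open(File.join(path, "#{doc['_id']}.json"), File::RDWR | File::CREAT) do |f|
      f.flock(File::LOCK_EX)  # prevent overlapping writers
      f.truncate(0)
      f.write(doc.to_json)
    end
  end

  def get(type, id)
    file = File.join(@dir, type, "#{id}.json")
    File.exist?(file) ? JSON.parse(File.read(file)) : nil
  end
end

Dir.mktmpdir do |dir|
  store = ToyCloche.new(dir)
  store.put({ 'type' => 'msgs', '_id' => 'm0', 'action' => 'launch' })
  puts store.get('msgs', 'm0')['action']  # => launch
end
```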

dispatch threads

As mentioned previously, the default ruote implementation, when it comes to dispatching workitems to participants, will spawn a new thread each time.

People complained about that, and asked me about putting an upper limit to the number of such threads. I haven’t put that limit, but I made some provisions for an implementation of it.

There is in ruote 2.1 a dedicated service for the dispatching; it’s called dispatch_pool. For now it only understands new thread / no new thread and has no upper limit. Integrators can simply provide their own implementation of that service.

Note that, already since 0.9, a participant implementation is consulted at dispatch time. If it responds to the method do_not_thread and answers true, the dispatching will occur in the work thread and not in a new thread.
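The dispatch decision can be modelled as follows. Only do_not_thread comes from ruote’s participant contract; the classes and the dispatch method are made up for illustration :

```ruby
# a participant that asks to be dispatched inline
class InlineParticipant
  def do_not_thread; true; end
  def consume(workitem); workitem[:handled_by] = Thread.current; end
end

# a participant dispatched in a new thread (the default)
class ThreadedParticipant
  def consume(workitem); workitem[:handled_by] = Thread.current; end
end

def dispatch(participant, workitem)
  inline =
    participant.respond_to?(:do_not_thread) && participant.do_not_thread
  if inline
    participant.consume(workitem)              # work thread
  else
    Thread.new { participant.consume(workitem) }.join  # new thread
  end
end

wi = {}
dispatch(InlineParticipant.new, wi)
puts wi[:handled_by] == Thread.current  # => true

wi = {}
dispatch(ThreadedParticipant.new, wi)
puts wi[:handled_by] == Thread.current  # => false
```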

2.1 participants

Speaking of participants, there is a catch with ruote 2.1. Participants are now divided into two categories. I could say stateless versus stateful, but that would bring confusion with state machines or whatever. Perhaps something like “by class” versus “by instance” is better. “By class” participants are instantiated at each dispatch and are thus manageable by any worker, while “by instance” participants are already instantiated and are thus only manageable via the engine (the Engine instance where they were registered).

BlockParticipants fall into the “by instance” category.

  engine.register_participant 'total' do |workitem|
    t = workitem.fields['articles'].inject(0) do |sum, art|
      sum + art['price'] * art['count']
    end
    workitem.fields['total'] = t
  end

will thus only be handled by the worker associated with the engine.

A by class participant is registered via code like :

  engine.register_participant(
    'alpha', Ruote::Amqp::AmqpParticipant, :queue => 'ruote')

Whatever worker gets the task of delivering a workitem to participant alpha will be able to do it.
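Why any worker can do it : a “by class” registration can be persisted as plain data (a class name plus options), so any worker, in any Ruby process, can re-instantiate the participant at dispatch time. A toy illustration with made-up names (not ruote’s registration code) :

```ruby
# a hypothetical "by class" participant
class EchoParticipant
  def initialize(opts); @opts = opts; end
  def consume(workitem); "#{workitem} via #{@opts['queue']}"; end
end

# what a "by class" registration boils down to : storable plain data
registration = { 'alpha' => [ 'EchoParticipant', { 'queue' => 'ruote' } ] }

# any worker can re-instantiate the participant at dispatch time
klass_name, opts = registration['alpha']
participant = Object.const_get(klass_name).new(opts)
puts participant.consume('workitem_0')  # => workitem_0 via ruote
```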

from listeners to receivers

Ruote has always had the distinction between participants that reply by themselves to the engine (like BlockParticipant) and participants that only focus on how to deliver the workitem to the real (remote) participant.

For those participants, the work is over when the workitem has been successfully dispatched to the ‘real’ participant. The reply channel is not handled by those participants but by what used to be called ‘listeners’. For example, the AmqpParticipant had to be paired with an AmqpListener.

On the mailing list and in the IRC (#ruote) channel, I saw lots of confusion between listeners and the listen expression. That made me decide to rename listeners as ‘receivers’.

A receiver could also get packaged as a daemon.

I still need to decide if a receiver should be able to “receive” ‘launch’ or ‘cancel’ messages (orders). Oh, and the engine is a receiver.

releasing

I released ruote 2.1.1 on Gemcutter.

For some people, a gem is a necessity, while for others a tag in the source is a sufficient release.

I hope that by releasing a gem from time to time and tagging the repo appropriately I might satisfy a wide range of people.

I’m planning to release gems quite often, especially in the beginning.

The unfortunate thing with a workflow engine, is that many business process instances outlive the version they started at and it’s not that easy to stay backward compatible all along… That explains some of my reticence about releasing.

ruote-kit

ruote-kit is an engine + worklist instance of ruote accessible over HTTP (speaking JSON and HTML); it has a RESTful ideal.

It’s being developed by Kenneth, with the help of Torsten. My next step is to help them upgrade ruote-kit from ruote 2.0 to 2.1. It should not be that difficult : apart from the engine initialization steps, the rest of ruote is quite similar from 2.0 to 2.1.

conclusion

Two major versions of ruote in 2009, that’s a lot. I hope this post helps explain why I went for a second rewrite and how ruote matured and became, hopefully, less ugly.

Happy new year 2010 !

 

source : http://github.com/jmettraux/ruote
mailing-list : http://groups.google.com/group/openwferu-users
irc : freenode.net #ruote

 

Written by John Mettraux

December 31, 2009 at 9:57 am

5 Responses


  1. Thanks John – back from holidays and just read through this. Good overview (helped me get a better grip about the history of the project and what you plan for it).

    Cheers,
    Nicholas

    Nicholas Faiz

    January 11, 2010 at 12:13 am

  2. Hello,

    Nice work ! I would love to have an example of the ‘receivers’ concept you’re talking about. e.g. how to resume execution of the workflow from a web application

    Karl Robertson

    January 20, 2010 at 11:58 am

    • Hi Karl,

      I should cook up a small web application example.

      How you resume the execution of a workflow depends on how you pass workitems between ruote and the web application.

      At the core, it only means calling the reply method of the engine with the updated workitem (the workitem contains enough info to tell the engine which process instance to resume).

      Cheers,

      John Mettraux

      January 21, 2010 at 1:09 am

      • Hello John,
        Thank you for your answer. Actually I tried to play with it a little and I got it working using a custom participant that did not reply_to_engine then I used a receiver to receive the workitem in another thread.

        I’m trying to implement a task system where the workflow hangs until a user completes a given task. I would show the task through a web app. I don’t know if you’re familiar with Jboss JBPM but the idea is to have it behave a little like the tasks in JBPM.

        Karl Robertson

        January 21, 2010 at 4:03 pm

      • Hello Karl,

        maybe we should move this discussion to ruote’s mailing list :

        http://groups.google.com/group/openwferu-users

        It’s way better than wordpress commenting system.

        Your custom participant + receiver solution sounds great. It says that you understand ruote.

        I’m a bit familiar with jBPM though I’m not a fan of their approach. 1 to 1 matching between tasks and participants is an excellent approach.

        Cheers,

        John Mettraux

        January 22, 2010 at 2:51 am


