Skip to content

Conversation

@JelleAalbers
Copy link
Contributor

@JelleAalbers JelleAalbers commented Oct 22, 2016

This adds distributed processing capabilities to pax. The main motivation is parallizing the event builder to achieve a higher throughput of events, but we can probably make broader use of it.

Introduction

Pax multiprocessing works like this:

pax_multiprocessing

Each blue box is a pax process. Arrows indicate the flow of events (which actually goes in blocks, usually of 10 events apiece) and green circles indicate queues in which events wait until we are ready to work on them.

As you can see, an "input pax" produces events and puts them on a processing queue. Several workers fetch these events, process them, and put them on an output queue. Finally, an "output pax" fetches from this queue, puts them back in the original order, and writes them to a file.

You might call such a series of paxes in communication with each other via queues a chain. Other chain layouts are imaginable (e.g. several paxes simulating waveforms that are combined into one dataset by an output pax), but this is the most important one. Input/ouput operations are often intrinsically serial (e.g. writing to file) or very hard to parallelize (triggering a batch of data), but most of the CPU-intensive work can easily be parallelized, as each event is independent.

Inside pax, access to the queues is handled by the input plugin PullFromQueue and the output plugin PushToQueue. (The pax core used to handle this itself, but it turned into a big mess.) Worker paxes use both, input paxes only use PushToQueue (and some input plugin, e.g. ReadZipped or the trigger) and output paxes only PullFomQueue (and some output plyugin, e.g. WriteROOTClass or WriteZipped).

Distributed multiprocessing

There are thee challenges in multiprocessing:

  • How do the parts of the chain figure out when they are done, so they can shut down?
  • How do we get the events out in the order we put them in?
  • How do we make sure a crash of one of the paxes does not cause an infinite hang?

When we wrote the pax multiprocessing code, we assumed the processes would all run locally. We addressed these challenges by a master process that periodically polls all the child processes and a global status variable maintained in shared memory (I won't go into more details of the current implementation, see #172 and #298). For remote processing we must revisit this; I'll discuss how below.

This is the distributed multiprocessing layout introduced here:

pax_multiprocessing_distibuted

There are two new concepts here:

  • When multiprocessing (locally or remotely) pax processes run inside a host process indicated by the gray boxes. These start individual paxes and watch if any of them crash (and, if necessary, the host process can terminate a pax see below).
  • The message broker indicated in red is an external application (RabbitMQ Server) that serves remote queues and "fanouts" (queues that deliver to multiple endpoints). Paxes in the same chain use these remote queues to pass events to each other, and all pax host processes listen to a special channel to receive starup requests and share news of crashes with each other via a fanout channel.

When you start pax with --cpus 10, you activate local multiprocessing (pax.parallel.local_multiprocessing). This starts a host process to do "old-style" pax multiprocessing using shared-memory queues. There is no message broker involved.

When you start pax with --cpus 10 --remote, you activate remote multiprocessing pax.parallel.remote_multiprocessing, shown in the left of the figure. Locally, you will host two paxes -- one input, one output. These are configured to put their events in / get their events from two newly made queues on the message broker. Simultaneously, 10 requests to start several worker paxes are sent to a "startup watch" queue on the message broker.

The paxmaker script is a host process (small gray boxes on the right) that fetches these startup requests from the message broker. When it gets one, it starts a pax with the indicated configuration. In remote multiprocessing, this will start the workers, which are again configured to use the right queues on the message broker for their events. You could also use a listening paxmaker like a batch queue, e.g. to remotely start a pax to plot a few 1000 waveforms you want to look at later.

I've here drawn a single pax chain using the message broker, but running multiple independent chains with the same message broker and the same set of paxmakers is fully supported. The paxmaker processes can then end up hosting paxes belonging to different chains. There will be one crash fanout and startup channel shared by all (since it is the host processes, not the paxes themselves, that communicate with this).

When are we done?

When the source of events in the input pax has dried up, it sends a message NO_MORE_EVENTS to the processing queue. A worker process receiving this message knows it can end, but before it does so, it will pass the NO_MORE_EVENTS message back up the processing queue (not the output queue!) so other workers will receive it. (when all of the workers have terminated, there will actually still be this one NO_MORE_EVENTS message on the queue)

For the output pax, things are more complicated. The workers know when they themselves are done (when they ask for the next event block but instead see a no-more-events message), but not when all of them are done. Other workers may still be busy processing event blocks.

The solution I chose here is to have the workers push a "register" message on the output queue when they start. When the worker is done, it passes an "unregisters" message. The output plugin knows it can end when the last worker has unregistered and there are no more events left for it to write. The output plugin always checks for new register/unregister messages, not just at startup, since workers may have very different startup times.

Ordering the events

The output worker usually receives blocks out of order (as they don't all take the same time to process) so it needs some way to re-order the blocks. This is not trivial:

  • We can't wait until we've received all blocks, sort them, then write them in order. Even if the RAM could fit all events, it is inefficient to start writing only when everything else is done (as many PhD students will attest).
  • Pax supports user-defined event selections (--event 0 2 7 38 ...), which means we can't use schemes relying on predicting what event numbers the next block will have (e.g. start writing the block with event 0, I know blocks are 10 events long, so the next block I could write will have event 10, etc.).

The solution I chose is to tag each block with an incremental "block id" when it is created. All events in the block are tagged with the same id (in the new field Event.block_id) so we don't lose the information when we pass the events to other plugins. With this id, the output worker knows which of the blocks it has received is the next one to write, or if that block hasn't arrived yet.

Most of this was already implemented before. The only change is annotating the events themselves with the block id, whereas previously we used a shortcut where the pax core keeps track of the block id.

Watching for crashes

Life is tough; things break. Ideally you want them to crash clearly and not just cause an infinite hang. You also don't want to ssh to all for every crash to figure out what happened and get things back up. I included two levels of protection for this:

  • Each pax process is started with an id which marks to which "pax chain" it belongs. When a pax process crashes, its host will notify the other host processes connected to the message broker with a message that includes the id and the exception traceback. Those other host processes will print out the traceback of the original crash, then then terminate (SIGTERM) all paxes with the same id. The host processes do NOT crash themselves -- nor do they terminate paxes from other chains.
  • This does not work if a host process itself dies, or worse, the machine it's running on drops from the network. To prevent hangs in this case, we need timeouts:
    • Paxes further down the chain may hang waiting for events. To prevent infinite hangs, pax will terminate with a custom timeout exception if it's been waiting for events for more than 120 seconds.
    • In some cases paxes earlier in the chain can also hang: they may be waiting to fill a queue that has exceeded its maximum size, which the crashed/disconnected pax was supposed to empty. Here, too, if the wait lasts longer than 120 seconds, the pax raises a custom timeout exception.
    • Finally, if events keep coming, but one paxmaker has disappeared, the output pax will accumulate a large pile of blocks while it's waiting for waiting for the next block in order that should have come from the crashed paxmaker. If the number of blocks laying around like this in the output pax's memory exceeds a threshold (I now have it at 1000 10-event blocks), a custom exception is raised.

Again, the host processes themselves stay alive. The timeout exceptions get propagated to other pax hosts connected to the broker and will terminate the other paxes in the chain

ROOT compilation lockfiles

I changed the lock file handling for ROOT class compilation to support multiple processes trying to compile the class in the same directory. In case you want to know the details:

  • When you start two compilations of the same class in one folder weird things happen.
  • Previously, when multiprocessing, one process was designated by its id to be the "chosen one" to compile the class. It would create a lock file and remove it to let the other processes know they can load the library without triggering a conflicting compilation.
  • This won't work anymore in remote multiprocessing with multiple pax chains: both can have a "chosen one" process that wants to compile. The same issue occurs with parallel minitree making in hax occasionally. If we're unlucky, the processes both see there is no lock file, then right after the other create the lockfile and start conflicting compilations. If we're lucky, one process makes the lock file, then the "chosen one" from the other process will crash complaining that an old lock file is present.
  • Now, each process that wants to load a root class will:
    • Check for a lock file. If it exists, wait until it is removed and load the class.
    • If it doesn't exist, create a lock file, then sleep 1 second.
    • Check if anyone else has overwritten our lock file (because they checked for existence of the lock file in between our check and us creating the lock file). If so, they own the compilation now, and we'll wait.
    • If nobody has overwritten the lock file, we own the compilation, and start doing it. We remove the root file when we're done.

Future directions

The main feature currently missing is propagation of log messages. This doesn't matter for the event builder, where all of the action happens in the input plugin, and the rest is dumb but CPU-intensive data fetching and transcoding. If we want to use distributed processing for... well, actual processing, it would be nice to get the log messages in one place as opposed to several STDOUTs of remote machines. Perhaps it would be best to tag along the log messages with the events that generated them?

We might want to add a feature that has more workers startup when the processing queue is full. This will just require a small change to remote_multiprocessing.

Another small feature that would be good to add is auto-breaking of a ROOT compilation lock that has lingered due to a segfault in a previous compilation. Previously pax would crash if this happens, forcing people to manually remove the lock -- now it is actually worse, as it will hang forever waiting for the lock file to be removed. Breaking the lock if it is >1 or 2 minutes old will resolve this. I added breaking of locks more than 90 seconds old. The compilation should not take that long, and people will want to retry it rather soon after it crashes.

It would be interesting to figure out how RabbitMQ actually handles messages: does it store the actual content on the broker machine, or does it just store a reference, and when a worker fetches it, tells that worker to talk to whoever has the content? The latter would probably be preferable for the event builder, though maybe it will work fine either way.

And of course we need to actually test this on the DAQ. ;-) I did include a few unit tests for the new queue plugins and tested the remote functionality manually.

Please do not pull master in yet as it's currently failing (@tunnell will clean this up soon).

@tunnell
Copy link
Member

tunnell commented Oct 25, 2016

We can merge after testing on the DAQ. I'm worried that the RabbitMQ won't scale since things like Celery just use it as a message backend, then use Redis/Mongo as a data backend. If it runs on DAQ, then I'm sure fine.

Revisit ROOT lockfile crap if ever has a problem since you're reinventing lockfiles.

@tunnell
Copy link
Member

tunnell commented Oct 25, 2016

I'll review. Want to go through for hour on Monday? Will fix master too.

Sent from phone

On Oct 22, 2016, at 6:14 PM, Jelle Aalbers [email protected] wrote:

This adds distributed processing capabilities to pax. The main motivation is parallizing the event builder to achieve a higher throughput of events, but we can probably make broader use of it.

Introduction

Pax multiprocessing works like this:

Each blue box is a pax process. Arrows indicate the flow of events (which actually goes in blocks, usually of 10 events apiece) and green circles indicate queues in which events wait until we are ready to work on them.

As you can see, an "input pax" produces events and puts them on a processing queue. Several workers fetch these events, process them, and put them on an output queue. Finally, an "output pax" fetches from this queue, puts them back in the original order, and writes them to a file.

You might call such a series of paxes in communication with each other via queues a chain. Other chain layouts are imaginable (e.g. several paxes simulating waveforms that are combined into one dataset by an output pax), but this is the most important one. Input/ouput operations are often intrinsically serial (e.g. writing to file) or very hard to parallelize (triggering a batch of data), but most of the CPU-intensive work can easily be parallelized, as each event is independent.

Inside pax, access to the queues is handled by the input plugin PullFromQueue and the output plugin PushToQueue. (The pax core used to handle this itself, but it turned into a big mess.) Worker paxes use both, input paxes only use PushToQueue (and some input plugin, e.g. ReadZipped or the trigger) and output paxes only PullFomQueue (and some output plyugin, e.g. WriteROOTClass or WriteZipped).

Distributed multiprocessing

There are thee challenges in multiprocessing:

How do the parts of the chain figure out when they are done, so they can shut down?
How do we get the events out in the order we put them in?
How do we make sure a crash of one of the paxes does not cause an infinite hang?
When we wrote the pax multiprocessing code, we assumed the processes would all run locally, and solved these by a master process that periodically polls all the child processes, and a global status variable maintained in shared memory (I won't go into more details of the current implementation, see #172 and #298). For remote processing these problems must be addressed again; I'll discuss below how this is done.

This is the distributed multiprocessing layout introduced here:

There are two new concepts here:

When multiprocessing (locally or remotely) pax processes run inside a host process indicated by the gray boxes. These start individual paxes and watch if any of them crash (and, if necessary, the host process can terminate a pax see below).
The message broker indicated in red is an external application (RabbitMQ Server) that serves remote queues and "fanouts" (queues that deliver to multiple endpoints). Paxes in the same chain use these remote queues to pass events to each other, and all pax host processes listen to a special channel to receive starup requests and share news of crashes with each other via a fanout channel.
When you start pax with --cpus 10, you activate local multiprocessing (pax.parallel.local_multiprocessing). This starts a host process to do "old-style" pax multiprocessing using shared-memory queues. There is no message broker involved.

When you start pax with --cpus 10 --remote, you activate remote multiprocessing pax.parallel.remote_multiprocessing, shown in the left of the figure. Locally, you will host two paxes -- one input, one output. These are configured to put their events in / get their events from two newly made queues on the message broker. Simultaneously, 10 requests to start several worker paxes are sent to a "startup watch" queue on the message broker.

The paxmaker script is a host process (small gray boxes on the right) that fetches these startup requests from the message broker. When it gets one, it starts a pax with the indicated configuration. In remote multiprocessing, this will start the workers, which are again configured to use the right queues on the message broker for their events. You could also use a listening paxmaker like a batch queue, e.g. to remotely start a pax to plot a few 1000 waveforms you want to look at later.

I've here drawn a single pax chain using the message broker, but running multiple independent chains with the same message broker and the same set of paxmakers is fully supported. The paxmaker processes can then end up hosting paxes belonging to different chains. There will be one crash fanout and startup channel shared by all (since it is the host processes, not the paxes themselves, that communicate with this).

When are we done?

When the source of events in the input pax has dried up, it sends a message NO_MORE_EVENTS to the processing queue. A worker process receiving this message knows it can end, but before it does so, it will pass the NO_MORE_EVENTS message back up the processing queue (not the output queue!) so other workers will receive it. (when all of the workers have terminated, there will actually still be this one NO_MORE_EVENTS message on the queue)

For the output pax, things are more complicated. The workers know when they themselves are done (when they ask for the next event block but instead see a no-more-events message), but not when all of them are done. Other workers may still be busy processing event blocks.

The solution I chose here is to have the workers push a "register" message on the output queue when they start. When the worker is done, it passes an "unregisters" message. The output plugin knows it can end when the last worker has unregistered and there are no more events left for it to write. It's ok for workers to register at any point during the processing (not just when starting up).

Ordering the events

The output worker usually receives blocks out of order (as they don't all take the same time to process) so it needs some way to re-order the blocks. This is not trivial:

We can't wait until we've received all blocks, sort them, then write them in order. Even if the RAM could fit all events, it is inefficient to start writing only when everything else is done (as many PhD students will attest).
Pax supports user-defined event selections (--events 0 2 7 38 ...), which means we can't use schemes relying on predicting what event numbers the next block will have (e.g. start writing the block with event 0, I know blocks are 10 events long, so the next block I could write will have event 10, etc.).
The solution I chose is to tag each block with an incremental "block id" when it is created. All events in the block are tagged with the same id (in the new field Event.block_id) so we don't lose the information when we pass the events to other plugins. With this id, the output worker knows which of the blocks it has received is the next one to write, or if that block hasn't arrived yet.

Most of this was already implemented before. The only change is annotating the events themselves with the block id, whereas previously we used a shortcut where the pax core keeps track of the block id.

Watching for crashes

Life is tough; things break. Ideally you want them to crash clearly and not just cause an infinite hang. You also don't want to ssh to all for every crash to figure out what happened and get things back up. I included two levels of protection for this:

Each pax process is started with an id which marks to which "pax chain" it belongs. When a pax process crashes, its host will notify the other host processes connected to the message broker with a message that includes the id and the exception traceback. Those other host processes will print out the traceback of the original crash, then then terminate (SIGTERM) all paxes with the same id. The host processes do NOT crash themselves -- nor do they terminate paxes from other chains.
This does not work if a host process itself dies, or worse, the machine it's running on drops from the network. To prevent hangs in this case, we need timeouts:
Paxes further down the chain may hang waiting for events. To prevent infinite hangs, pax will terminate with a custom timeout exception if it's been waiting for events for more than 120 seconds.
In some cases paxes earlier in the chain can also hang: they may be waiting to fill a queue that has exceeded its maximum size, which the crashed/disconnected pax was supposed to empty. Here, too, if the wait lasts longer than 120 seconds, the pax raises a custom timeout exception.
Finally, if events keep coming, but one paxmaker has disappeared, the output pax will accumulate a large pile of blocks while it's waiting for waiting for the next block in order that should have come from the crashed paxmaker. If the number of blocks laying around like this in the output pax's memory exceeds a threshold (I now have it at 1000 10-event blocks), a custom exception is raised.
Again, the host processes themselves stay alive. The timeout exceptions get propagated to other pax hosts connected to the broker and will terminate the other paxes in the chain

ROOT compilation lockfiles

I changed the lock file handling for ROOT class compilation to support multiple processes trying to compile the class in the same directory. In case you want to know the details:

When you start two compilations of the same class in one folder weird things happen.
Previously, when multiprocessing, one process was designated by its id to be the "chosen one" to compile the class. It would create a lock file and remove it to let the other processes know they can load the library without triggering a conflicting compilation.
This won't work anymore in remote multiprocessing with multiple pax chains: both can have a "chosen one" process that starts compiling at the same time. If we're unlucky, they both see there is no lock file, then right after the other create the lockfile and start conflicting compilations.
Now, each process that wants to load a root class will:
Check for a lock file. If it exists, wait until it is removed and load the class.
If it doesn't exist, create a lock file, then sleep 1 second.
Check if anyone else has overwritten our lock file (because they checked for existence of the lock file in between our check and us creating the lock file). If so, they own the compilation now, and we'll wait.
If nobody has overwritten the lock file, we own the compilation, and start doing it. We remove the root file when we're done.
Future directions

The main feature currently missing is propagation of log messages. This doesn't matter for the event builder, where all of the action happens in the input plugin, and the rest is dumb but CPU-intensive data fetching and transcoding. If we want to use distributed processing for... well, actual processing, it would be nice to get the log messages in one place as opposed to several STDOUTs of remote machines. Perhaps it would be best to tag along the log messages with the events that generated them?

Another small feature that would be good to add is auto-breaking of a ROOT compilation lock that has lingered due to a segfault in a previous compilation. Previously pax would crash if this happens, forcing people to manually remove the lock -- now it is actually worse, as it will hang forever waiting for the lock file to be removed. Breaking the lock if it is >1 or 2 minutes old will resolve this.

It would be interesting to figure out how RabbitMQ actually handles messages: does it store the actual content on the broker machine, or does it just store a reference, and when a worker fetches it, tells that worker to talk to whoever has the content? The latter would probably be preferable for the event builder, though maybe it will work fine either way.

And of course we need to actually test this on the DAQ. ;-) I did include a few unit tests for the new queue plugins and tested the remote functionality manually.

Please do not pull master in yet as it's currently failing (@tunnell will clean this up soon).

You can view, comment on, or merge this pull request online at:

#439

Commit Summary

Start queue plugin implementation
Rename queue plugin file
Continue queue plugin development
Continue queue plugins development
More queue goodness (getting there...)
Local queues now functional
Add recordclass requirement
Try to fix tests
Fix preserve-id bug, better many-to-one support
Start remote queues code
Fix override bug
Paxmaker now functional
Remote multiprocessing functional :-)
Proper queue closing, add rabbitpy to requirements
Exception propagation, status lines, eb support
Add timeout conditions
Fix cpus argument
Also crash if heap size too large
File Changes

M bin/event-builder (20)
M bin/paxer (29)
A bin/paxmaker (84)
M pax/FolderIO.py (16)
M pax/config/eventbuilder.ini (10)
M pax/configuration.py (65)
M pax/core.py (433)
M pax/datastructure.py (11)
M pax/exceptions.py (8)
A pax/parallel.py (327)
M pax/plugin.py (2)
M pax/plugins/io/MongoDB.py (3)
A pax/plugins/io/Queues.py (214)
M pax/plugins/io/ROOTClass.py (60)
M pax/plugins/io/XED.py (3)
M pax/plugins/io/Zip.py (2)
M pax/utils.py (13)
M requirements.txt (3)
M setup.py (2)
A tests/test_multiprocessing.py (147)
M tests/test_pax.py (26)
Patch Links:

https://github.com/XENON1T/pax/pull/439.patch
https://github.com/XENON1T/pax/pull/439.diff

You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub, or mute the thread.

@JelleAalbers JelleAalbers merged commit 4eee0ca into master Oct 26, 2016
@JelleAalbers JelleAalbers deleted the new_queues branch October 26, 2016 09:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants