Sometimes you want to do “everything, everywhere, all at once”. When that happens it’s awfully convenient if you have easy-to-use tools to execute your R code in parallel across multiple R sessions. That’s the goal of the queue package. It provides a clean interface implementing multi-worker task queues in R that doesn’t ask the user to do very much work.
Another parallel computing tool
Anyone familiar with the R ecosystem knows that there are already many good tools for this purpose. The queue package isn’t intended to be a replacement for sophisticated distributed computing tools like futureverse (not even close!). Nor is it intended as a replacement for session management tools like callr, upon on which queue is built. If you find yourself needing flexible, highly performant tools, I recommend both of those options thoroughly.
The reason queue exists is that, well, sometimes it’s nice to have an adorably simple alternative. Not everyone in the R community has the time and expertise to learn how to use fancy, but most of us have a laptop, and that laptop has a bunch of cores that we sometimes forget to use. With that in mind, queue is a deliberately simple tool with a minimum of features.
Also – and I’ll be honest, this is the real reason it exists – I wanted an excuse to make sure I really understood callr and R6 and this felt like as good a side project as any.
Basic usage
The queue package adopts an encapsulated object-oriented programming
style, and uses R6 classes to manage task queues. The primary class in
the package is Queue
. When a new task queue is created it
also initialises a new WorkerPool
, a collection of R
sessions in which tasks will be executed. You can set the number of
workers during initialisation:
Tasks are then pushed to the queue by calling it’s add()
method. A task is a function and a list of arguments. In the example
below, wait()
is a function that sleeps for a specified
length of time and then returns its input. We’ll queue up 10 jobs that
pause for different lengths of time:
We execute the tasks by calling the run()
method:
out <- queue$run(message = "verbose")
#> → Done: task_1 finished in 0.163 secs
#> → Done: task_2 finished in 0.249 secs
#> → Done: task_3 finished in 0.369 secs
#> → Done: task_4 finished in 0.483 secs
#> → Done: task_5 finished in 0.549 secs
#> → Done: task_6 finished in 0.676 secs
#> → Done: task_7 finished in 0.78 secs
#> → Done: task_8 finished in 0.88 secs
#> → Done: task_9 finished in 0.977 secs
#> → Done: task_10 finished in 1.07 secs
#> ✔ Queue complete: 10 tasks done in 2.03 secs
The output is stored in a tibble that contains a fairly detailed representation of everything that happened during the execution of the queue, including time stamps, any messages printed to the R console during the execution of each function, and so on:
out
#> # A tibble: 10 × 17
#> task_id worker_id state result runtime fun args created
#> <chr> <int> <chr> <list> <drtn> <lis> <list> <dttm>
#> 1 task_1 6629 done <dbl> 0.1634… <fn> <named list> 2022-12-22 00:51:19
#> 2 task_2 6643 done <dbl> 0.2488… <fn> <named list> 2022-12-22 00:51:19
#> 3 task_3 6653 done <dbl> 0.3687… <fn> <named list> 2022-12-22 00:51:19
#> 4 task_4 6667 done <dbl> 0.4831… <fn> <named list> 2022-12-22 00:51:19
#> 5 task_5 6629 done <dbl> 0.5486… <fn> <named list> 2022-12-22 00:51:19
#> 6 task_6 6643 done <dbl> 0.6757… <fn> <named list> 2022-12-22 00:51:19
#> 7 task_7 6653 done <dbl> 0.7800… <fn> <named list> 2022-12-22 00:51:19
#> 8 task_8 6667 done <dbl> 0.8799… <fn> <named list> 2022-12-22 00:51:19
#> 9 task_9 6629 done <dbl> 0.9767… <fn> <named list> 2022-12-22 00:51:19
#> 10 task_10 6643 done <dbl> 1.0732… <fn> <named list> 2022-12-22 00:51:19
#> # … with 9 more variables: queued <dttm>, assigned <dttm>, started <dttm>,
#> # finished <dttm>, code <int>, message <chr>, stdout <list>, stderr <list>,
#> # error <list>
The results of the function call are always stored in a list column
called result
because in general there’s no guarantee that
an arbitrary collection of tasks will return results that are consistent
with each other, but in this case they are, so we can check the results
like this:
unlist(out$result)
#> [1] 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1.0
By default, all the worker sessions are shutdown when the queue
completes, so the user doesn’t have to take care of that, but you can
override this and reuse a single WorkerPool
across more
than one Queue
if you want. See the classs documentation
for details.
Implementation detail
From the user point of view Queue
objects are the most
useful part of the package, but internally most of the work is devolved
to other classes. Specifically, the package is built on top of two
abstractions: Task
objects provide the storage class: they
hold a function, its arguments, its output, and a variety of other
metadata. The Worker
class provides the representation of
an external R session and the ability to execute and manage a
Task
using that session. Internally, the
Worker
class relies on callr::r_session.
To allow a Queue
to execute multiple jobs in parallel,
there are two additional classes provided by the queue package: a
TaskList
object is a container that holds multiple
Task
objects and has some methods for working with them,
and similarly a WorkerPool
is a container for multiple
Worker
objects with tools for working with those. A
Queue
is associated with a WorkerPool
and a
TaskList
, and has methods that will return each of these,
should you ever have a need to play around with the internal data
structures.
Crash resistance
The queue package isn’t very sophisticated in detecting sessions that have crashed, but it does have some. For simplicity, let’s define a function that is guaranteed to crash the R session as soon as it is called:
crash <- function(x) .Call("abort")
Now let’s define a queue that has only two workers, but has no less than three tasks that are guaranteed to crash the worker the moment the tasks are started:
queue <- Queue$new(workers = 2)
queue$add(wait, list(x = .1))
queue$add(crash)
queue$add(crash)
queue$add(crash)
queue$add(wait, list(x = .1))
The queue allocates task in a first-in first-out order, so the three “crash tasks” are guaranteed to be allocated before the final “wait task”. Let’s take a look at what happens when the queue runs:
queue$run()
#> ✔ Queue complete: 5 tasks done in 3.79 secs
#> # A tibble: 5 × 17
#> task_id worker_id state result runtime fun args created
#> <chr> <int> <chr> <list> <drtn> <lis> <list> <dttm>
#> 1 task_1 6692 done <dbl> 0.15970… <fn> <named list> 2022-12-22 00:51:22
#> 2 task_2 6705 done <NULL> 2.75545… <fn> <list [0]> 2022-12-22 00:51:22
#> 3 task_3 6692 done <NULL> 2.92846… <fn> <list [0]> 2022-12-22 00:51:22
#> 4 task_4 6724 done <NULL> 0.39293… <fn> <list [0]> 2022-12-22 00:51:22
#> 5 task_5 6738 done <dbl> 0.40610… <fn> <named list> 2022-12-22 00:51:22
#> # … with 9 more variables: queued <dttm>, assigned <dttm>, started <dttm>,
#> # finished <dttm>, code <dbl>, message <chr>, stdout <list>, stderr <list>,
#> # error <list>
It’s a little slower than we’d hope, but it does finish both valid
tasks and returns nothing for the tasks that crashed their R sessions.
What has happened in the background is that the Queue
runs
a simple scheduler that asks the WorkerPool
to check if any
of the R sessions have crashed, and initialises new Worker
objects to replace them if that happens.
There is not even a pretense that any of this has been optimised, but it does work.