A Queue executes tasks concurrently using multiple workers.
Details
The Queue class is the primary interface provided by the queue package. It
allows users to execute an arbitrary collection of tasks in parallel across
multiple R sessions, managed automatically in the background. Once a new
queue is initialised, tasks can be added to it using the add() method. Once
all tasks are added, they are executed in parallel by calling the run()
method. When completed, run() returns a tibble that contains the results for
all tasks, along with some additional metadata.
Internally, a Queue uses a TaskList object as its data store and a WorkerPool
object to execute the tasks in parallel. These objects can be accessed by
calling the get_tasks() and get_workers() methods respectively.
Usually you would not need to do this, but occasionally it can be useful
because those objects have some handy methods that allow finer-grained
control (see the documentation for TaskList and WorkerPool respectively).
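As a rough illustration of the accessors described above, the sketch below retrieves the two internal objects and inspects their classes. It assumes the queue package is installed and uses only the methods named in this page; the class names checked in the comments are an assumption based on the names used above, and no TaskList or WorkerPool methods are called because they are documented elsewhere.

```r
library(queue)

queue <- Queue$new(workers = 2L)
queue$add(function() 1 + 1)

# Retrieve the internal data store and the worker pool
tasks <- queue$get_tasks()
workers <- queue$get_workers()

# Inspect what was returned (assumed to be TaskList and WorkerPool objects)
class(tasks)
class(workers)

# Run the queue so the task completes and the worker sessions are shut down
out <- queue$run(message = "none")
```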
Methods
Method new()
Create a task queue
Usage
Queue$new(workers = 4L)
Method add()
Adds a task to the queue
Usage
Queue$add(fun, args = list(), id = NULL)
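The usage above can be sketched with a short example, assuming the queue package is installed. It adds one task without arguments and one task with named arguments and a caller-chosen id. The id "sum_task" and the functions themselves are illustrative only, and the sketch assumes that args is a named list matched to the parameters of fun.

```r
library(queue)

queue <- Queue$new(workers = 2L)

# A task with no arguments; id is left as NULL so the queue supplies one
queue$add(function() Sys.getpid())

# A task with named arguments and an illustrative, caller-chosen id
queue$add(function(x, y) x + y, args = list(x = 1, y = 2), id = "sum_task")

# Nothing is executed until run() is called
out <- queue$run(message = "none")
out$result
```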
Method run()
Execute tasks in parallel using the worker pool, assigning tasks to workers in the same order in which they were added to the queue
Arguments
timelimit
How long (in seconds) should the worker pool wait for a task to complete before terminating the child process and moving on to the next task? (default is 60 seconds, but this is fairly arbitrary)
message
What messages should be reported by the queue while it is running? Options are "none" (no messages), "minimal" (a spinner is shown alongside counts of waiting, running, and completed tasks), and "verbose" (in addition to the spinner, each task is summarized as it completes). Default is "minimal".
interval
How often should the task queue poll the workers to see if they have finished their assigned tasks? Specified in seconds.
shutdown
Should the workers in the pool be shut down (i.e., all R sessions closed) once the tasks are completed? Defaults to TRUE.
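To make the arguments above concrete, here is a minimal sketch that sets all four explicitly. It assumes the queue package is installed. The particular values (a 10 second limit, a 0.1 second polling interval) are arbitrary choices for illustration, not recommended settings.

```r
library(queue)

queue <- Queue$new(workers = 2L)
for (i in 1:4) queue$add(function() Sys.sleep(0.2))

out <- queue$run(
  timelimit = 10,       # give up on any task that takes longer than 10 seconds
  message = "verbose",  # spinner plus a summary of each task as it completes
  interval = 0.1,       # poll the workers every 0.1 seconds
  shutdown = TRUE       # close the worker R sessions once the tasks are done
)
```

Setting shutdown = FALSE would presumably leave the worker sessions open after the run, at the cost of keeping the background R processes alive.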
Returns
Returns a tibble containing the results of all tasks and
various other useful metadata. Contains one row per task in the
Queue, and the following columns:
task_id
A character string specifying the task identifiers
worker_id
An integer specifying the worker process ids (pid)
state
A character string indicating the status of each task ("created", "waiting", "assigned", "running", or "done")
result
A list containing the function outputs, or NULL
runtime
Completion time for the task (NA if the task is not done)
fun
A list containing the functions
args
A list containing the arguments passed to each function
created
The time at which each task was created
queued
The time at which each task was added to a Queue
assigned
The time at which each task was assigned to a Worker
started
The time at which a Worker called each function
finished
The time at which a Worker output was returned for the task
code
The status code returned by the callr R session (integer)
message
The message returned by the callr R session (character)
stdout
List column containing the contents of stdout during function execution
stderr
List column containing the contents of stderr during function execution
error
List column containing NULL values
Note: at present there is one field from the callr rsession::read() method
that isn't captured here, and that's the error field. I'll add that after
I've finished wrapping my head around what that actually does. The error
column, at present, is included only as a placeholder.
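Since result, stdout, and stderr are list columns, pulling values out of the returned tibble takes a little unpacking. The sketch below shows one way to do it with base R subsetting, assuming the queue package is installed. The squaring function is only an illustration.

```r
library(queue)

queue <- Queue$new(workers = 2L)
for (i in 1:3) queue$add(function(x) x^2, args = list(x = i))
out <- queue$run(message = "none")

# Keep only the tasks that finished
done <- out[out$state == "done", ]

# result is a list column, so flatten it to get a plain numeric vector
squares <- unlist(done$result)

# Task ids alongside how long each task took
done[, c("task_id", "runtime")]

# Captured standard output, one list element per task
done$stdout
```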
Examples
library(queue)
queue <- Queue$new(workers = 4L)
wait <- function(x) Sys.sleep(runif(1))
for(i in 1:6) queue$add(wait)
queue$run()
#> ✔ Queue complete: 6 tasks done in 1.7 secs
#> # A tibble: 6 × 17
#> task_id worker_id state result runtime fun args created
#> <chr> <int> <chr> <list> <drtn> <lis> <list> <dttm>
#> 1 task_1 6442 done <NULL> 0.9304695 secs <fn> <list> 2022-12-22 00:51:11
#> 2 task_2 6456 done <NULL> 0.9950504 secs <fn> <list> 2022-12-22 00:51:11
#> 3 task_3 6470 done <NULL> 0.9944496 secs <fn> <list> 2022-12-22 00:51:11
#> 4 task_4 6484 done <NULL> 0.9273577 secs <fn> <list> 2022-12-22 00:51:11
#> 5 task_5 6442 done <NULL> 0.1153209 secs <fn> <list> 2022-12-22 00:51:11
#> 6 task_6 6484 done <NULL> 0.7462354 secs <fn> <list> 2022-12-22 00:51:11
#> # … with 9 more variables: queued <dttm>, assigned <dttm>, started <dttm>,
#> # finished <dttm>, code <int>, message <chr>, stdout <list>, stderr <list>,
#> # error <list>