A Queue executes tasks concurrently using multiple workers.
Details
The Queue class is the primary interface provided by the queue package. It
allows users to execute an arbitrary collection of tasks in parallel across
multiple R sessions, managed automatically in the background. Once a new
queue is initialised, tasks can be added to the queue using the add()
method. Once all tasks are added, they are executed in parallel
by calling the run() method. When completed, run() returns a
tibble that contains the results for all tasks, and some additional
metadata besides.
Internally, a Queue uses a TaskList object as its data store and a
WorkerPool object to execute the tasks in parallel. These objects can be
accessed by calling the get_tasks() and get_workers() methods.
Usually you would not need to do this, but occasionally it can be useful
because those objects have some handy methods that allow finer-grained
control (see the documentation for TaskList and WorkerPool respectively).
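A minimal sketch of reaching the two underlying objects, assuming the queue package is installed. It uses only the get_tasks() and get_workers() methods named above. The variable names and the trivial task are illustrative, and the final run() call is there only so that the worker sessions are closed afterwards.

```r
library(queue)

# a small queue with one trivial task (illustrative only)
queue <- Queue$new(workers = 2L)
queue$add(function() Sys.getpid())

# retrieve the objects that the Queue manages internally
tasks <- queue$get_tasks()     # the TaskList used as the data store
workers <- queue$get_workers() # the WorkerPool that executes the tasks

class(tasks)
class(workers)

# run the queue so the worker R sessions are shut down when it finishes
results <- queue$run(message = "none")
```

The methods available on these objects are described in the TaskList and WorkerPool documentation.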
Methods
Method new()
Create a task queue
Usage
Queue$new(workers = 4L)
Method add()
Adds a task to the queue
Usage
Queue$add(fun, args = list(), id = NULL)
Method run()
Execute tasks in parallel using the worker pool, assigning tasks to workers in the same order in which they were added to the queue
Arguments
timelimit: How long (in seconds) should the worker pool wait for a task to complete before terminating the child process and moving on to the next task? (Default is 60 seconds, but this is fairly arbitrary.)
message: What messages should be reported by the queue while it is running? Options are "none" (no messages), "minimal" (a spinner is shown alongside counts of waiting, running, and completed tasks), and "verbose" (in addition to the spinner, each task is summarized as it completes). Default is "minimal".
interval: How often should the task queue poll the workers to see if they have finished their assigned tasks? Specified in seconds.
shutdown: Should the workers in the pool be shut down (i.e., all R sessions closed) once the tasks are completed? Defaults to TRUE.
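As a rough illustration of how these arguments combine, the sketch below sets all four explicitly. It assumes the queue package is installed; the specific values (a 10 second limit, a 0.1 second polling interval) and the sleeping task are arbitrary choices for the example, not recommendations.

```r
library(queue)

queue <- Queue$new(workers = 2L)
for (i in 1:4) queue$add(function() Sys.sleep(0.2))

results <- queue$run(
  timelimit = 10,    # terminate any task still running after 10 seconds
  message = "none",  # suppress the spinner and per-task summaries
  interval = 0.1,    # poll the workers every 0.1 seconds
  shutdown = TRUE    # close the worker R sessions once all tasks are done
)
```

Passing shutdown = FALSE instead leaves the worker sessions open after the run.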
Returns
Returns a tibble containing the results of all tasks and
various other useful metadata. Contains one row per task in the
Queue, and the following columns:
task_id: A character string specifying the task identifiers
worker_id: An integer specifying the worker process ids (pid)
state: A character string indicating the status of each task ("created", "waiting", "assigned", "running", or "done")
result: A list containing the function outputs, or NULL
runtime: Completion time for the task (NA if the task is not done)
fun: A list containing the functions
args: A list containing the arguments passed to each function
created: The time at which each task was created
queued: The time at which each task was added to a Queue
assigned: The time at which each task was assigned to a Worker
started: The time at which a Worker called each function
finished: The time at which a Worker output was returned for the task
code: The status code returned by the callr R session (integer)
message: The message returned by the callr R session (character)
stdout: List column containing the contents of stdout during function execution
stderr: List column containing the contents of stderr during function execution
error: List column containing NULL values
Note: at present there is one field from the callr r_session$read() method
that isn't captured here, and that's the error field. I'll add that after
I've finished wrapping my head around what that actually does. The error
column, at present, is included only as a placeholder.
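To make the returned tibble more concrete, here is a small sketch that adds tasks with explicit args and id values and then pulls a few of the columns described above. It assumes the queue package is installed; the squaring function and the "square_" ids are made up for the example.

```r
library(queue)

queue <- Queue$new(workers = 2L)

# each task squares one number; ids are supplied rather than auto-generated
for (x in 1:3) {
  queue$add(function(x) x^2, args = list(x = x), id = paste0("square_", x))
}

results <- queue$run(message = "none")

# one row per task: identifiers, final state, and time taken
results[, c("task_id", "state", "runtime")]

# result is a list column, so flatten it to get the numeric outputs
squares <- unlist(results$result)
squares
```

Because result is a list column, tasks that return objects of different types can sit side by side in the same tibble.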
Examples
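library(queue)
# create a queue backed by four worker R sessions
queue <- Queue$new(workers = 4L)
# each task sleeps for a random interval of up to one second
wait <- function(x) Sys.sleep(runif(1))
for (i in 1:6) queue$add(wait)
queue$run()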
#> ✔ Queue complete: 6 tasks done in 1.7 secs
#> # A tibble: 6 × 17
#>   task_id worker_id state result runtime        fun   args   created
#>   <chr>       <int> <chr> <list> <drtn>         <lis> <list> <dttm>
#> 1 task_1       6442 done  <NULL> 0.9304695 secs <fn>  <list> 2022-12-22 00:51:11
#> 2 task_2       6456 done  <NULL> 0.9950504 secs <fn>  <list> 2022-12-22 00:51:11
#> 3 task_3       6470 done  <NULL> 0.9944496 secs <fn>  <list> 2022-12-22 00:51:11
#> 4 task_4       6484 done  <NULL> 0.9273577 secs <fn>  <list> 2022-12-22 00:51:11
#> 5 task_5       6442 done  <NULL> 0.1153209 secs <fn>  <list> 2022-12-22 00:51:11
#> 6 task_6       6484 done  <NULL> 0.7462354 secs <fn>  <list> 2022-12-22 00:51:11
#> # … with 9 more variables: queued <dttm>, assigned <dttm>, started <dttm>,
#> #   finished <dttm>, code <int>, message <chr>, stdout <list>, stderr <list>,
#> #   error <list>