Nested futures with batchtools

Author

Galen Holt

Sometimes it makes sense to just send each parallel job to a single CPU. But there are other situations, like when we have some high-level parallelization over parameters followed by additional parallelization over something else, where we can take advantage of the structure of HPC clusters to send biggish jobs to a node and then further parallelize within it across CPUs. Which approach is faster is going to be highly context-dependent, I think, but let’s figure out how to do it and then set up some scripts that can be modified for speed testing.

I’ll use the parallel inner and outer functions developed in testing nested parallelism, but modified as in most of the slurm_r_tests repo to just return PIDs and other diagnostics so we can see what resources get used.

The approach here borrows a lot from this GitHub issue.

Creating a nested plan

Unnested template

If I were just assigning each job to a single CPU, I’d use a plan like this:

library(future)
library(future.batchtools)

plan(tweak(batchtools_slurm,
           template = "batchtools.slurm.tmpl",
           resources = list(time = 5,
                            ntasks.per.node = 1, 
                            mem = 1000,
                            job.name = 'NewName')))
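
For orientation, under that single-level plan each future gets submitted as its own one-task sbatch job. A minimal sketch of what using it looks like (future_sapply and the toy expression here are just for illustration, not from the repo):

library(future.apply)

# Each of these futures is submitted as its own one-task Slurm job under the
# single-level plan above; returning node name and PID shows the separation
res_flat <- future_sapply(1:4, function(i) {
  paste(Sys.getenv("SLURMD_NODENAME"), Sys.getpid(), sep = ":")
})
res_flat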

Nested with list()

I now need to do two things: wrap it in a list with a second plan type, and ask for more tasks per node. (It should also work to ask for one task and more CPUs per task; there’s a sketch of that after the next code block, and it should behave similarly.)

So, we can wrap that in a list

plan(list(tweak(batchtools_slurm,
                template = "batchtools.slurm.tmpl",
                resources = list(time = 5,
                                 ntasks.per.node = 12, 
                                 mem = 1000,
                                 job.name = 'NewName')),
          multicore))
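
For reference, the tasks-and-CPUs-per-task variant mentioned above would look something like this. It’s a sketch only: the resource names just get passed through to the template, so whether cpus.per.task is recognized depends on what batchtools.slurm.tmpl actually exposes.

plan(list(tweak(batchtools_slurm,
                template = "batchtools.slurm.tmpl",
                resources = list(time = 5,
                                 ntasks.per.node = 1,
                                 cpus.per.task = 12,  # assumes the template maps this to --cpus-per-task
                                 mem = 1000,
                                 job.name = 'NewName')),
          multicore))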

Nested functions to return resource use

And set up nested functions that return the outer and inner PIDs, as well as some other stuff:

library(foreach)
library(doRNG)
library(dplyr)

# The inner loop runs on the CPUs of whatever node the outer iteration landed
# on, and records PIDs and Slurm environment variables so we can see what
# resources actually get used
inner_par <- function(inner_size, outer_it, outer_pid) {
  inner_out <- foreach(j = 1:inner_size,
                       .combine = bind_rows) %dorng% {
                         tibble(all_job_nodes = paste(Sys.getenv("SLURM_JOB_NODELIST"),
                                                      collapse = ","),
                                node = Sys.getenv("SLURMD_NODENAME"),
                                loop = "inner",
                                outer_iteration = outer_it,
                                outer_pid = outer_pid,
                                inner_iteration = j,
                                inner_pid = Sys.getpid(),
                                taskid = Sys.getenv("SLURM_LOCALID"),
                                cpus_avail = Sys.getenv("SLURM_JOB_CPUS_PER_NODE"))
                       }
  return(inner_out)
}

# The outer loop calls the inner one
outer_par <- function(outer_size, inner_size) {
  outer_out <- foreach(i = 1:outer_size,
                       .combine = bind_rows) %dorng% {
                         
                         # do some stupid work so this isn't trivially nested
                         a <- 1
                         b <- 1
                         d <- a+b
                         # Now run the inner loop for this outer iteration, passing along its index and PID
                         inner_out <- inner_par(inner_size = inner_size,
                                                outer_it = i, outer_pid = Sys.getpid())
                         
                         inner_out
                       }
  
  return(outer_out)
}
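
These functions need a foreach backend that dispatches onto the future plan. The repo handles the backend registration in its setup; a minimal version using doFuture looks something like this:

library(doFuture)
registerDoFuture()  # %dorng%/%dopar% now resolve through the nested plan above

# 25 outer iterations (one Slurm job each here), each running 25 inner
# iterations on that node's CPUs
res <- outer_par(outer_size = 25, inner_size = 25)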

And it WORKS!

I did this in nested_plan.R in the slurm_r_tests repo, and running it for 25 loops in each of the nested functions shows that we’re getting and using 12 inner PIDs per outer PID.

## Nodes and pids simple
# A tibble: 25 × 2
   outer_pid n_inner
       <int>   <int>
 1    549832      12
 2    549960      12
 3    550097      12
 4    550238      12
 5   1507878      12
 6   1508006      12
 7   1508130      12
 8   1508254      12
 9   1508380      12
10   1508502      12
11   1508624      12
12   3494670      12
13   3494799      12
14   3494921      12
15   3495044      12
16   3495168      12
17   3495292      12
18   3495415      12
19   4189058      12
20   4189200      12
21   4189329      12
22   4189455      12
23   4189580      12
24   4189702      12
25   4189829      12

For more detail, run the code in the slurm_r_tests repo. But it shows that we get 25 nodes with 12 cores each (because of ntasks.per.node = 12 in resources), and each core gets used 2-3 times, which makes sense for 25 outer x 25 inner loops.
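
For reference, a dplyr summary along these lines would produce the “Nodes and pids simple” table from the res returned by outer_par above. This is a sketch rather than the exact code in the repo, but the column names come straight from the tibbles built in inner_par, with n_inner counting distinct inner PIDs per outer PID:

library(dplyr)

# Nodes and pids simple: distinct inner PIDs seen by each outer PID
res |>
  group_by(outer_pid) |>
  summarise(n_inner = n_distinct(inner_pid))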

Controlling chunks and workers

I’m not going to bother trying to figure out array jobs (it sounds like they’re not easy or well supported, and the batchtools framework basically works by spitting out sbatch calls instead of arrays). That’s not ideal, but it’s how it works, at least for now.

It probably is worth addressing chunking, though. One reason is just to not overload the cluster: if I ask for a million batchtools futures, that’s not going to make anyone happy, and it’s not efficient anyway. It looks like we can control this with the workers argument (which defaults to 100). However many futures we ask for, they get chunked across at most workers jobs, and that’s how many get submitted. So we might also want to modify this if we have a job that naturally wants to be nested: if we know we want a node per outer loop, we could set workers = OUTER_LOOP_LENGTH.

As a test, we can modify the plan above to have workers = 5, which should grab only 5 nodes and send 5 of the 25 outer loops to each of them. I’m also going to up the time.

plan(list(tweak(batchtools_slurm,
                workers = 5,
                template = "batchtools.slurm.tmpl",
                resources = list(time = 15,
                                 ntasks.per.node = 12, 
                                 mem = 1000,
                                 job.name = 'NewName')),
          multicore))

And that does what it’s supposed to: keep everything on 5 nodes.

## Nodes and pids simple
# A tibble: 5 × 2
  outer_pid n_inner
      <int>   <int>
1    552072      60
2   1510060      60
3   3496848      60
4   4191978      60
5   4192353      60