The main aim of this R Markdown workbook is to demonstrate different types of multithreading analyzing a large data set in R: multithreaded matrix computations and multithreaded R code using the parallel package. These computations (multithreaded or otherwise) are used to generate estimates about the genetic basis of complex traits—specifically, the proportion of variance in these traits that can be explained by genetic factors.

Important: Some of the R code below will not work in Windows.

Initial setup

  1. Install the cfwlab package.

  2. Download source file setblas.c and build the shared object file in the same directory as your R working directory: R CMD SHLIB setblas.c

  3. If you logged in to a compute cluster, request a compute node and request resources for your computing. For example, on the RCC compute cluster, I requested a Broadwell compute node with 40 GB of memory and 8 cores (CPUs): sinteractive --partition=broadwl --cpus-per-task=8 --mem=40G

  4. Set the number of threads used by the BLAS library installed with R. For example, if you have installed R with OpenBLAS, you can set the number of threads to 2 with this command in the bash shell: export OPENBLAS_NUM_THREADS=2.

  5. Start up R or RStudio.

Setting up your R environment

Load the parallel package, cfwlab package, and the “setblas” function.

library(parallel)
library(cfwlab)
dyn.load("setblas.so")

Initialize the random number generator.

set.seed(1)

Function definitions

Here we define a few functions that will be useful below in our experiments. You don’t have to understand the code—just make sure that these functions are defined in your environment to run the examples below.

This function distributes the elements of x evenly (or as evenly as possible) into k list elements.

distribute <- function (x, k)
  split(x,rep(1:k,length.out = length(x)))

This function replicates vector x to create an n x m matrix, where m = length(x).

rep.row <- function (x, n)
  matrix(x,n,length(x),byrow = TRUE)

This function sets the number of threads used by OpenBLAS.

set.blas.num.threads <- function (n) {
  .Call("set_blas_Call",n = as.integer(n))
  return(n)
}

This function takes as input an array of unnormalized log-importance weights and returns normalized importance weights such that the sum of the normalized importance weights is equal to 1. We guard against underflow or overflow by adjusting the log-importance weights so that the largest importance weight is 1.

normalizelogweights <- function (logw) {
  c <- max(logw)
  w <- exp(logw - c)
  return(w / sum(w))
}

This function computes the marginal log-likelihood the regression model of Y given X assuming that the prior variance of the regression coefficients is sa. Here K is the “kinship” matrix, K = tcrossprod(X)/p. Also, H is the covariance matrix of Y divided by residual variance.

compute.log.weight <- function (K, y, sa, use.backsolve = TRUE) {
  H <- diag(n) + sa*K
  R <- tryCatch(chol(H),error = function(e) FALSE)
  if (is.matrix(R)) {
    if (use.backsolve)
      x <- backsolve(R,forwardsolve(t(R),y))
    else
      x <- solve(H,y)
    logw <- (-determinant(sum(y*x)*H,logarithm = TRUE)$modulus/2)
  } else
    logw <- 0
  return(logw)
}

This function computes the marginal log-likelihood for multiple settings of the prior variance parameter.

compute.log.weights <- function (K, y, sa, use.backsolve = TRUE)
  sapply(as.list(sa),function (x) compute.log.weight(K,y,x,use.backsolve))

This is a multicore variant of the above function implemented using the “mclapply” function. Input argument nc specifies the number of cores (CPUs) to use for the computation. Note that the mclapply relies on forking and therefore will not work on a computer running Windows.

compute.log.weights.multicore <- function (K, y, sa, nc = 2,
                                           use.backsolve = TRUE) {
  samples <- distribute(1:length(sa),nc)
  logw    <- mclapply(samples,
               function (i) compute.log.weights(K,y,sa[i],use.backsolve),
               mc.cores = nc)
  logw <- do.call(c,logw)
  logw[unlist(samples)] <- logw
  return(logw)
}

Load data

Load the phenotype and genotype data. Here, we analyze soleus muscle weight, but other phenotypes are available in this data set.

data(cfw.pheno)
data(cfw.geno)
X <- cfw.geno
y <- cfw.pheno[["soleus"]]

Remove the rows containing missing data.

rows <- which(!is.na(y))
y    <- y[rows]
X    <- X[rows,]

Preprocess data

Center y and the columns of X.

n <- nrow(X)
p <- ncol(X)
X <- X - rep.row(colMeans(X),n)
y <- y - mean(y)

Compute the kinship matrix. This computation may take a minute or two.

K <- tcrossprod(X)/p

Estimate PVE

In this section, we will compute an importance sampling estimate of the proportion of variance in the quantitative trait explained by the available genotypes (abbreviated as “PVE”). This is a numerically intensive operation because it involves factorizing a large, symmetric matrix separately for each Monte Carlo sample. We will explore how increasing the number of threads available for the matrix computations (BLAS) and for the Monte Carlo computations improves the computation time.

First, draw samples of the PVE estimate from the proposal distribution, which is uniform on [0,1]. Here, we compute the importance sampling estimate using 1,000 samples, but you can choose a larger or smaller number of samples.

ns <- 1000
h  <- runif(ns)

For each PVE setting, get the prior variance of the regression coefficients assuming a fully “polygenic” model.

sx <- sum(apply(X,2,sd)^2)
sa <- p*h/(1-h)/sx

Without parallel computing

Now we reach the numerically intensive part. First, let’s compute the importance weights without taking advantage of any multithreaded or parallel computing capabilities. Since I am using OpenBLAS, I can set the number of matrix operation threads using set.blas.num.threads, but you may need to set it a different way.

set.blas.num.threads(1)
# [1] 1
system.time(logw <- compute.log.weights(K,y,sa,use.backsolve = TRUE))
#    user  system elapsed 
#  60.692   2.236  62.912

The “elapsed” time tells us the amount of time it took to compute the importance weights.

Also, here I am using a “trick” to improve the speed of one of the matrix operations, and you can set use.backsolve = FALSE to see how much of an improvement this trick gives you.

Multithreaded matrix operations

On my compute node, I have 8 CPUs available, so let’s speed up the matrix computations by using 8 CPUs:

set.blas.num.threads(8)
# [1] 8
system.time(logw <- compute.log.weights(K,y,sa))
#    user  system elapsed 
#   102.3   123.2    28.3

We’ve obtained a substantial improvement in computation time without any change to our code. But it is far from the “ideal” of 8x improvement. That is because there is a significant amount of overhead in parallelizing these computations.

Exercise: Experiment with different numbers of threads to compare performance gains.

Multithreading using “mclapply”

Next, let’s try a different parallelization scheme, in which we compute batches of importance weights in parallel. The last input argument specifies the number of threads; in my case, 8 seems ideal because I have 8 CPUs available to use:

set.blas.num.threads(1)
# [1] 1
system.time(logw <- compute.log.weights.multicore(K,y,sa,nc = 8))
#    user  system elapsed 
#   84.61   34.31   11.44

Computing the importance weights in parallel—with no multithreading for the matrix (BLAS) operations—seems to be a more efficient use of the computing resources.

Note of caution: This type of multithreading will increase the memory requirements for our computation because we have to store the intermediate products of our matrix operations (the Cholesky factors) simultaneously for all the calls to compute.log.weight being executed in parallel. So we need to monitor memory usage (e.g., using htop) and make sure that we are not overloading the memory. If we overload the memory, there is a risk that the computation will stall, or the program will crash.

We can of course combine the two types of multithreading in order to balance compute time and memory usage, e.g.:

set.blas.num.threads(2)
# [1] 2
system.time(logw <- compute.log.weights.multicore(K,y,sa,nc = 4))
#    user  system elapsed 
#   63.39   23.68   15.39

Exercise: Experiment with different allocations of the threads to the matrix computations and the mclapply call.

Finally, once we have done the hard work of computing the importance weights, we can quickly compute a Monte Carlo estimate of the posterior mean PVE:

w <- normalizelogweights(logw)
sum(w * h)
# [1] 0.2595

Session information

This is the operating system, version of R and the packages that were used to generate these results given above.

sessionInfo()
# R version 3.4.1 (2017-06-30)
# Platform: x86_64-pc-linux-gnu (64-bit)
# Running under: Scientific Linux 7.2 (Nitrogen)
# 
# Matrix products: default
# BLAS/LAPACK: /software/openblas-0.2.19-el7-x86_64/lib/libopenblas_haswellp-r0.2.19.so
# 
# locale:
# [1] C
# 
# attached base packages:
# [1] parallel  stats     graphics  grDevices utils     datasets  methods  
# [8] base     
# 
# other attached packages:
# [1] cfwlab_1.1-0  knitr_1.16    rmarkdown_1.6
# 
# loaded via a namespace (and not attached):
#  [1] compiler_3.4.1  backports_1.1.0 magrittr_1.5    rprojroot_1.2  
#  [5] tools_3.4.1     htmltools_0.3.6 yaml_2.1.14     Rcpp_0.12.12   
#  [9] stringi_1.1.5   stringr_1.2.0   digest_0.6.12   evaluate_0.10.1

Other notes

To generate a webpage from this document, I run the following commands in R:

library(rmarkdown)
render("demo.matrix.multicore.Rmd")