Parallel mapM on Repa arrays

ArraysHaskellParallel ProcessingRepa

Arrays Problem Overview


In my recent work with Gibbs sampling, I've been making great use of the RVar which, in my view, provides a near ideal interface to random number generation. Sadly, I've been unable to make use of Repa due to the inability to use monadic actions in maps.

While clearly monadic maps can't be parallelized in general, it seems to me that RVar may be at least one example of a monad where effects can be safely parallelized (at least in principle; I'm not terribly familiar with the inner workings of RVar). Namely, I want to write something like the following,

drawClass :: Sample -> RVar Class
drawClass = ...

drawClasses :: Array U DIM1 Sample -> RVar (Array U DIM1 Class)
drawClasses samples = A.mapM drawClass samples

where A.mapM would look something like,

mapM :: ParallelMonad m => (a -> m b) -> Array r sh a -> m (Array r sh b)

While clearly how this would work depends crucially on the implementation of RVar and its underlying RandomSource, in principle one would think that this would involve drawing a new random seed for each thread spawned and proceeding as usual.

Intuitively, it seems that this same idea might generalize to some other monads.

So, my question is: Could one construct a class ParallelMonad of monads for which effects can be safely parallelized (presumably inhabited by, at the least, RVar)?

What might it look like? What other monads might inhabit this class? Have others considered the possibility of how this might work in Repa?

Finally, if this notion of parallel monadic actions can't be generalized, does anyone see any nice way to make this work in the specific case of RVar (where it would be very useful)? Giving up RVar for parallelism is a very difficult trade-off.

Arrays Solutions


Solution 1 - Arrays

It's been 7 years since this question has been asked, and it still seems like no-one came up with a good solution to this problem. Repa doesn't have a mapM/traverse like function, even one that could run without parallelization. Moreover, considering the amount of progress there was in the last few years it seems unlikely that it will happen either.

Because of stale state of many array libraries in Haskell and my overall dissatisfaction with their feature sets I've put forth a couple of years of work into an array library massiv, which borrows some concepts from Repa, but takes it to a completely different level. Enough with the intro.

Prior to today, there was three monadic map like functions in massiv (not counting the synonym like functions: imapM, forM et al.):

  • mapM - the usual mapping in an arbitrary Monad. Not parallelizable for obvious reasons and is also a bit slow (along the lines of usual mapM over a list slow)
  • traversePrim - here we are restricted to PrimMonad, which is significantly faster than mapM, but the reason for this is not important for this discussion.
  • mapIO - this one, as name suggests, is restricted to IO (or rather MonadUnliftIO, but that is irrelevant). Because we are in IO we can automatically split array in as many chunks as there are cores and use separate worker threads to map the IO action over each element in those chunks. Unlike pure fmap, which is also parallelizable, we have to be in IO here because of non-determinism of scheduling combined with side effects of our mapping action.

So, once I read this question, I thought to myself that the problem is practically solved in massiv, but no so fast. Random number generators, such as in mwc-random and others in random-fu can't use the same generator across many threads. Which means, that the only piece of the puzzle I was missing was: "drawing a new random seed for each thread spawned and proceeding as usual". In other words, I needed two things:

  • A function that would initialize as many generators as there gonna be worker threads
  • and an abstraction that would seamlessly give the correct generator to the mapping function depending on which thread that the action is running in.

So that is exactly what I did.

First I will give examples using the specially crafted randomArrayWS and initWorkerStates functions, as they are more relevant to the question and later move to the more general monadic map. Here are their type signatures:

randomArrayWS ::
     (Mutable r ix e, MonadUnliftIO m, PrimMonad m)
  => WorkerStates g -- ^ Use `initWorkerStates` to initialize you per thread generators
  -> Sz ix -- ^ Resulting size of the array
  -> (g -> m e) -- ^ Generate the value using the per thread generator.
  -> m (Array r ix e)
initWorkerStates :: MonadIO m => Comp -> (WorkerId -> m s) -> m (WorkerStates s)

For those who are not familiar with massiv, the Comp argument is a computation strategy to use, notable constructors are:

  • Seq - run computation sequentially, without forking any threads
  • Par - spin up as many threads as there are capabilities and use those to do the work.

I'll use mwc-random package as an example initially and later move to RVarT:

λ> import Data.Massiv.Array
λ> import System.Random.MWC (createSystemRandom, uniformR)
λ> import System.Random.MWC.Distributions (standard)
λ> gens <- initWorkerStates Par (\_ -> createSystemRandom)

Above we initialized a separate generator per thread using system randomness, but we could have just as well used a unique per thread seed by deriving it from the WorkerId argument, which is a mere Int index of the worker. And now we can use those generators to create an array with random values:

λ> randomArrayWS gens (Sz2 2 3) standard :: IO (Array P Ix2 Double)
Array P Par (Sz (2 :. 3))
  [ [ -0.9066144845415213, 0.5264323240310042, -1.320943607597422 ]
  , [ -0.6837929005619592, -0.3041255565826211, 6.53353089112833e-2 ]
  ]

By using Par strategy the scheduler library will split evenly the work of generation among available workers and each worker will use it's own generator, thus making it thread safe. Nothing prevents us from reusing the same WorkerStates arbitrary number of times as long as it is not done concurrently, which otherwise would result in an exception:

λ> randomArrayWS gens (Sz1 10) (uniformR (0, 9)) :: IO (Array P Ix1 Int)
Array P Par (Sz1 10)
  [ 3, 6, 1, 2, 1, 7, 6, 0, 8, 8 ]

Now putting mwc-random to the side we can reuse the same concept for other possible uses cases by using functions like generateArrayWS:

generateArrayWS ::
     (Mutable r ix e, MonadUnliftIO m, PrimMonad m)
  => WorkerStates s
  -> Sz ix --  ^ size of new array
  -> (ix -> s -> m e) -- ^ element generating action
  -> m (Array r ix e)

and mapWS:

mapWS ::
     (Source r' ix a, Mutable r ix b, MonadUnliftIO m, PrimMonad m)
  => WorkerStates s
  -> (a -> s -> m b) -- ^ Mapping action
  -> Array r' ix a -- ^ Source array
  -> m (Array r ix b)

Here is the promised example on how to use this functionality with rvar, random-fu and mersenne-random-pure64 libraries. We could have used randomArrayWS here as well, but for the sake of example let's say we already have an array with different RVarTs, in which case we need a mapWS:

λ> import Data.Massiv.Array
λ> import Control.Scheduler (WorkerId(..), initWorkerStates)
λ> import Data.IORef
λ> import System.Random.Mersenne.Pure64 as MT
λ> import Data.RVar as RVar
λ> import Data.Random as Fu
λ> rvarArray = makeArrayR D Par (Sz2 3 9) (\ (i :. j) -> Fu.uniformT i j)
λ> mtState <- initWorkerStates Par (newIORef . MT.pureMT . fromIntegral . getWorkerId)
λ> mapWS mtState RVar.runRVarT rvarArray :: IO (Array P Ix2 Int)
Array P Par (Sz (3 :. 9))
  [ [ 0, 1, 2, 2, 2, 4, 5, 0, 3 ]
  , [ 1, 1, 1, 2, 3, 2, 6, 6, 2 ]
  , [ 0, 1, 2, 3, 4, 4, 6, 7, 7 ]
  ]

It is important to note, that despite the fact that pure implementation of Mersenne Twister is being used in the above example, we cannot escape the IO. This is because of the non-deterministic scheduling, which means that we never know which one of the workers will be handling which chunk of the array and consequently which generator will be used for which part of the array. On the up side, if the generator is pure and splittable, such as splitmix, then we can use the pure, deterministic and parallelizable generation function: randomArray, but that is already a separate story.

Solution 2 - Arrays

It's probably not a good idea to do this due to inherently sequential nature of PRNGs. Instead, you might want to transition your code as follows:

  1. Declare an IO function (main, or what have you).
  2. Read as many random numbers as you need.
  3. Pass the (now pure) numbers onto your repa functions.

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionbgamariView Question on Stackoverflow
Solution 1 - ArrayslehinsView Answer on Stackoverflow
Solution 2 - ArraysmcandreView Answer on Stackoverflow