If you follow the excellent Parallel Programming with .NET blog, you will have read a recent post by Emad Omara demonstrating a Parallel Merge Sort using Barrier. While there may be more efficient parallel sorting options, as that post notes, this is a good demonstration of the usage of a Barrier, and presents a reasonable parallel sorting solution. As such, I thought it would be useful to present this code in F#.

Before going into the code in detail, here are some comparisons of F# parallel sorting performance on my quad-core laptop, for an array of 5 million floats.

| Operation | Sort Time (seconds) |
|---|---|
| Array.Parallel.sortInPlace | 0.702001 |
| Array.sortInPlace | 1.778403 |
| Array.Parallel.sort | 0.780001 |
| Array.sort | 1.794003 |
| Array.Parallel.sortInPlaceWith | 1.903203 |
| Array.sortInPlaceWith | 5.538010 |
| Array.Parallel.sortInPlaceBy | 0.795601 |
| Array.sortInPlaceBy | 1.794003 |

As you can see, the parallel sort performance is, as stated, quite reasonable: it at least doubles the performance of the base sort. The code to be demonstrated provides an implementation of all six operations: sort, sortInPlace, sortBy, sortInPlaceBy, sortWith, and sortInPlaceWith.

A quick point about the original implementation. In the original code it is the original array that is sorted. F# sort operations include sorts that return a new array and sorts that operate in place. As such, the in-place versions will be slightly faster, because the non-in-place versions first copy the original array into a secondary array, which is then sorted in place.

So without further ado, what does the code look like? Here is a complete listing:

namespace MSDN.FSharp.Parallel

open System
open System.Threading
open System.Threading.Tasks

/// Tags used by getMergeArray to select one of the two merge buffers.
type private MergeArrayType =
    | FromArray
    | ToArray

/// Parallel merge sort. The array is split into a power-of-two number of
/// partitions, each partition is sorted on its own task, and the sorted
/// partitions are then merged pairwise in log2(partitions) rounds, with a
/// Barrier separating the rounds.
type ParallelSort() =

    /// Returns a new array containing the elements of `array` in sorted order.
    /// The input array is not modified.
    static member public Sort(array: 'T []) =
        let arraySort = Array.copy array
        ParallelSort.SortInPlaceInternal(arraySort)
        arraySort

    /// Returns a new array sorted by the keys produced by `projection`.
    /// The input array is not modified.
    static member public SortBy(array: 'T [], projection: 'T -> 'Key) =
        let arraySort = Array.copy array
        // Sort the copy so the caller's array is left untouched.
        ParallelSort.SortInPlaceInternal(arraySort, projection = projection)
        arraySort

    /// Returns a new array sorted with the ordering defined by `comparer`.
    /// The input array is not modified.
    static member public SortWith(array: 'T [], comparer: 'T -> 'T -> int) =
        let arraySort = Array.copy array
        // Sort the copy so the caller's array is left untouched.
        ParallelSort.SortInPlaceInternal(arraySort, comparer = comparer)
        arraySort

    /// Sorts `array` in place using structural comparison.
    static member public SortInPlace(array: 'T []) =
        ParallelSort.SortInPlaceInternal(array)

    /// Sorts `array` in place by the keys produced by `projection`.
    static member public SortInPlaceBy(array: 'T [], projection: 'T -> 'Key) =
        ParallelSort.SortInPlaceInternal(array, projection = projection)

    /// Sorts `array` in place with the ordering defined by `comparer`.
    static member public SortInPlaceWith(array: 'T [], comparer: 'T -> 'T -> int) =
        ParallelSort.SortInPlaceInternal(array, comparer = comparer)

    /// Sorts `array` in place. If `comparer` is given it takes precedence over
    /// `projection`; with neither, structural comparison is used.
    /// Empty and small arrays are sorted sequentially.
    static member private SortInPlaceInternal(array: 'T [], ?comparer: 'T -> 'T -> int, ?projection: 'T -> 'Key) =

        // IComparer<'T> for the comparer-based partition sort and for merging.
        let sortComparer =
            match comparer with
            | Some c -> ComparisonIdentity.FromFunction c
            | None -> ComparisonIdentity.Structural<'T>

        let projectionComparer = ComparisonIdentity.Structural<'Key>

        // Comparison used by merge. Same precedence as the partition sort:
        // the explicit comparer first, then the projection, then structural.
        let inline sortComparerResult (item1: 'T) (item2: 'T) =
            match comparer, projection with
            | None, Some p -> projectionComparer.Compare(p item1, p item2)
            | _ -> sortComparer.Compare(item1, item2)

        // Merges the sorted runs fromArray.[low1..low2] and fromArray.[high1..high2]
        // (contiguous: high1 = low2 + 1) into toArray.[low1..high2].
        // On ties the element from the left run is taken first.
        let merge (toArray: 'T []) (fromArray: 'T []) (low1: int) (low2: int) (high1: int) (high2: int) =
            let mutable ptr1 = low1
            let mutable ptr2 = high1

            for ptr in low1 .. high2 do
                if ptr1 > low2 then
                    toArray.[ptr] <- fromArray.[ptr2]
                    ptr2 <- ptr2 + 1
                elif ptr2 > high2 then
                    toArray.[ptr] <- fromArray.[ptr1]
                    ptr1 <- ptr1 + 1
                elif sortComparerResult fromArray.[ptr1] fromArray.[ptr2] <= 0 then
                    toArray.[ptr] <- fromArray.[ptr1]
                    ptr1 <- ptr1 + 1
                else
                    toArray.[ptr] <- fromArray.[ptr2]
                    ptr2 <- ptr2 + 1

        // Sorts `array` using totalWorkers participants (the calling thread is worker 0).
        let parallelSort (array: 'T []) =

            // Number of merge rounds = floor(log2 ProcessorCount), computed with
            // integer arithmetic so that totalWorkers is exactly 2 ^ iterations.
            let iterations =
                let rec floorLog2 n acc = if n > 1 then floorLog2 (n >>> 1) (acc + 1) else acc
                floorLog2 Environment.ProcessorCount 0
            let totalWorkers = 1 <<< iterations
            let auxArray : 'T [] = Array.zeroCreate array.Length
            let workers : Task [] = Array.zeroCreate (totalWorkers - 1)

            // Keys for the Array.Sort(keys, items, ...) overload, only needed for a projection sort.
            let keysArray =
                match comparer, projection with
                | None, Some p -> Array.map p array
                | _ -> [||]

            // Elements per partition. If the length is not divisible by the worker
            // count, the remainder goes to worker 0 (the calling thread).
            let partitionSize = ref (array.Length / totalWorkers)
            let remainder = array.Length % totalWorkers

            // Toggled by the barrier after every phase; selects which buffer is
            // read and which is written by the next merge pass.
            let swapped = ref false

            let inline getMergeArray (arrayType: MergeArrayType) =
                match arrayType, !swapped with
                | FromArray, true -> auxArray
                | FromArray, false -> array
                | ToArray, true -> array
                | ToArray, false -> auxArray

            // Cancelled when any participant fails, so the others stop waiting at the barrier.
            use cancellation = new CancellationTokenSource()

            // Post-phase action: each merge round combines pairs of partitions, so the
            // partition size doubles and the source/destination buffers swap roles.
            use barrier = new Barrier(totalWorkers, fun _ ->
                partitionSize := !partitionSize <<< 1
                swapped := not !swapped)

            // The sort-then-merge work of one participant.
            let action (index: int) =

                // Partition boundaries (inclusive); worker 0 also owns the remainder.
                let low = index * !partitionSize + (if index = 0 then 0 else remainder)
                let high = (index + 1) * !partitionSize - 1 + remainder

                // Sort this partition.
                let sortLen = high - low + 1
                match comparer, projection with
                | Some _, _ -> Array.Sort(array, low, sortLen, sortComparer)
                | None, Some _ -> Array.Sort(keysArray, array, low, sortLen)
                | None, None -> Array.Sort(array, low, sortLen)

                barrier.SignalAndWait(cancellation.Token)

                // In each round, a worker whose (right-shifted) index is even merges its run
                // with its right neighbour's run; a worker with an odd index leaves the barrier.
                let rec loopArray loopIdx actionIdx loopHigh =
                    if loopIdx < iterations then
                        if actionIdx % 2 = 1 then
                            barrier.RemoveParticipant()
                        else
                            let newHigh = loopHigh + !partitionSize / 2
                            // merge's first argument is the destination. The barrier toggles
                            // `swapped` before every merge pass, so getMergeArray FromArray
                            // yields the buffer written in this pass and ToArray the one read.
                            merge (getMergeArray FromArray) (getMergeArray ToArray) low loopHigh (loopHigh + 1) newHigh
                            barrier.SignalAndWait(cancellation.Token)
                            loopArray (loopIdx + 1) (actionIdx >>> 1) newHigh
                loopArray 0 index high

            // On failure, cancel the other participants' barrier waits so none blocks
            // forever, then rethrow.
            let runAction (index: int) =
                try
                    action index
                with _ ->
                    cancellation.Cancel()
                    reraise ()

            for index in 1 .. workers.Length do
                workers.[index - 1] <- Task.Factory.StartNew(fun () -> runAction index)

            try
                runAction 0
            finally
                // Never dispose the barrier while workers may still use it; worker
                // failures surface here as an AggregateException.
                Task.WaitAll(workers)

            // An odd number of merge rounds leaves the result in auxArray.
            if iterations % 2 <> 0 then
                Array.blit auxArray 0 array 0 array.Length

        // Arrays too small to give every worker a useful share (including empty
        // arrays, which are a no-op) are sorted sequentially.
        if array.Length < Environment.ProcessorCount * 2 then
            match comparer, projection with
            | Some c, _ -> Array.sortInPlaceWith c array
            | None, Some p -> Array.sortInPlaceBy p array
            | None, None -> Array.sortInPlace array
        else
            parallelSort array

As you can see, the internal method SortInPlaceInternal is the actual implementation of the sort. The remaining members deal with setting parameters for calling this function, based on the sort options.

Whereas the nature of this sort is identical to the original implementation, there are some subtle differences. These mostly deal with comparison, the optional projection for sortBy, and array references.

Firstly, it is worth talking about the implementation of the Barrier and the array references. In this Barrier implementation, the post-phase action doubles the partition size (as before), because each merge round combines pairs of partitions, and toggles a flag indicating that a swap has occurred. So what does this mean? The swapped flag determines the direction of the merge operation after the initial parallelized sorts. When a merge is performed, the From and To arrays are obtained from a function whose result is determined by this swapped flag:

// Returns one of the two buffers (array or auxArray) for the requested role;
// which buffer is returned depends on the current value of `swapped`.
let inline getMergeArray (arrayType: MergeArrayType) =
    match (arrayType, !swapped) with
    | (FromArray, true) -> auxArray
    | (FromArray, false) -> array
    | (ToArray, true) -> array
    | (ToArray, false) -> auxArray

So why do this? The rationale behind this was that the array references do not have to change between merge passes. Instead, the merge operation just obtains a reference to the appropriate array.

As mentioned, one of the big differences in this implementation is the processing of comparisons and the optional projection for sortBy operations.

During the sort process, two comparisons take place. The first is the one used by the initial Array.Sort, and the second is the one used for the merge steps. The reason for this distinction is the use of Array.Sort for sorting the sections of the array in parallel. Array.Sort optionally accepts an IComparer<T> generic interface. Thus, when a comparer is specified, it has to be converted to this interface. Luckily F# makes this easy:

// IComparer<'T> built from the user-supplied comparer function when there is one,
// otherwise the F# structural comparer for 'T.
let sortComparer =
    match comparer with
    | Some c -> ComparisonIdentity.FromFunction c
    | _ -> ComparisonIdentity.Structural<'T>

When dealing with the optional projection for sortBy one could similarly construct an object implementing IComparer using a comparer like:

// Alternative approach: an IComparer that compares the projected keys of two items.
let pCompare a b = compare (projection a) (projection b)
let pComparer = ComparisonIdentity.FromFunction pCompare

However, for sorting performance, I found that a more efficient approach was to define an array containing the projected keys of the array. This allows the use of the overload of Array.Sort that takes an array of keys in addition to the array of items.

Thus, combining both the comparer and projection requirements for sorting, each parallel range is sorted with the following:

// Sort the inclusive range array.[low..high]: with the comparer if given,
// else by the precomputed keys if projecting, else with the default ordering.
let sortLen = high - low + 1
match (comparer, projection) with
| (Some _, _) -> Array.Sort(array, low, sortLen, sortComparer)
| (_, Some p) -> Array.Sort(keysArray, array, low, sortLen)
| (_, _) -> Array.Sort(array, low, sortLen)

For merging, structural comparisons are used where possible. Thus the comparison of array elements is defined as follows:

// Structural comparer for the projected keys.
let projectionComparer = ComparisonIdentity.Structural<'Key>

// Comparison used by the merge step: compares projected keys when a projection
// is present, otherwise delegates to sortComparer.
let inline sortComparerResult (item1: 'T) (item2: 'T) =
    match projection with
    | Some p -> projectionComparer.Compare(p item1, p item2)
    | None -> sortComparer.Compare(item1, item2)

With the internal sorting process defined all that remains is to actually perform the sort:

// Entry dispatch: an empty array raises; arrays shorter than
// ProcessorCount * 2 use the sequential Array sorts; others use parallelSort.
match array with
| [||] -> failwith "Empty Array"
| small when small.Length < (Environment.ProcessorCount * 2) ->
    match (comparer, projection) with
    | (Some c, _) -> Array.sortInPlaceWith c array
    | (_, Some p) -> Array.sortInPlaceBy p array
    | (_, _) -> Array.sortInPlace array
| _ -> parallelSort array

As in the previous example, an optimization ensures that small arrays are sorted using the base sort operations. This threshold can be tuned as necessary, as can the total number of worker threads.

As one of the objectives of this exercise was to provide sort operations in the Array.Parallel module, a few extensions are defined:

module Array =

    /// Parallel counterparts of the Array sort functions, implemented by ParallelSort.
    module Parallel =

        /// Sorts into a new array using structural comparison (ParallelSort.Sort).
        let sort (array: 'T []) : 'T [] =
            ParallelSort.Sort array

        /// Sorts into a new array by the keys that `projection` produces (ParallelSort.SortBy).
        let sortBy (projection: 'T -> 'Key) (array: 'T []) : 'T [] =
            ParallelSort.SortBy(array, projection)

        /// Sorts into a new array with the ordering given by `comparer` (ParallelSort.SortWith).
        let sortWith (comparer: 'T -> 'T -> int) (array: 'T []) : 'T [] =
            ParallelSort.SortWith(array, comparer)

        /// Sorts `array` in place using structural comparison (ParallelSort.SortInPlace).
        let sortInPlace (array: 'T []) : unit =
            ParallelSort.SortInPlace array

        /// Sorts `array` in place by the keys that `projection` produces (ParallelSort.SortInPlaceBy).
        let sortInPlaceBy (projection: 'T -> 'Key) (array: 'T []) : unit =
            ParallelSort.SortInPlaceBy(array, projection)

        /// Sorts `array` in place with the ordering given by `comparer` (ParallelSort.SortInPlaceWith).
        let sortInPlaceWith (comparer: 'T -> 'T -> int) (array: 'T []) : unit =
            ParallelSort.SortInPlaceWith(array, comparer)

These definitions allow one to perform parallel sort operations as one would do for base sorting:

// Sort `array` in place with the parallel sort, just like Array.sortInPlace.
array
|> Array.Parallel.sortInPlace

So if you need to perform parallel sort operations, hopefully this will get you off the ground. If you want to see the code run, here is an fsx script that I have used for testing.

// This file is a script that can be executed with the F# Interactive.

open System
open MSDN.FSharp.Parallel

// A five-element array: shorter than ProcessorCount * 2 on machines with three
// or more cores, so it takes the sequential path.
let items0 = [| 9; 7; 5; 3; 1 |]
Array.Parallel.sortInPlace items0
printfn "Serial: %A" items0

// Reverse-ordered integers, sorted first into a new array and then in place.
let items1 = [| 10000 .. -1 .. 1 |]
printfn "Simple New: %A" (Array.Parallel.sort items1)
Array.Parallel.sortInPlace items1
printfn "Simple In Place: %A" items1

// sin sampled over [0, 100] in steps of 0.1, sorted in place.
let items2 = [| for f in 0.0 .. 0.1 .. 100.0 -> sin f |]
Array.Parallel.sortInPlace items2
printfn "Sorted: %A" items2

// The same samples, ordered by absolute value through a projection.
let items3 = [| for f in 0.0 .. 0.1 .. 100.0 -> sin f |]
Array.Parallel.sortInPlaceBy abs items3
printfn "Sorted ABS: %A" items3

// Some 5 million item array performance testing

open System
open MSDN.FSharp.Parallel

let rnd = System.Random()

/// Runs `func` once and returns the elapsed time, measured with a
/// high-resolution Stopwatch (DateTime.Now has coarse resolution).
let recordTime func =
    // Collect, run pending finalizers, then collect again, so garbage left by an
    // earlier run is not collected during the timed section.
    // (GC.WaitForFullGCComplete only applies after RegisterForFullGCNotification.)
    GC.Collect(GC.MaxGeneration)
    GC.WaitForPendingFinalizers()
    GC.Collect(GC.MaxGeneration)
    let stopwatch = System.Diagnostics.Stopwatch.StartNew()
    func ()
    stopwatch.Elapsed

/// Prints a one-line report of how long a sort of `sortCount` elements took.
let writeTime (message: string) (sortCount: int) (timespan: TimeSpan) =
    let seconds = timespan.TotalSeconds
    printfn "%s: Sort took %f seconds : Element count = %i" message seconds sortCount

/// Size of every benchmark array; also the element count passed to writeTime.
let elementCount = 5000000

// Exactly elementCount random values (a 0 .. 1 .. 5000000 range yields one extra element).
let itemsBase = Array.init elementCount (fun _ -> (rnd.NextDouble() - 0.8) * 1000.0)

// Base sort: in place, parallel vs sequential.
let items8 = Array.copy itemsBase
let items9 = Array.copy itemsBase

recordTime (fun () ->
    items8
    |> Array.Parallel.sortInPlace)
|> writeTime "ParallelInPlace" elementCount

recordTime (fun () ->
    items9
    |> Array.sortInPlace)
|> writeTime "SequentialInPlace" elementCount

// Base sort into a new array.
let items8n = Array.copy itemsBase
let items9n = Array.copy itemsBase

recordTime (fun () ->
    items8n
    |> Array.Parallel.sort
    |> ignore)
|> writeTime "Parallel" elementCount

recordTime (fun () ->
    items9n
    |> Array.sort
    |> ignore)
|> writeTime "Sequential" elementCount

// Sort with an explicit comparer.
let items8w = Array.copy itemsBase
let items9w = Array.copy itemsBase

recordTime (fun () ->
    items8w
    |> Array.Parallel.sortInPlaceWith compare)
|> writeTime "ParallelInPlaceWith" elementCount

recordTime (fun () ->
    items9w
    |> Array.sortInPlaceWith compare)
|> writeTime "SequentialInPlaceWith" elementCount

// Sort by a projection.
let items8b = Array.copy itemsBase
let items9b = Array.copy itemsBase

recordTime (fun () ->
    items8b
    |> Array.Parallel.sortInPlaceBy (fun item -> abs item))
|> writeTime "ParallelInPlaceBy" elementCount

recordTime (fun () ->
    items9b
    |> Array.sortInPlaceBy (fun item -> abs item))
|> writeTime "SequentialInPlaceBy" elementCount

In future posts I will look at QuickSort options in more detail.