I thought I would take a break for a while from Hadoop and put together an F# .Net implementation of a Priority Queue; implemented using a heap data structure. Conceptually we can think of a heap as a balanced binary tree. The tree will have a root, and each node can have up to two children; a left and a right child. The keys in such a binary tree are said to be in heap order, such that any element is at least as large as its parent element.

For every element v, at a node i, the element w at i’s parent satisfies key(w)≤key(v)

The heap is maintained as an array, indexed by i=1…N, such that for any node i, the nodes at positions leftChild(i)=2i and rightChild(i)=2i+1, and the parent of the node is at position parent(i)=(1/2).

The idea behind a Priority Queue is that finding the element with the lowest key is easy, O(1) time, as it is always the root. When adding a new element it is always added to the end of the list. The heap is then fixed by recursively checking the new element with its parent, and swapping their positions in the list until the heap condition is satisfied. This operation takes O(log n) time.

Within .Net we implemented a Priority Queue using a List<KeyValuePair<'TKey, 'TValue>>.

The code, as always can be downloaded from:

http://code.msdn.microsoft.com/Net-Implementation-of-a-d3ac7b9d

The basic operations supported on the Priority Queue will be:

Enqueue Adds an item to the Queue
Dequeue Removes the root element; that with the lowest key
Peek Queries the root element
IndexOf Returns the index of a given key value pair
RemoveAt Removes the element at the specified index value
ContainsKey Determines whether the Queue contains a specific key value
GetValues Gets the values for a specified key
ChangeKey Changes a specific key value
RemoveKey Removes the element with the specific key value
Item An indexer based on the index values
Merge Merges an Enumerable with the Queue

Before showing the code a quick word is warranted about the implementation. The Key operations shown above rely on a second data structure; a Dictionary<'TKey, int>. This secondary data structure maintains a reference of the index within the list for each key. This positional structure uses a list off index values associated with each key, allowing items with the same key value to be added to the queue.

So what does the F# code look like:

  1. type PriorityQueue<'TKey, 'TValue when 'TKey : comparison> private (data:IEnumerable<KeyValuePair<'TKey, 'TValue>> option, capacity:int, comparer:IComparer<'TKey>) =   
  2.  
  3.     let mutable heapList:List<KeyValuePair<'TKey, 'TValue>> =  null
  4.     let mutable positionDict:Dictionary<'TKey, ResizeArray<int>> = null
  5.  
  6.     // Determines if a value is in the heap
  7.     let inRange index =
  8.         if index >= 0 && index < heapList.Count then Some(index) else None
  9.     let checkIndex index =
  10.         if ((inRange index).IsNone) then raise (ArgumentException(sprintf "Index specified is not within range - %i" index))
  11.  
  12.     // Gets the children of a node
  13.     let getChildren (index:int) =
  14.         // children left[2*pos] and right[2*pos + 1] where pos = index + 1
  15.         let left = (2 * index) + 1
  16.         let right = (2 * index) + 2     
  17.         (inRange left, inRange right)
  18.  
  19.     // Gets the parent of an index
  20.     let getParent (index:int) =
  21.         // parent index [pos/2] where index = pos - 1
  22.         if (index = 0) then None
  23.         else Some((index-1) / 2)
  24.  
  25.     // Tests to see if the first value is greater than the first
  26.     let isGreater parent child =
  27.         if (comparer.Compare(heapList.[parent].Key, heapList.[child].Key) > 0) then true
  28.         else false
  29.             
  30.     // Swaps two elements of the heap list
  31.     let swapElements idx1 idx2 =
  32.         let element1 = heapList.[idx1]
  33.         let element2 = heapList.[idx2]
  34.         heapList.[idx1] <- heapList.[idx2]
  35.         heapList.[idx2] <- element1
  36.         //positionDict.[element1.Key] <- idx2
  37.         positionDict.[element1.Key].Remove(idx1) |> ignore
  38.         positionDict.[element1.Key].Add(idx2)
  39.         //positionDict.[element2.Key] <- idx1
  40.         positionDict.[element2.Key].Remove(idx2) |> ignore
  41.         positionDict.[element2.Key].Add(idx1)
  42.  
  43.     // Heapifys toward the parent
  44.     let rec heapifyUp (index:int) =
  45.         if (index > 0) then
  46.             let parent = getParent index
  47.             if (isGreater parent.Value index) then
  48.                 swapElements parent.Value index
  49.                 heapifyUp parent.Value
  50.  
  51.     // Heapifys down to the children
  52.     let rec heapifyDown (index:int) =
  53.         let (left, right) = getChildren index
  54.         if (left.IsSome) then
  55.             let childindex =
  56.                 if (right.IsSome && (isGreater left.Value right.Value)) then right.Value
  57.                 else left.Value
  58.             if (isGreater index childindex) then
  59.                 swapElements index childindex
  60.                 heapifyDown childindex
  61.  
  62.     // Heapifys down to the children
  63.     let heapifyUpDown (index:int) =
  64.         let parent =  getParent index
  65.         if (parent.IsSome && (isGreater parent.Value index)) then
  66.             heapifyUp index
  67.         else
  68.             heapifyDown index
  69.  
  70.     // Adds an items and heapifys
  71.     let insertItem (key:'TKey) (value:'TValue) =
  72.         let insertindex = heapList.Count
  73.         //positionDict.Add(key, insertindex)
  74.         if not (positionDict.ContainsKey(key)) then
  75.             positionDict.Add(key, new ResizeArray<int>())
  76.         positionDict.[key].Add(insertindex)
  77.         heapList.Add(new KeyValuePair<'TKey, 'TValue>(key, value))
  78.         heapifyUp(insertindex)
  79.  
  80.     // Delete the root node and heapifys
  81.     let deleteItem index =
  82.         if (heapList.Count <= 1) then
  83.             heapList.Clear()
  84.             positionDict.Clear()
  85.         else
  86.             let lastindex = heapList.Count - 1
  87.             let indexKey =  heapList.[index].Key
  88.             let lastKey = heapList.[lastindex].Key
  89.             heapList.[index] <- heapList.[lastindex]
  90.             //positionDict.[lastKey] <- index
  91.             positionDict.[lastKey].Remove(lastindex) |> ignore
  92.             positionDict.[lastKey].Add(index)
  93.             heapList.RemoveAt(lastindex)
  94.             //positionDict.Remove(indexKey)
  95.             positionDict.[indexKey].Remove(index) |> ignore
  96.             if (positionDict.[indexKey].Count = 0) then
  97.                 positionDict.Remove(indexKey) |> ignore
  98.             heapifyDown index
  99.  
  100.     let powerTwo (value:int) =
  101.         int (2.0 ** (float value))
  102.  
  103.     // Merge new queue with original
  104.     let mergeNewList (data:IEnumerable<KeyValuePair<'TKey, 'TValue>>) =
  105.         // Add list to the end of the queue
  106.         let baseIdx = heapList.Count
  107.         data |> Seq.iteri (fun idx item ->
  108.             //positionDict.Add(item.Key, baseIdx + idx)
  109.             if not (positionDict.ContainsKey(item.Key)) then
  110.                 positionDict.Add(item.Key, new ResizeArray<int>())
  111.             positionDict.[item.Key].Add(baseIdx + idx)
  112.             heapList.Add(new KeyValuePair<'TKey, 'TValue>(item.Key, item.Value)))
  113.         // Now heapify the Queue from the bottom up
  114.         let depth = int (Math.Log(float heapList.Count, 2.0))
  115.         for depthI = depth downto 1 do
  116.             for index = ((powerTwo (depthI-1))-1) to ((powerTwo depthI)-2) do
  117.                 let (left, right) = getChildren index
  118.                 if (left.IsSome) then
  119.                     let childindex =
  120.                         if (right.IsSome && (isGreater left.Value right.Value)) then right.Value
  121.                         else left.Value
  122.                     if (isGreater index childindex) then
  123.                         swapElements index childindex
  124.  
  125.     // Default do bindings
  126.     do
  127.         if (comparer = null) then
  128.             raise (ArgumentException("Comparer cannot be null"))
  129.  
  130.         let equalityComparer =
  131.             { new IEqualityComparer<'TKey> with
  132.                 override x.Equals(c1, c2) =
  133.                     if ((comparer.Compare(c1, c2)) = 0) then true
  134.                     else false
  135.                 override x.GetHashCode(value) =
  136.                     value.GetHashCode()
  137.             }
  138.  
  139.         heapList <- new List<KeyValuePair<'TKey, 'TValue>>(capacity)
  140.         positionDict <- new Dictionary<'TKey, ResizeArray<int>>(capacity, equalityComparer)
  141.  
  142.         if data.IsSome then
  143.             //data.Value |> Seq.iter (fun item -> insertItem item.Key item.Value)
  144.             mergeNewList data.Value
  145.  
  146.     // Set of constructors
  147.     new() = PriorityQueue(None, 0, ComparisonIdentity.Structural<'TKey>)
  148.     new(capacity:int) = PriorityQueue(None, capacity, ComparisonIdentity.Structural<'TKey>)
  149.     new(data:IEnumerable<KeyValuePair<'TKey, 'TValue>>) = PriorityQueue(Some(data), 0, ComparisonIdentity.Structural<'TKey>)
  150.     new(comparer:IComparer<'TKey>) = PriorityQueue(None, 0, comparer)
  151.     new(capacity:int, comparer:IComparer<'TKey>) = PriorityQueue(None, capacity, comparer)
  152.     new(data:IEnumerable<KeyValuePair<'TKey, 'TValue>>, comparer:IComparer<'TKey>) = PriorityQueue(Some(data), 0, comparer)
  153.  
  154.     // Checks to see if the heap is empty
  155.     member this.IsEmpty
  156.         with get() = (heapList.Count = 0)
  157.  
  158.     // Enqueues a new entry into the heap
  159.     member this.Enqueue (key:'TKey) (value:'TValue) =
  160.         insertItem key value
  161.         ()
  162.  
  163.     // Peeks at the head of the heap
  164.     member this.Peek() =
  165.         if not(this.IsEmpty) then
  166.             heapList.[0]
  167.         else
  168.             raise (InvalidOperationException("Priority Queue is empty"))
  169.  
  170.     // Dequeues the head entry into the heap
  171.     member this.Dequeue() =
  172.         let value = this.Peek()
  173.         deleteItem 0
  174.         value
  175.  
  176.     // Determines whether an item is in the queue
  177.     member this.Contains (key:'TKey) (value:'TValue) =
  178.         heapList.Contains(new KeyValuePair<'TKey, 'TValue>(key, value))
  179.  
  180.     // Returns the index of a specified item
  181.     member this.IndexOf (key:'TKey) (value:'TValue) =
  182.         heapList.IndexOf(new KeyValuePair<'TKey, 'TValue>(key, value))
  183.  
  184.     // Removes an item from the queue at the specified index
  185.     member this.RemoveAt (index:int) =
  186.         checkIndex index
  187.         deleteItem index
  188.  
  189.     // Determines whether an item is in the queue
  190.     member this.ContainsKey (key:'TKey) =
  191.         positionDict.ContainsKey key
  192.  
  193.     // Gets the values associated with a Key
  194.     member this.GetValues (key:'TKey) =
  195.         if (positionDict.ContainsKey key) then
  196.             positionDict.[key].ToArray()
  197.             |> Array.map (fun index -> heapList.[index].Value)
  198.         else
  199.             Array.empty
  200.  
  201.     // Changes the key of a value in the queue
  202.     member this.ChangeKey (key:'TKey) (value:'TKey)=
  203.         let indexArray = positionDict.[key]
  204.         indexArray.ToArray()
  205.         |> Array.iter (fun index ->
  206.             let item = heapList.[index]
  207.             heapList.[index] <- new KeyValuePair<'TKey, 'TValue>(value, item.Value))
  208.         positionDict.Remove(key) |> ignore
  209.         positionDict.Add(value, indexArray)
  210.         indexArray.ToArray()
  211.         |> Array.iter (fun index ->
  212.             heapifyUpDown index)
  213.  
  214.     // Removes an item from the queue for the specified key
  215.     member this.RemoveKey (key:'TKey) =
  216.         let indexArray = positionDict.[key]
  217.         indexArray.ToArray()
  218.         |> Array.iter (fun index ->
  219.             this.RemoveAt index)
  220.  
  221.     // Modifies elements based on index values
  222.     member this.Item
  223.         with get(index) =
  224.             checkIndex index
  225.             heapList.[index]
  226.         and set(index) (value) =
  227.             checkIndex index
  228.             heapList.[index] <- new KeyValuePair<'TKey, 'TValue>(heapList.[index].Key, value)
  229.             heapifyUpDown index
  230.  
  231.     // Returns the count of the queue
  232.     member this.Count
  233.         with get() = heapList.Count
  234.  
  235.     // Resets the capacity of the Queue
  236.     member this.TrimExcess() =
  237.         heapList.TrimExcess()
  238.  
  239.     // Returns the capacity of the queue
  240.     member this.Capacity
  241.         with get() = heapList.Capacity
  242.  
  243.     // Clears the queue
  244.     member this.Clear() =
  245.         heapList.Clear()
  246.  
  247.     // Merges an existing IEnumerable
  248.     member this.Merge(data:IEnumerable<KeyValuePair<'TKey, 'TValue>>) =
  249.         mergeNewList data
  250.  
  251.     // Standard IList members
  252.     interface ICollection<KeyValuePair<'TKey, 'TValue>> with
  253.  
  254.         member this.Add(item:KeyValuePair<'TKey, 'TValue>) =
  255.             this.Enqueue item.Key item.Value
  256.  
  257.         member this.Clear() =
  258.             heapList.Clear()
  259.  
  260.         member this.Contains(item:KeyValuePair<'TKey, 'TValue>) =
  261.             heapList.Contains(item)
  262.  
  263.         member this.Count
  264.             with get() = heapList.Count
  265.  
  266.         member this.CopyTo(toArray:KeyValuePair<'TKey, 'TValue>[], arrayIndex:int) =
  267.             heapList.CopyTo(toArray, arrayIndex)
  268.  
  269.         member this.IsReadOnly
  270.             with get() = false
  271.  
  272.         member this.Remove(item:KeyValuePair<'TKey, 'TValue>) =
  273.             let index = heapList.IndexOf(item)
  274.             if (inRange index).IsSome then
  275.                 deleteItem index
  276.                 true
  277.             else
  278.                 false
  279.  
  280.         member this.GetEnumerator() =
  281.             upcast heapList.GetEnumerator()
  282.  
  283.     // IEnumerable GetEnumerator implementation
  284.     interface IEnumerable with
  285.         member this.GetEnumerator() =  
  286.             upcast heapList.GetEnumerator()

This code implementation has the PriorityQueue derive from ICollection. The implementation is analogous to the .Net Queue implementation, but with Key and Value pairs; but more importantly where Dequeue is guaranteed to return the element with the lowest value.

The correctness of the heap is managed through the heapify operations. It is these recursive operations that compare parent and child nodes and adjust the positions accordingly.

One has to remember though that the enumerator will effectively return elements in a random order.