While doing some recent work with Hadoop I needed to process a sequence that was grouped by a projected key. Seq.groupBy can perform this operation, but it makes no assumption about the ordering of the original sequence. As a consequence the grouping is not lazily evaluated: the whole input has to be read before the first group can be returned, which makes it unsuitable for the possibly large sequences one needs to process when running Hadoop MapReduce jobs. Thus I had to create a lazily evaluated grouping based on a projected key.
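To make the problem concrete, here is a small sketch that counts how many source elements are read before Seq.groupBy hands back its first group. The names consumed, source and firstKey are invented for this illustration, and the data is a toy stand-in for a large input.

```fsharp
// Counter that records how many elements the source has produced so far.
let consumed = ref 0

// Toy source of (key, value) pairs, already ordered by key.
let source =
    seq {
        for i in 1 .. 10 do
            consumed.Value <- consumed.Value + 1
            yield (i / 5, i)
    }

// Ask only for the first group produced by Seq.groupBy.
let firstKey = source |> Seq.groupBy fst |> Seq.head |> fst

printfn "first key %d was available after reading %d of 10 elements" firstKey consumed.Value
```

Even though only the first group is requested, the count should come out as all ten elements, because Seq.groupBy has to see the whole input before it can know that a group is complete.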

To achieve this the processing has to be state dependent, to handle the transition from one key value to the next. The input data is processed in such a fashion that any change in key value causes a transition to the next sequence. The persisted state is needed to track the key change, and to ensure the first value yielded by the new sequence is the value that caused the transition, so that transition values are not lost.
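The following sketch shows why the transition value needs to be remembered. It takes a naive approach, not the one used in this post: two Seq.takeWhile calls reading from one shared enumerator. The names e, rest, firstRun and secondRun are made up for the example.

```fsharp
// One shared enumerator over a small, ordered input.
let e = (Seq.ofList [ 1; 1; 2; 2 ]).GetEnumerator()

// Every enumeration of `rest` continues from wherever `e` currently is.
let rest = seq { while e.MoveNext() do yield e.Current }

// Reading the run of 1s also reads the first 2, which fails the predicate
// and is then discarded by Seq.takeWhile.
let firstRun = rest |> Seq.takeWhile (fun x -> x = 1) |> List.ofSeq    // [1; 1]

// Only one 2 is left for the next run; the transition value has been lost.
let secondRun = rest |> Seq.takeWhile (fun x -> x = 2) |> List.ofSeq   // [2]

printfn "%A %A" firstRun secondRun
```

Keeping the element that ended a run, and yielding it first in the next run, is what avoids this loss.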

In writing the new Seq grouping extensions I wanted to provide the following prototypes:

groupByOrdered : ('T -> 'Key) -> seq<'T> -> seq<'Key * seq<'T>>
groupByOrderedWith : ('T -> ('Key * 'Value)) -> seq<'T> -> seq<'Key * seq<'Value>>

The groupByOrdered function provides the same prototype as the Seq.groupBy function. The groupByOrderedWith function provides a means for modifying the type of the returned sequence values. This extension was useful in the Hadoop MapReduce processing.

A full implementation of the code is as follows:

module Seq =

    // Returns the type seq<'Key * seq<'Value>>
    let groupByOrderedWith (projection:('T -> ('Key * 'Value))) (source:seq<'T>) =

        let lastInput = ref None
        let continueDo = ref false

        let comparer = ComparisonIdentity.Structural<'Key>
        let enumerator = source.GetEnumerator()

        let getInput() =
            let found = enumerator.MoveNext()
            if found then
                let value = enumerator.Current
                Some(projection value)
            else
                None

        let inputsByKey (key:'Key) (firstValue:'Value) = seq {
            // Yield any value from previous read
            yield firstValue

            continueDo := true
            while !continueDo do
                match getInput() with
                | Some(input) when comparer.Compare(fst input, key) = 0 ->
                    // Yield found value and remainder of sequence
                    yield (snd input)
                | Some(input) ->
                    // Have a value but different key
                    lastInput := Some(input)
                    continueDo := false
                | None ->
                    // Have no more entries
                    lastInput := None
                    continueDo := false
        }

        let rec processInput (input:('Key * 'Value) option) = seq {
            if input.IsSome then
                let key = fst input.Value
                let value = snd input.Value
                yield (key, inputsByKey key value)
                yield! processInput (lastInput.contents)
        }

        processInput (getInput())        


    // Returns the type seq<'Key * seq<'T>>
    let groupByOrdered (projection:('T -> 'Key)) (source:seq<'T>) =
        groupByOrderedWith (fun value -> (projection value, value)) source

So why write this code? If one has to group a large sequence that is already ordered by key, one can now group a sequence of tuples lazily, ready for processing, with the following code:

let groupedSequence = Seq.groupByOrderedWith (fun value -> (fst value, snd value)) mySequence

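Hopefully you will agree this code is simple to use. However, a word of warning is warranted. These extensions should only be used if one knows the input data is sorted by the key, or at least arranged so that equal keys are adjacent; otherwise the same key will be returned in more than one group. Also, because all groups read from a single shared enumerator, each inner sequence should be fully enumerated, once, before moving on to the next group.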
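As an illustration of that warning, the sketch below runs a grouping of this kind over a sorted and an unsorted input. To keep the snippet self-contained it uses groupByOrderedWithSketch, a condensed stand-in written for this example rather than the listing above verbatim; toLists, sorted and unsorted are likewise hypothetical names.

```fsharp
// Condensed stand-in for groupByOrderedWith (same idea, shorter form; an assumption of this example).
let groupByOrderedWithSketch (projection: 'T -> 'Key * 'Value) (source: seq<'T>) : seq<'Key * seq<'Value>> =
    let e = source.GetEnumerator()
    let pending = ref None   // element that ended the previous group, if any
    let next () = if e.MoveNext() then Some (projection e.Current) else None
    let inner key first =
        seq {
            yield first
            let go = ref true
            while go.Value do
                match next () with
                | Some (k, v) when k = key -> yield v
                | other ->
                    // Different key or end of input: remember it and stop this group
                    pending.Value <- other
                    go.Value <- false
        }
    let rec outer input =
        seq {
            match input with
            | Some (k, v) ->
                yield (k, inner k v)
                yield! outer pending.Value
            | None -> ()
        }
    outer (next ())

// Consume each inner sequence fully before asking for the next group.
let toLists (groups: seq<'K * seq<'V>>) =
    [ for (key, values) in groups -> key, List.ofSeq values ]

let sorted = [ ("a", 1); ("a", 2); ("b", 3); ("c", 4); ("c", 5) ] |> groupByOrderedWithSketch id |> toLists
// [("a", [1; 2]); ("b", [3]); ("c", [4; 5])]

let unsorted = [ ("a", 1); ("b", 2); ("a", 3) ] |> groupByOrderedWithSketch id |> toLists
// [("a", [1]); ("b", [2]); ("a", [3])]   (key "a" comes back twice)

printfn "%A" sorted
printfn "%A" unsorted
```

Note that toLists drains each inner sequence before the outer sequence advances, which is the consumption pattern this kind of grouping relies on.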

A final word on the use of the while loop. The inner sequence could have been generated with a recursive function, but there is an issue with my use of the match..when construct: with it the function would not be guaranteed to be tail recursive, hence the rationale for using a while loop instead.
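For comparison, a recursive form of the inner sequence might look roughly like the sketch below. The helper takeRun and its parameters are hypothetical, and unlike the listing above it reads the first element of the run itself. It is only meant to show the shape of the alternative, and makes no claim about how the compiler treats the recursion.

```fsharp
open System.Collections.Generic

// Hypothetical helper: yields values from a shared enumerator while the key matches,
// and stores the first non-matching element (or None at end of input) in `pending`.
let rec takeRun (e: IEnumerator<'K * 'V>) (pending: ('K * 'V) option ref) (key: 'K) : seq<'V> =
    seq {
        if e.MoveNext() then
            match e.Current with
            | (k, v) when k = key ->
                yield v
                yield! takeRun e pending key   // recursive step in place of the while loop
            | other ->
                pending.Value <- Some other
        else
            pending.Value <- None
    }

let data = [ (1, "a"); (1, "b"); (2, "c") ]
let e = (data :> seq<int * string>).GetEnumerator()
let pending : (int * string) option ref = ref None

let run = takeRun e pending 1 |> List.ofSeq
// run is ["a"; "b"], and pending.Value is Some (2, "c")

printfn "%A %A" run pending.Value
```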

Enjoy and Happy New Year.