In some recent work with Hadoop I needed to process a sequence that was grouped by a projected key. Although Seq.groupBy can perform this operation, it makes no assumption about the ordering of the original sequence. As a consequence it has to read the entire input before it can yield the first group. The result is therefore not lazily evaluated, which makes it unsuitable for the potentially large sequences one needs to process when running Hadoop MapReduce jobs. I therefore had to create a lazily evaluated sequence grouped on a projected key.

To achieve this, the processing has to be state dependent so that it can handle the transition from one key value to the next. The input data is processed so that any change in key value causes a transition to the next sequence. State is needed for two things: to track the key change, and to ensure that the first value yielded by the new sequence is the value that caused the transition. That way transition values are not lost.
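To make the transition rule concrete, here is a minimal eager sketch that works on an F# list. It is not the lazy implementation from this post and is not meant for large inputs. The name `groupRuns` is hypothetical and used only for illustration. A change of key starts a new group, and the element that caused the change becomes the first element of that group.

```fsharp
// Illustration only (hypothetical helper): eager grouping of contiguous runs of equal keys.
let groupRuns (projection: 'T -> 'Key) (source: 'T list) : ('Key * 'T list) list =
    let folder (groups: ('Key * 'T list) list) (item: 'T) =
        let key = projection item
        match groups with
        | (currentKey, items) :: rest when currentKey = key ->
            // Same key as the current group: extend it (items are kept reversed for now)
            (currentKey, item :: items) :: rest
        | _ ->
            // Key changed (or first item): the transition value starts the new group
            (key, [ item ]) :: groups
    source
    |> List.fold folder []
    |> List.rev                                        // restore group order
    |> List.map (fun (key, items) -> key, List.rev items) // restore item order

// Example: the input is not sorted, so key "a" ends up in two groups
let example = groupRuns fst [ ("a", 1); ("a", 2); ("b", 3); ("a", 4) ]
// example = [ ("a", [("a", 1); ("a", 2)]); ("b", [("b", 3)]); ("a", [("a", 4)]) ]
```

The split of key "a" across two groups is exactly why the input has to be sorted by key.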

In writing the new Seq grouping extensions I wanted to provide the following signatures:

```fsharp
groupByOrdered : ('T -> 'Key) -> seq<'T> -> seq<'Key * seq<'T>> when 'Key : comparison
groupByOrderedWith : ('T -> ('Key * 'Value)) -> seq<'T> -> seq<'Key * seq<'Value>> when 'Key : comparison
```

The groupByOrdered function has the same signature as Seq.groupBy, except that the key type must support comparison rather than just equality. The groupByOrderedWith function lets the projection also transform the values, so the inner sequences can have a different element type from the input. This extension was useful in the Hadoop MapReduce processing.
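For sorted input, the intended result of groupByOrderedWith can be stated in terms of the standard Seq.groupBy. The following sketch (the name `groupByWithEager` is hypothetical) produces the same keys and values. Like Seq.groupBy, however, it reads the whole source before yielding the first group. It is therefore only useful as a reference, for example when checking a lazy version against small inputs.

```fsharp
// Reference semantics only: eager, reads the whole source before yielding anything.
// Hypothetical name; like Seq.groupBy it only needs 'Key : equality.
let groupByWithEager (projection: 'T -> 'Key * 'Value) (source: seq<'T>) : seq<'Key * seq<'Value>> =
    source
    |> Seq.map projection                                   // 'T -> ('Key * 'Value)
    |> Seq.groupBy fst                                      // group the pairs by key
    |> Seq.map (fun (key, pairs) -> key, Seq.map snd pairs) // keep only the values

// Example: Hadoop-style "key<TAB>value" lines, with the values parsed to int
let parseLine (line: string) =
    let parts = line.Split('\t')
    parts.[0], int parts.[1]

let grouped =
    [ "apple\t1"; "apple\t2"; "pear\t5" ]
    |> groupByWithEager parseLine
    |> Seq.map (fun (key, values) -> key, List.ofSeq values)
    |> List.ofSeq
```

The projection here changes the value type from `string` to `int`, which is the kind of transformation groupByOrderedWith is meant for.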

A full implementation of the code is as follows:

```fsharp
module Seq =

    // Returns the type seq<'Key * seq<'Value>>
    let groupByOrderedWith (projection: 'T -> ('Key * 'Value)) (source: seq<'T>) =
        // Delay creating the enumerator and state until the result is enumerated,
        // so that calling the function does not read from the source eagerly
        Seq.delay (fun () ->
            let lastInput = ref None
            let continueDo = ref false
            let comparer = ComparisonIdentity.Structural<'Key>
            // Note: this enumerator is shared by all groups and is not explicitly disposed
            let enumerator = source.GetEnumerator()

            // Read the next element and project it to a (key, value) pair
            let getInput () =
                if enumerator.MoveNext() then Some(projection enumerator.Current)
                else None

            let inputsByKey (key: 'Key) (firstValue: 'Value) = seq {
                // Yield the value read while processing the previous group
                yield firstValue
                continueDo.Value <- true
                while continueDo.Value do
                    match getInput () with
                    | Some(input) when comparer.Compare(fst input, key) = 0 ->
                        // Same key: yield the value and keep reading
                        yield snd input
                    | Some(input) ->
                        // Different key: keep it as the first value of the next group
                        lastInput.Value <- Some(input)
                        continueDo.Value <- false
                    | None ->
                        // No more entries
                        lastInput.Value <- None
                        continueDo.Value <- false
            }

            let rec processInput (input: ('Key * 'Value) option) = seq {
                match input with
                | Some(key, value) ->
                    yield (key, inputsByKey key value)
                    // lastInput is read only when the outer sequence is advanced past the group above
                    yield! processInput lastInput.Value
                | None -> ()
            }

            processInput (getInput ()))

    // Returns the type seq<'Key * seq<'T>>
    let groupByOrdered (projection: 'T -> 'Key) (source: seq<'T>) =
        groupByOrderedWith (fun value -> (projection value, value)) source
```

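So why write this code? If one has to group a large sorted sequence of (key, value) tuples, ready for processing, one can now use the following code:

```fsharp
let groupedSequence = Seq.groupByOrderedWith (fun value -> (fst value, snd value)) mySequence
```

Because the elements are already tuples, the projection is just the identity, so `Seq.groupByOrderedWith id mySequence` is equivalent.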

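Hopefully you will agree this code is simple to use. However, a word of warning is warranted. These extensions should only be used if one knows the input data is sorted, or at least that equal keys are contiguous; otherwise the same key will appear in several groups. In addition, all groups share a single enumerator over the source. Each inner sequence must therefore be fully consumed, exactly once and in order, before moving on to the next group.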
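Because the inner sequences of the implementation in this post share one enumerator, they have to be consumed in order. If that restriction is a problem, one alternative is to buffer each group into a list before yielding it. This is a different trade-off from the implementation above. The outer sequence is still lazy and the source enumerator is disposed, but memory use grows with the size of the largest group. The name `groupByOrderedBuffered` is hypothetical, and the input is still assumed to be sorted by key.

```fsharp
// Alternative sketch (not the implementation above): each group is materialised
// as a list, so groups can be kept, skipped or re-read freely.
// Assumes the source is sorted (equal keys contiguous); hypothetical name.
let groupByOrderedBuffered (projection: 'T -> 'Key * 'Value) (source: seq<'T>) : seq<'Key * 'Value list> =
    seq {
        use enumerator = source.GetEnumerator()
        // Read the next element, if any, and project it to a (key, value) pair
        let next () =
            if enumerator.MoveNext() then Some(projection enumerator.Current) else None
        // The pair that starts the next group (the transition value), if any
        let pending = ref (next ())
        while pending.Value.IsSome do
            let key, firstValue = Option.get pending.Value
            let buffer = ResizeArray<'Value>()
            buffer.Add firstValue
            let reading = ref true
            while reading.Value do
                match next () with
                | Some(nextKey, nextValue) when nextKey = key ->
                    buffer.Add nextValue
                | other ->
                    // Key changed or input exhausted: remember what was read
                    pending.Value <- other
                    reading.Value <- false
            yield (key, List.ofSeq buffer)
    }

// Usage: keys 0,0,0,1,1,1,... from an infinite source; only the needed groups are read
let firstTwo =
    Seq.initInfinite (fun i -> (i / 3, i))
    |> groupByOrderedBuffered id
    |> Seq.take 2
    |> List.ofSeq
// firstTwo = [ (0, [0; 1; 2]); (1, [3; 4; 5]) ]
```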

A final word on the use of the while loop. The inner sequence could have been generated with a recursive function instead. My concern was the match..when expression: with it I could not be sure that the function would be tail recursive, hence the rationale for using a while loop instead.
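For comparison, the recursive alternative could look like the following sketch. It is self-contained rather than a drop-in replacement. `valuesForKey` is a hypothetical name, it uses `=` instead of the structural comparer, and it reports the transition pair through a ref cell in the same way `lastInput` does in the code above. The recursive call is a `yield!` in tail position of the sequence expression. Whether that avoids stack growth for very long groups is exactly the concern raised above, so treat this as a sketch and test it with a large group before relying on it.

```fsharp
open System.Collections.Generic

// Recursive form of the inner loop (hypothetical name, for comparison only).
// Yields values while the key matches; on a key change it stores the
// transition pair in 'transition', and at end of input it stores None.
let rec valuesForKey (enumerator: IEnumerator<'Key * 'Value>) (key: 'Key)
                     (transition: ('Key * 'Value) option ref) : seq<'Value> =
    seq {
        if enumerator.MoveNext() then
            match enumerator.Current with
            | (nextKey, value) when nextKey = key ->
                yield value
                // Recursive call in tail position of the sequence expression
                yield! valuesForKey enumerator key transition
            | other ->
                // Different key: record the transition value and stop
                transition.Value <- Some other
        else
            transition.Value <- None
    }
```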

Enjoy and Happy New Year.