In my previous post I talked a little about testing the Hadoop Streaming F# MapReduce code, but it is worth saying a few more words about the tester application itself.

The complete code for this blog post and the F# MapReduce code can be found at:

http://code.msdn.microsoft.com/Hadoop-Streaming-and-F-f2e76850

As mentioned, unit testing the individual Map and Reduce functions is relatively straightforward. However, testing against sample input data is a little trickier. As such, I put together a tester application that performs the following functions:

  • Defines and executes a Process for the Mapper executable, in which
    • the StdIn is redirected to read the file specified in the “-input” command line argument
    • the StdOut is redirected to a temp file with a “mapped” extension
  • When the Mapper has completed, sorts the Mapper output file into a temp file with a “reduced” extension
  • When the sort is complete, defines and executes a Process for the Reducer executable, in which
    • the StdIn is redirected to read the sorted “reduced” file
    • the StdOut is redirected to the file specified in the “-output” command line argument
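
As a rough illustration of that flow, the sketch below runs the same three stages entirely in memory. The mapLines, sortByKey and reduceLines functions are hypothetical word-count stand-ins that I made up for this example; they are not part of the tester or of the downloadable MapReduce code, and the real tester pipes files through separate executables instead.

```fsharp
open System

// Hypothetical mapper stand-in: emits one "word<TAB>1" line per word.
let mapLines (lines: seq<string>) : seq<string> =
    lines
    |> Seq.collect (fun line -> line.Split([| ' ' |], StringSplitOptions.RemoveEmptyEntries))
    |> Seq.map (fun word -> sprintf "%s\t1" (word.ToLowerInvariant()))

// Stand-in for the sort step: orders the mapped lines by the text before the first tab.
let sortByKey (lines: seq<string>) : seq<string> =
    lines |> Seq.sortBy (fun line -> line.Split('\t').[0])

// Hypothetical reducer stand-in: sums the counts for each key.
// Seq.groupBy does not need sorted input; a streaming reducer reading StdIn would.
let reduceLines (lines: seq<string>) : seq<string> =
    lines
    |> Seq.map (fun line ->
        let parts = line.Split('\t')
        parts.[0], int parts.[1])
    |> Seq.groupBy fst
    |> Seq.map (fun (key, pairs) -> sprintf "%s\t%d" key (Seq.sumBy snd pairs))

// Chain the stages in the same order the tester runs them.
let result =
    [ "the cat"; "the dog" ]
    |> mapLines
    |> sortByKey
    |> reduceLines
    |> List.ofSeq

result |> List.iter (printfn "%s")
// Prints three tab-separated lines: cat 1, dog 1, the 2
```

The tester does the same thing, except that each stage boundary is a file on disk and the first and last stages are external processes.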

Running the Tester application allows one to check inputs and outputs, in a flow similar to running within Hadoop. The code listing for the tester application is:

namespace FSharp.HadoopTester

open System
open System.IO
open System.Collections.Generic
open System.Diagnostics
open System.Threading
open System.Threading.Tasks

open FSharp.Hadoop.Utilities
open FSharp.Hadoop.Utilities.Arguments

module MapReduceConsole =
        
    let Run args =

        // Define what arguments are expected
        let defs = [            
            {ArgInfo.Command="input"; Description="Input File"; Required=true };
            {ArgInfo.Command="output"; Description="Output File"; Required=true };
            {ArgInfo.Command="tempPath"; Description="Temp File Path"; Required=true };
            {ArgInfo.Command="mapper"; Description="Mapper EXE"; Required=true };
            {ArgInfo.Command="reducer"; Description="Reducer EXE"; Required=true }; ]

        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs
        Arguments.DisplayArgs parsedArgs

        // define the executables
        let mapperExe = Path.GetFullPath(parsedArgs.["mapper"])
        let reducerExe = Path.GetFullPath(parsedArgs.["reducer"])

        Console.WriteLine()
        Console.WriteLine (sprintf "The Mapper file is:\t%O" mapperExe)
        Console.WriteLine (sprintf "The Reducer file is:\t%O" reducerExe)

        // Get the file names
        let inputfile = Path.GetFullPath(parsedArgs.["input"])
        let outputfile = Path.GetFullPath(parsedArgs.["output"])

        let tempPath = Path.GetFullPath(parsedArgs.["tempPath"])
        let tempFile = Path.Combine(tempPath, Path.GetFileName(outputfile))

        let mappedfile = Path.ChangeExtension(tempFile, "mapped")
        let reducefile = Path.ChangeExtension(tempFile, "reduced")

        Console.WriteLine()
        Console.WriteLine (sprintf "The input file is:\t\t%O" inputfile)
        Console.WriteLine (sprintf "The mapped temp file is:\t%O" mappedfile)
        Console.WriteLine (sprintf "The reduced temp file is:\t%O" reducefile)
        Console.WriteLine (sprintf "The output file is:\t\t%O" outputfile)

        // Give the user an option to continue
        Console.WriteLine()
        Console.WriteLine("Hit ENTER to continue...")
        Console.ReadLine() |> ignore

        // Call the mapper with the input file
        let mapperProcess() =
            use mapper = new Process()
            mapper.StartInfo.FileName <- mapperExe
            mapper.StartInfo.UseShellExecute <- false
            mapper.StartInfo.RedirectStandardInput <- true
            mapper.StartInfo.RedirectStandardOutput <- true
            mapper.Start() |> ignore

            use mapperInput = mapper.StandardInput
            use mapperOutput = mapper.StandardOutput
        
            // Map the reader to a background thread so processing can happen in parallel
            Console.WriteLine "Mapper Processing Starting..."   

            let taskMapperFunc() =
                use mapperWriter = File.CreateText(mappedfile)
                while not mapperOutput.EndOfStream do
                    mapperWriter.WriteLine(mapperOutput.ReadLine())
            let taskMapperWriting = Task.Factory.StartNew(Action(taskMapperFunc))

            // Pass the file into the mapper process and close input stream when done
            use mapperReader = new StreamReader(File.OpenRead(inputfile))
            while not mapperReader.EndOfStream do
                mapperInput.WriteLine(mapperReader.ReadLine())

            mapperInput.Close()
            taskMapperWriting.Wait()
            mapperOutput.Close()

            mapper.WaitForExit()
            let result = match mapper.ExitCode with | 0 -> true | _ -> false

            mapper.Close()
            result
      

        // Sort the mapped file by the first field - mimic the role of Hadoop
        let hadoopProcess() =
            Console.WriteLine "Hadoop Processing Starting..."

            let unsortedValues = seq {
                use reader = new StreamReader(File.OpenRead(mappedfile))
                while not reader.EndOfStream do
                    let input = reader.ReadLine()
                    // Split on the first tab only; a line without a tab gets an empty value
                    let keyValue = input.Split([|'\t'|], 2)
                    let value = if keyValue.Length > 1 then keyValue.[1].Trim() else ""
                    yield (keyValue.[0].Trim(), value)
                }

            use writer = File.CreateText(reducefile)
            unsortedValues
            |> Seq.sortBy fst
            |> Seq.iter (fun (key, value) -> writer.WriteLine (sprintf "%O\t%O" key value))
            writer.Close()

        
        // Finally call the reducer process
        let reducerProcess() =
            use reducer = new Process()
            reducer.StartInfo.FileName <- reducerExe
            reducer.StartInfo.UseShellExecute <- false
            reducer.StartInfo.RedirectStandardInput <- true
            reducer.StartInfo.RedirectStandardOutput <- true
            reducer.Start() |> ignore

            use reducerInput = reducer.StandardInput
            use reducerOutput = reducer.StandardOutput
        
            // Map the reader to a background thread so processing can happen in parallel
            Console.WriteLine "Reducer Processing Starting..."

            let taskReducerFunc() =
                use reducerWriter = File.CreateText(outputfile)
                while not reducerOutput.EndOfStream do
                    reducerWriter.WriteLine(reducerOutput.ReadLine())
            let taskReducerWriting = Task.Factory.StartNew(Action(taskReducerFunc))

            // Pass the sorted file into the reducer process and close input stream when done
            use reducerReader = new StreamReader(File.OpenRead(reducefile))
            while not reducerReader.EndOfStream do
                reducerInput.WriteLine(reducerReader.ReadLine())

            reducerInput.Close()
            taskReducerWriting.Wait()
            reducerOutput.Close()

            reducer.WaitForExit()
            let result = match reducer.ExitCode with | 0 -> true | _ -> false

            reducer.Close()
            result


        // Finish test
        if mapperProcess() then
            Console.WriteLine "Mapper Processing Complete..."  

            hadoopProcess()
            Console.WriteLine "Hadoop Processing Complete..."

            if reducerProcess() then
                Console.WriteLine "Reducer Processing Complete..."

                Console.WriteLine "Processing Complete..."     
                   
        Console.ReadLine() |> ignore

The input options for the console tester application are:

  • input – Used to specify the input file for processing
  • output – The file to be used to save the results
  • tempPath – A directory used to save intermediate results and for sorting
  • mapper – The mapper executable to run
  • reducer – The reducer executable to run

To run the mapper and the reducer, a Process is defined for each. Its ProcessStartInfo is set up so that both standard input and standard output are redirected, which in turn requires UseShellExecute to be false.
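
Since the mapper and the reducer use identical start settings, they could be built by one small function. The following is only a sketch: redirectedStartInfo is a hypothetical helper name of my own and does not appear in the tester listing.

```fsharp
open System.Diagnostics

// Hypothetical helper: builds the start info shared by the mapper and reducer processes.
let redirectedStartInfo (exePath: string) =
    let info = ProcessStartInfo(exePath)
    // Stream redirection is only allowed when the shell is not used to start the process.
    info.UseShellExecute <- false
    info.RedirectStandardInput <- true
    info.RedirectStandardOutput <- true
    info
```

It would be used along the lines of `use mapper = new Process(StartInfo = redirectedStartInfo mapperExe)` before calling Start().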

To feed data into the mapper, one just has to open the input file and write each line to the mapper's StandardInput:

use mapperInput = mapper.StandardInput

use mapperReader = new StreamReader(File.OpenRead(inputfile))
while not mapperReader.EndOfStream do
    mapperInput.WriteLine(mapperReader.ReadLine())
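
The same line-by-line copy appears four times in the tester (into and out of each process), so it could be factored out. Below is a minimal sketch, assuming a hypothetical helper called copyLines that is not in the original listing. It tests ReadLine for null rather than using EndOfStream so that it works with any TextReader, which also makes it easy to try out with in-memory strings.

```fsharp
open System.IO

// Hypothetical helper: copies every line from reader to writer
// and returns the number of lines copied.
let copyLines (reader: TextReader) (writer: TextWriter) =
    let mutable count = 0
    let mutable line = reader.ReadLine()
    // ReadLine returns null once the end of the input is reached.
    while not (isNull line) do
        writer.WriteLine(line)
        count <- count + 1
        line <- reader.ReadLine()
    count

// Try it with in-memory text instead of a file and a process stream.
let source = new StringReader("key1\tvalue1\nkey2\tvalue2")
let target = new StringWriter()
let copied = copyLines source target
printfn "Copied %d lines" copied
// Prints: Copied 2 lines
```

In the tester this would be called as `copyLines mapperReader mapperInput`, since StreamReader and StreamWriter derive from TextReader and TextWriter.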

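The important thing to remember is that one needs to process the StandardOutput from the mapper on a separate thread. Otherwise, once the redirected output buffer fills up, the mapper blocks on writing while the tester is still busy feeding it input, and neither side can make progress. Reading on a separate thread is achieved using a Task: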

use mapperOutput = mapper.StandardOutput  

let taskMapperFunc() =
    use mapperWriter = File.CreateText(mappedfile)
    while not mapperOutput.EndOfStream do
        mapperWriter.WriteLine(mapperOutput.ReadLine())
let taskMapperWriting = Task.Factory.StartNew(Action(taskMapperFunc))

Before waiting for the mapper executable to exit, one needs to Close() the mapper input stream (so the mapper sees the end of its input), ensure that the task processing the StandardOutput has completed, and finally Close() the mapper output stream:

mapperInput.Close()
taskMapperWriting.Wait()
mapperOutput.Close()

mapper.WaitForExit()
let result = match mapper.ExitCode with | 0 -> true | _ -> false

mapper.Close()

The input arguments include a temp directory. The output of the mapper is saved to a file in this directory, and it is this file that is then sorted by key, with the sorted result written to a second temp file:

let unsortedValues = seq {
    use reader = new StreamReader(File.OpenRead(mappedfile))
    while not reader.EndOfStream do
        let input = reader.ReadLine()
        // Split on the first tab only; a line without a tab gets an empty value
        let keyValue = input.Split([|'\t'|], 2)
        let value = if keyValue.Length > 1 then keyValue.[1].Trim() else ""
        yield (keyValue.[0].Trim(), value)
    }

use writer = File.CreateText(reducefile)
unsortedValues
|> Seq.sortBy fst
|> Seq.iter (fun (key, value) -> writer.WriteLine (sprintf "%O\t%O" key value))
writer.Close()
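
Two properties of this sort are worth knowing when comparing results: F# compares strings ordinally, so upper-case keys sort ahead of lower-case ones, and Seq.sortBy is a stable sort, so values that share a key stay in the order the mapper emitted them. The sample pairs below are made up purely to show this. Hadoop itself should not be relied upon to keep values for a key in any particular order, so a reducer ought not to depend on it.

```fsharp
// Made-up mapper output, as (key, value) pairs.
let mapped = [ "b", "1"; "a", "first"; "B", "2"; "a", "second" ]

// The same sort the tester applies before calling the reducer.
let sorted = mapped |> Seq.sortBy fst |> List.ofSeq

sorted |> List.iter (fun (key, value) -> printfn "%s\t%s" key value)
// Keys come out as B, a, a, b; the two "a" values stay in the order first, second.
```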

Finally, the output from the sort process is passed into the reducer. The process for calling the reducer executable is very similar to that of calling the mapper executable, including ensuring that StandardOutput is processed on a separate thread.
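
Because the two calls are so alike, they could share one function. The sketch below is one possible way of doing that and is not the code from the download; runStage is a hypothetical name, and the function simply combines the steps described above for a single executable.

```fsharp
open System
open System.Diagnostics
open System.IO
open System.Threading.Tasks

// Hypothetical helper: runs one stage (mapper or reducer), feeding inputFile to its
// standard input and saving its standard output to outputFile.
// Returns true when the executable exits with code 0.
let runStage (exePath: string) (inputFile: string) (outputFile: string) =
    use proc = new Process()
    proc.StartInfo.FileName <- exePath
    proc.StartInfo.UseShellExecute <- false
    proc.StartInfo.RedirectStandardInput <- true
    proc.StartInfo.RedirectStandardOutput <- true
    proc.Start() |> ignore

    // Drain standard output on a background task so the child never blocks on a full pipe.
    let drainOutput () =
        use writer = File.CreateText(outputFile)
        let mutable line = proc.StandardOutput.ReadLine()
        while not (isNull line) do
            writer.WriteLine(line)
            line <- proc.StandardOutput.ReadLine()
    let draining = Task.Factory.StartNew(Action(drainOutput))

    // Feed the input file, then close standard input so the child sees end-of-file.
    for line in File.ReadLines(inputFile) do
        proc.StandardInput.WriteLine(line)
    proc.StandardInput.Close()

    // Wait for all output to be saved before asking for the exit code.
    draining.Wait()
    proc.WaitForExit()
    proc.ExitCode = 0
```

With this in place the mapper call would become `runStage mapperExe inputfile mappedfile` and the reducer call `runStage reducerExe reducefile outputfile`.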

In writing the tester application, the key factors in getting the processing working are to remember to process the StandardOutput on a separate thread, and to ensure the streams are closed in the correct order so one can determine the outcome of the mapper and reducer executable calls. Other than this, a tester for any MapReduce code should be easy to write.