So, to round out the Hadoop Streaming samples, I thought I would put together an XML Streaming sample. As always, the code can be found here:

http://code.msdn.microsoft.com/Hadoop-Streaming-and-F-f2e76850

XML Streaming Reader

So how does one stream in XML? If you read the Hadoop Streaming documentation you will notice the following FAQ:

You can use the record reader StreamXmlRecordReader to process XML documents.

hadoop jar hadoop-streaming.jar -inputreader "StreamXmlRecord,begin=BEGIN_STRING,end=END_STRING" ..... (rest of the command)

Anything found between BEGIN_STRING and END_STRING would be treated as one record for map tasks.

This should allow one to define a start and end tag using commands such as:

-inputreader "StreamXmlRecord,begin=<Row>,end=</Row>"

However, if one tries to run this from the command console one gets an error due to the “<” character, as this is used to redirect standard input. It seems that using the usual caret escape character ^ is not enough.

In this example, to parse the XML one could just specify the XML element name, “Row” in this case, and the start and end tags could easily be derived. As such, to assist in XML processing I have provided an XML reader to do exactly this. I was going to modify the base Hadoop stream reader, but there seems to be traction behind the usage of a Mahout class called “org.apache.mahout.classifier.bayes.XmlInputFormat”.

I have changed this base class to allow a configuration value of “xmlinput.element”, which lets one define just the XML element to be processed. An XML Streaming job can then be run with the following configuration:

"-D xmlinput.element=Store" -inputformat org.apache.mahout.classifier.bayes.XmlElementStreamingInputFormat

As a note one has to remember to place the job configuration setting in quotes.

The changes to support this are quite minimal. Firstly the Input Format is defined as:

public class XmlElementStreamingInputFormat
    extends FileInputFormat <NullWritable, Text> {
  
    public static final String XML_ELEMENT_KEY = "xmlinput.element";
   
    @Override
    public RecordReader<NullWritable, Text> getRecordReader(
        InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
        
        return new XmlElementStreamingRecordReader((FileSplit) inputSplit, jobConf);
    }
}

Secondly the Record Reader is modified to define the start and end tags based on the new configuration value:

String elementTagName = jobConf.get(XML_ELEMENT_KEY);
            
startTag = ("<" + elementTagName + ">").getBytes("UTF8");
endTag = ("</" + elementTagName + ">").getBytes("UTF8");

In using this code one has the restriction that the element being searched for is well-formed and that its start tag has no attributes. If this is not the case, one can easily create a new XML Input Format class specific to one's needs.

The complete original source can be found in various places, including GitHub. My modifications are in the source code download.
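
As a rough illustration of what such a class would need to do differently, here is a minimal Java sketch of a start tag check that tolerates attributes. It is not part of the download: the class name StartTagMatcher and the method isStartTag are hypothetical, and the sketch assumes that self-closing elements do not need to be handled.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the download
class StartTagMatcher {

    // Returns true when the bytes at pos begin with "<name" followed by
    // either '>' or whitespace, so "<Store id='1'>" matches but "<Stores>" does not.
    static boolean isStartTag(byte[] buffer, int pos, String elementName) {
        byte[] prefix = ("<" + elementName).getBytes(StandardCharsets.UTF_8);

        // The byte after the prefix must exist so it can be inspected
        if (pos < 0 || pos + prefix.length >= buffer.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (buffer[pos + i] != prefix[i]) {
                return false;
            }
        }
        byte next = buffer[pos + prefix.length];
        return next == '>' || next == ' ' || next == '\t' || next == '\r' || next == '\n';
    }
}
```

With a check along these lines, a start tag such as <Store id="1"> would be accepted as the beginning of a record while <Stores> would not. The end tag comparison could stay as it is.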

With the new XML Input Format class in place we are good to go with our F# code.

Map and Reduce Classes

As always, let's start with the Map and Reduce functions. The goal of the sample code is to support a Map and Reduce with the following prototypes:

Map : XElement -> seq<(string * string) * obj>
Reduce : string * string -> seq<string> -> obj option

The idea is that the Mapper takes in an XElement and projects this into a sequence of keys and values. The Reducer, as in the text streaming case, takes in a key and the sequence of values for that key, and returns an optional reduced value.

The main difference that you will notice in this sample is that the key is a tuple, which in turn is mapped into multiple map output key fields. This is supported in a Streaming application through the configuration value “stream.num.map.output.key.fields”. The Map in the previous samples returned a single string key value.
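
To make the effect of this setting concrete, the following Java sketch shows the split that is conceptually applied to each line of mapper output: the prefix up to the Nth tab becomes the key and the rest becomes the value. It is a simplified illustration rather than Hadoop's actual code. The class name KeyFieldSplitter is made up, and the tab separator is assumed to be the streaming default.

```java
// Hypothetical illustration of how the number of key fields divides a line
class KeyFieldSplitter {

    // Returns { key, value } where the key spans the first numKeyFields
    // tab separated fields and the value is whatever follows.
    static String[] split(String line, int numKeyFields) {
        int pos = -1;
        for (int i = 0; i < numKeyFields; i++) {
            pos = line.indexOf('\t', pos + 1);
            if (pos < 0) {
                // Fewer separators than key fields: the whole line is the key
                return new String[] { line, "" };
            }
        }
        return new String[] { line.substring(0, pos), line.substring(pos + 1) };
    }
}
```

For a line holding BM, Guardian Bank and 1100000 separated by tabs, a field count of 2 gives a key of the first two fields and a value of 1100000. With a field count of 1 the bank name would end up in the value instead, and the sort and grouping would happen on the business type alone.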

For the sample I was going to process a series of XML nodes with the following structure (created by running a TSQL Select from the Adventure Works Store table):

<Store>
  <BusinessEntityID>294</BusinessEntityID>
  <Name>Professional Sales and Service</Name>
  <SalesPersonID>276</SalesPersonID>
  <Demographics>
    <StoreSurvey xmlns="http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/StoreSurvey">
      <AnnualSales>800000</AnnualSales>
      <AnnualRevenue>80000</AnnualRevenue>
      <BankName>International Bank</BankName>
      <BusinessType>BM</BusinessType>
      <YearOpened>1991</YearOpened>
      <Specialty>Touring</Specialty>
      <SquareFeet>18000</SquareFeet>
      <Brands>4+</Brands>
      <Internet>T1</Internet>
      <NumberEmployees>14</NumberEmployees>
    </StoreSurvey>
  </Demographics>
  <ModifiedDate>2008-10-13T11:15:07.497</ModifiedDate>
</Store>

The Map code is going to extract the Sales Amount for each Business Type and Bank:

let Map (element:XElement) =

    let aw = "http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/StoreSurvey"

    let demographics = element.Element(XName.Get("Demographics")).Element(XName.Get("StoreSurvey", aw))

    if not(demographics = null) then
        let business = demographics.Element(XName.Get("BusinessType", aw)).Value
        let bank = demographics.Element(XName.Get("BankName", aw)).Value
        let key = (business, bank)

        let sales = Decimal.Parse(demographics.Element(XName.Get("AnnualSales", aw)).Value)

        Seq.singleton (key, sales)
    else
        Seq.empty

The Reduce then sums the Sales across each of the Business Types and Banks:

let Reduce (key:(string*string)) (values:seq<string>) =
    let totalRevenue =
        values |>
        Seq.fold (fun revenue value -> revenue + Int32.Parse(value)) 0

    Some(totalRevenue)

The rationale for the Map returning a sequence rather than a single value is that the Map calling code can then allow the Map function to return multiple values per XML node.

All simple processing. So how are the Map and Reduce functions called?

Mapper and Reducer Executable

The complexity in the Mapper executable, which calls the Map function, comes in processing the XML input stream. Whereas in text streaming one gets a line per record to be processed, this is not so for XML. For XML Streaming one gets a stream of XML that will more than likely span multiple lines. As such, the Mapper executable needs to parse the input, in my case line by line, and extract the required nodes for the construction of the XElement type. The requirement here is then not that different from that of the Java XML Input Format class.

In the sample code the construction of the XElement sequence is achieved through the xmlElements sequence definition. The full code listing is as follows:

module Controller =

    let Run (args:string array) =    
    
        // Define what arguments are expected
        let defs = [
            {ArgInfo.Command="input"; Description="Input XML File"; Required=false };
            {ArgInfo.Command="output"; Description="Output File"; Required=false } ]

        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs        
        
        // Ensure Standard Input/Output and allow for debug configuration
        use reader =
            if parsedArgs.ContainsKey("input") then
                new StreamReader(Path.GetFullPath(parsedArgs.["input"]))
            else
                new StreamReader(Console.OpenStandardInput())

        use writer =
            if parsedArgs.ContainsKey("output") then
                new StreamWriter(Path.GetFullPath(parsedArgs.["output"]))
            else
                new StreamWriter(Console.OpenStandardOutput(), AutoFlush = true)

        // Combine the name/value output into a string
        let outputCollector ((outputKey1, outputKey2) , outputValue) =
            let output = sprintf "%s\t%s\t%O" outputKey1 outputKey2 outputValue
            writer.WriteLine(output)

        // Write a counter entry
        let counterReporter docType =
            stderr.WriteLine (sprintf "reporter:counter:Elements Processed,%s,1" docType)

        let nodename = StoreXmlElementMapper.MapNode
        let nodestart = "<" + nodename + ">"
        let nodeend = "</" + nodename + ">"

        // Read the next line of the input stream
        let readLine() =
            reader.ReadLine()      

        // Parse the input stream into a sequence of XElement types
        let elementBuilder = new StringBuilder(1024)
        let rec xmlElements inContent (someContent:string option) = seq {
            let line =
                match someContent with
                | Some(content) -> content
                | None -> readLine()

            if not (box line = null) then
                if (inContent) then
                    // Try to find the end element and yield accordingly
                    let offset = line.IndexOf(nodeend, 0, StringComparison.InvariantCultureIgnoreCase)
                    if (offset > -1) then
                        // Found the endnode so append and start new XElement
                        let content = line.Substring(0, offset + nodeend.Length)
                        elementBuilder.Append(content) |> ignore
                        let nextContent =                             
                            if (offset + nodeend.Length = line.Length) then
                                None
                            else
                                Some(line.Substring(offset + nodeend.Length))
                        yield XElement.Parse(elementBuilder.ToString())
                        elementBuilder.Clear() |> ignore
                        yield! xmlElements false nextContent
                    else
                        // Just a content line so append
                        elementBuilder.AppendLine(line) |> ignore
                        yield! xmlElements true None
                else
                    // Find the start node element and start building
                    let offset = line.IndexOf(nodestart, 0, StringComparison.InvariantCultureIgnoreCase)
                    if (offset > -1) then
                        yield! xmlElements true (Some(line.Substring(offset)))
                    else
                        yield! xmlElements false None            
        }        
        
        // Process the XElement sequence and report on successes
        let elementProcessed value =
            outputCollector value
            counterReporter "Successfully Processed"

        try
            xmlElements false None
            |> Seq.map StoreXmlElementMapper.Map
            |> Seq.iter (Seq.iter elementProcessed)
        with
        | :? System.Xml.XmlException ->
            // An invalid XML element ends the processing, so report it on the counter
            counterReporter "Invalid Elements"

        // Close the streams
        reader.Close()
        writer.Close()

The premise behind the creation of the sequence of XElement types is that a StringBuilder is used to build up the text used to create the XElement. The opening tag is located, after which any content is appended to the StringBuilder. Once the end tag is located and appended to the StringBuilder, its contents are yielded as an XElement, and the process is repeated. One has to remember that after finding the end tag, the remainder of the line needs to be fed back into the search for the next opening tag.

As this processing is dependent on knowing the element name to process, I decided to have the module containing the Map function return the node name. Other methods could be used, but this seemed to be the cleanest as the Mapper needs this understanding to process the XML.

In addition to handling the XML processing, this sample code also demonstrates writing out the composite key from the tuple and generating counters showing the number of elements processed, both successes and failures.
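
For readers who think more readily in Java, the same scan can be sketched as a loop with a carried-over remainder. This is an illustrative port, not code from the download: the class ElementScanner is hypothetical, it returns the element text rather than parsed XElement values, and unlike the F# version it matches the tags case-sensitively.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical port of the line-by-line element scan
class ElementScanner {

    // Collects the text of every <element>...</element> block found in the lines
    static List<String> extract(List<String> lines, String element) {
        String startTag = "<" + element + ">";
        String endTag = "</" + element + ">";
        List<String> found = new ArrayList<>();
        StringBuilder builder = new StringBuilder();
        boolean inContent = false;

        for (String line : lines) {
            String rest = line;
            // Keep working on the same line until nothing useful is left in it
            while (rest != null) {
                if (inContent) {
                    int end = rest.indexOf(endTag);
                    if (end < 0) {
                        // Just a content line so append it whole
                        builder.append(rest).append('\n');
                        rest = null;
                    } else {
                        // Found the end tag: emit the element, keep the remainder
                        int cut = end + endTag.length();
                        builder.append(rest, 0, cut);
                        found.add(builder.toString());
                        builder.setLength(0);
                        inContent = false;
                        rest = rest.substring(cut);
                    }
                } else {
                    int start = rest.indexOf(startTag);
                    if (start < 0) {
                        rest = null;
                    } else {
                        // Drop anything before the start tag and begin building
                        inContent = true;
                        rest = rest.substring(start);
                    }
                }
            }
        }
        return found;
    }
}
```

The inner loop is what handles the remainder: when a line closes one element and opens the next, the text after the end tag goes back through the start tag search instead of being discarded.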

With the XML previously mentioned the output from the Mapper executable will be:

BM    Guardian Bank    1100000
BM    International Bank    3000000
BM    International Security    2700000
BM    Primary Bank & Reserve    2200000
BM    Primary International    1100000
BM    Reserve Security    1100000
BM    United Security    2900000
BS    International Security    1500000
BS    Primary Bank & Reserve    1500000
BS    Reserve Security    800000
OS    Guardian Bank    6000000
OS    International Bank    4500000
OS    International Security    4500000
OS    Primary Bank & Reserve    10500000
OS    Primary International    6000000
OS    Reserve Security    6000000
OS    United Security    4500000

So onto the Reducer. The Reducer, as in the previous samples, just has to read in this text stream and call the Reduce function. The full code listing is as follows:

module Controller =

    let Run (args:string array) =

        // Define what arguments are expected
        let defs = [
            {ArgInfo.Command="input"; Description="Input File"; Required=false };
            {ArgInfo.Command="output"; Description="Output File"; Required=false } ]

        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs

        // Ensure Standard Input/Output and allow for debug configuration
        use reader =
            if parsedArgs.ContainsKey("input") then
                new StreamReader(Path.GetFullPath(parsedArgs.["input"]))
            else
                new StreamReader(Console.OpenStandardInput())

        use writer =
            if parsedArgs.ContainsKey("output") then
                new StreamWriter(Path.GetFullPath(parsedArgs.["output"]))
            else
                new StreamWriter(Console.OpenStandardOutput(), AutoFlush = true)
        
        // Combine the name/value output into a string
        let outputCollector (outputKey1, outputKey2) outputValue =            
            let output = sprintf "%s\t%s\t%O" outputKey1 outputKey2 outputValue
            writer.WriteLine(output)

        // Read the next line of the input stream
        let readLine() =
            reader.ReadLine()

        // Parse the input into the required key/value pair
        let parseLine (input:string) =
            let keyValue = input.Split('\t')
            ((keyValue.[0].Trim(), keyValue.[1].Trim()), keyValue.[2].Trim())

        // Converts an input line into an option
        let getInput() =
            let input = readLine()
            if not(String.IsNullOrWhiteSpace(input)) then
                Some(parseLine input)
            else
                None

        // Creates a sequence of the input based on the provided key
        let lastInput = ref None
        let continueDo = ref false
        let inputsByKey key firstValue = seq {
            // Yield any value from previous read
            yield firstValue

            continueDo := true
            while !continueDo do                
                match getInput() with
                | Some(input) when (fst input) = key ->
                    // Yield found value and remainder of sequence
                    yield (snd input)                    
                | Some(input) ->
                    // Have a value but different key
                    lastInput := Some(input)
                    continueDo := false
                | None ->
                    // Have no more entries
                    lastInput := None
                    continueDo := false
        }

        // Controls the calling of the reducer
        let rec processInput (input:((string * string) * string) option) =
            if input.IsSome then
                let (key, value) = input.Value
                let reduced = StoreXmlElementReducer.Reduce key (inputsByKey key value)
                if reduced.IsSome then
                    outputCollector key reduced.Value
                processInput lastInput.contents

        processInput (getInput())

The main difference in this code is the support for a tuple as a key. As in previous samples, the Reducer has to create a sequence of sequences, in effect a lazily evaluated groupBy, of the input data based on the key values. In this instance, to find a match one needs to ensure both key elements are the same. This works in F# because tuple types support structural equality (check out Equality and Comparison Constraints in F# by Don Syme for more details).

Of course one could generalize this code to support N key elements, but maybe more on this at another time.

Finally onto running the job. The command to run this MapReduce job is as follows:

hadoop.cmd jar lib/hadoop-streaming-ms.jar
"-D xmlinput.element=Store" "-D stream.num.map.output.key.fields=2"
-input "/stores/demographics" -output "/stores/banking/release"
-mapper "..\..\jars\FSharp.Hadoop.MapperXml.exe"
-combiner "..\..\jars\FSharp.Hadoop.ReducerXml.exe"
-reducer "..\..\jars\FSharp.Hadoop.ReducerXml.exe"
-file "C:\bin\Release\FSharp.Hadoop.MapperXml.exe"
-file "C:\bin\Release\FSharp.Hadoop.ReducerXml.exe"
-file "C:\bin\Release\FSharp.Hadoop.MapReduce.dll"
-file "C:\bin\Release\FSharp.Hadoop.Utilities.dll"
-inputformat org.apache.mahout.classifier.bayes.XmlElementStreamingInputFormat

The configuration values one has to set are the number of output key fields, “stream.num.map.output.key.fields” with a value of 2, and the XML element name to be processed, “xmlinput.element” with a value of “Store”.

As in the Binary Document Streaming sample, I created a new Streaming JAR called “hadoop-streaming-ms.jar” which contains the new Java classes on top of the base Hadoop streaming classes. The downloadable code has the full listing of the Java classes along with a command file to compile the source code.

Tester Application

As always, I have put together a tester application for calling the Mapper and Reducer executables:

module MapReduceConsole =
        
    let Run args =

        // Define what arguments are expected
        let defs = [            
            {ArgInfo.Command="input"; Description="Input File"; Required=true };
            {ArgInfo.Command="output"; Description="Output File"; Required=true };
            {ArgInfo.Command="tempPath"; Description="Temp File Path"; Required=true };
            {ArgInfo.Command="mapper"; Description="Mapper EXE"; Required=true };
            {ArgInfo.Command="reducer"; Description="Reducer EXE"; Required=true };
            {ArgInfo.Command="nodename"; Description="XML Node"; Required=true }; ]

        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs
        Arguments.DisplayArgs parsedArgs

        // define the executables
        let mapperExe = Path.GetFullPath(parsedArgs.["mapper"])
        let reducerExe = Path.GetFullPath(parsedArgs.["reducer"])
        let nodename = parsedArgs.["nodename"]
        let nodestart = "<" + nodename + ">"
        let nodeend = "</" + nodename + ">"

        Console.WriteLine()
        Console.WriteLine (sprintf "The Mapper file is:\t%O" mapperExe)
        Console.WriteLine (sprintf "The Reducer file is:\t%O" reducerExe)
        Console.WriteLine (sprintf "Processing Nodename:\t%O" nodename)

        // Get the file names
        let inputfile = Path.GetFullPath(parsedArgs.["input"])
        let outputfile = Path.GetFullPath(parsedArgs.["output"])

        let tempPath = Path.GetFullPath(parsedArgs.["tempPath"])
        let tempFile = Path.Combine(tempPath, Path.GetFileName(outputfile))

        let mappedfile = Path.ChangeExtension(tempFile, "mapped")
        let reducefile = Path.ChangeExtension(tempFile, "reduced")

        Console.WriteLine()
        Console.WriteLine (sprintf "The input file is:\t\t%O" inputfile)
        Console.WriteLine (sprintf "The mapped temp file is:\t%O" mappedfile)
        Console.WriteLine (sprintf "The reduced temp file is:\t%O" reducefile)
        Console.WriteLine (sprintf "The output file is:\t\t%O" outputfile)

        // Give the user an option to continue
        Console.WriteLine()
        Console.WriteLine("Hit ENTER to continue...")
        Console.ReadLine() |> ignore       

        // Call the mapper process, streaming each extracted XML element into it
        let mapperProcess() =
            use mapper = new Process()
            mapper.StartInfo.FileName <- mapperExe
            mapper.StartInfo.UseShellExecute <- false
            mapper.StartInfo.RedirectStandardInput <- true
            mapper.StartInfo.RedirectStandardOutput <- true
            mapper.Start() |> ignore

            use mapperInput = mapper.StandardInput
            use mapperOutput = mapper.StandardOutput
        
            // Map the reader to a background thread so processing can happen in parallel
            Console.WriteLine "Mapper Processing Starting..."   

            let taskMapperFunc() =
                use mapperWriter = File.CreateText(mappedfile)
                while not mapperOutput.EndOfStream do
                    mapperWriter.WriteLine(mapperOutput.ReadLine())
            let taskMapperWriting = Task.Factory.StartNew(Action(taskMapperFunc))

            // Pass the file into the mapper process and close input stream when done
            use mapperReader = new StreamReader(File.OpenRead(inputfile))
            let elementBuilder = new StringBuilder(1024)

            let rec xmlElements inContent (someContent:string option) = seq {
                let line =
                    match someContent with
                    | Some(content) -> content
                    | None -> mapperReader.ReadLine()

                if not (box line = null) then
                    if (inContent) then
                        // Try to find the end element and yield accordingly
                        let offset = line.IndexOf(nodeend, 0, StringComparison.InvariantCultureIgnoreCase)
                        if (offset > -1) then
                            // Found the endnode so always add a new line
                            let content = line.Substring(0, offset + nodeend.Length)
                            elementBuilder.AppendLine(content) |> ignore
                            let nextContent =
                                if (offset + nodeend.Length = line.Length) then
                                    None
                                else
                                    Some(line.Substring(offset + nodeend.Length))
                            yield elementBuilder.ToString()
                            elementBuilder.Clear() |> ignore
                            yield! xmlElements false nextContent
                        else
                            // Just a content line so append
                            elementBuilder.AppendLine(line) |> ignore
                            yield! xmlElements true None
                    else
                        // Find the start node element and start building
                        let offset = line.IndexOf(nodestart, 0, StringComparison.InvariantCultureIgnoreCase)
                        if (offset > -1) then
                            yield! xmlElements true (Some(line.Substring(offset)))
                        else
                            yield! xmlElements false None
            }

            xmlElements false None
            |> Seq.iter mapperInput.WriteLine

            mapperInput.Close()
            taskMapperWriting.Wait()
            mapperOutput.Close()

            mapper.WaitForExit()
            let result = match mapper.ExitCode with | 0 -> true | _ -> false

            mapper.Close()
            result

        // Sort the mapped file by the two key fields - mimic the role of Hadoop
        let hadoopProcess() =
            Console.WriteLine "Hadoop Processing Starting..."

            let unsortedValues = seq {
                use reader = new StreamReader(File.OpenRead(mappedfile))
                while not reader.EndOfStream do
                    let input = reader.ReadLine()
                    let keyValue = input.Split('\t')
                    yield (keyValue.[0].Trim(), keyValue.[1].Trim(), keyValue.[2].Trim())
                reader.Close()
                }

            use writer = File.CreateText(reducefile)
            unsortedValues
            |> Seq.sortBy (fun (key1, key2, value) -> (key1, key2))
            |> Seq.iter (fun (key1, key2, value) -> writer.WriteLine (sprintf "%O\t%O\t%O" key1 key2 value))
            writer.Close()

        
        // Finally call the reducer process
        let reducerProcess() =
            use reducer = new Process()
            reducer.StartInfo.FileName <- reducerExe
            reducer.StartInfo.UseShellExecute <- false
            reducer.StartInfo.RedirectStandardInput <- true
            reducer.StartInfo.RedirectStandardOutput <- true
            reducer.Start() |> ignore

            use reducerInput = reducer.StandardInput
            use reducerOutput = reducer.StandardOutput
        
            // Map the reader to a background thread so processing can happen in parallel
            Console.WriteLine "Reducer Processing Starting..."

            let taskReducerFunc() =
                use reducerWriter = File.CreateText(outputfile)
                while not reducerOutput.EndOfStream do
                    reducerWriter.WriteLine(reducerOutput.ReadLine())
            let taskReducerWriting = Task.Factory.StartNew(Action(taskReducerFunc))

            // Pass the file into the reducer process and close input stream when done
            use reducerReader = new StreamReader(File.OpenRead(reducefile))
            while not reducerReader.EndOfStream do
                reducerInput.WriteLine(reducerReader.ReadLine())

            reducerInput.Close()
            taskReducerWriting.Wait()
            reducerOutput.Close()

            reducer.WaitForExit()
            let result = match reducer.ExitCode with | 0 -> true | _ -> false

            reducer.Close()
            result


        // Finish test
        if mapperProcess() then
            Console.WriteLine "Mapper Processing Complete..."  

            hadoopProcess()
            Console.WriteLine "Hadoop Processing Complete..."

            if reducerProcess() then
                Console.WriteLine "Reducer Processing Complete..."

                Console.WriteLine "Processing Complete..."     
                   
        Console.ReadLine() |> ignore

Again, the challenge in this code is the processing of the XML. This sample code processes a single XML input file, parsing the XML into the required elements and then streaming these into the Mapper executable. If you review the code you will see a strong similarity between the Mapper executable XElement sequence generation and how the XML is parsed and streamed into the Mapper.

Finally, a required argument into the tester application is the “nodename” value. This is analogous to the job configuration parameter when running the job within a Hadoop cluster.

Conclusion

So this wraps up the Streaming examples for a while. I have enjoyed putting it all together, and I hope it is some useful code.