Carl Nolan’s ramblings on development
So, to round out the Hadoop Streaming samples I thought I would put together an XML Streaming sample. As always the code can be found here:
http://code.msdn.microsoft.com/Hadoop-Streaming-and-F-f2e76850
So how does one stream in XML? If you read the Hadoop Streaming documentation you will notice the following FAQ:
You can use the record reader StreamXmlRecordReader to process XML documents.
hadoop jar hadoop-streaming.jar -inputreader "StreamXmlRecord,begin=BEGIN_STRING,end=END_STRING" ..... (rest of the command)
Anything found between BEGIN_STRING and END_STRING would be treated as one record for map tasks.
This should allow one to define a start and end tag using commands such as:
-inputreader "StreamXmlRecord,begin=<Row>,end=</Row>"
However, if one tries to run this from the command console one gets an error due to the “<” character, as this is used to redirect standard input. It seems that using the usual caret escape character ^ is not enough.
In this example, to parse the XML one could just specify the XML Element name, “Row” in this case, and the start and end tags could easily be derived. As such to assist in XML processing I have provided an XML reader to do exactly this. I was going to modify the base Hadoop stream reader but it seems that there is traction behind the usage of a Mahout library, called “org.apache.mahout.classifier.bayes.XmlInputFormat”.
I have changed this base library to allow a configuration value of “xmlinput.element” that lets one define just the XML Element to be processed. This supports an XML Streaming job with the following configuration:
"-D xmlinput.element=Store" -inputformat org.apache.mahout.classifier.bayes.XmlElementStreamingInputFormat
As a note one has to remember to place the job configuration setting in quotes.
The changes to support this are quite minimal. Firstly the Input Format is defined as:
Secondly the Record Reader is modified to define the start and end tags based on the new configuration value:
This code carries the restriction that the element tag being searched for is well-formed and has no attributes. If this is not the case one can easily create a new XML Input Format class specific to one's needs.
The complete original source can be found in various places; including github. My modifications are in the source code download.
With the new XML Input Format class in place we are good to go with our F# code.
As always, let's start with the Map and Reduce functions. The goal of the sample code is to support a Map and Reduce with the following prototypes:
Map : XElement -> seq<(string * string) * obj>
Reduce : string * string -> seq<string> -> obj option
The idea is that the Mapper takes in an XElement and projects this into a sequence of keys and values. The Reducer, as in the text streaming case, takes in a key and the sequence of values for that key, and returns an optional reduced value.
The main difference that you will notice in this sample is that the key is a tuple, which in turn is mapped into multiple map keys. This is supported in Streaming applications through the configuration value “stream.num.map.output.key.fields”. The Map in the previous samples returned a single string key value.
For the sample I was going to process a series of XML nodes with the following structure (created by running a TSQL Select from the Adventure Works Store table):
The Map code is going to extract the Sales Amount for each Business Type and Bank:
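As a rough illustration only (this is not the original listing), a Map function of this shape could look like the sketch below. The child element names BusinessType, BankName and AnnualSales, and the MapNode value, are assumptions made for the example, since the XML structure is not reproduced here.

```fsharp
open System.Xml.Linq
// Older F# Interactive sessions may need: #r "System.Xml.Linq"

// Helper to build an XName from a plain string.
let xn (name: string) = XName.Get name

// Name of the element the Mapper executable should extract (assumed).
let MapNode = "Store"

// Map : XElement -> seq<(string * string) * obj>
// The child element names used here are hypothetical.
let Map (element: XElement) : seq<(string * string) * obj> =
    seq {
        let businessType = element.Element(xn "BusinessType")
        let bank = element.Element(xn "BankName")
        let sales = element.Element(xn "AnnualSales")
        // Emit nothing when any expected child element is missing.
        if not (isNull businessType) && not (isNull bank) && not (isNull sales) then
            yield ((businessType.Value, bank.Value), box (decimal (sales.Value)))
    }
```

Returning an empty sequence for an incomplete node keeps the function total; a stricter version might raise an exception instead so that the failure is counted.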
The Reduce then sums the Sales across each of the Business Types and Banks:
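A minimal sketch of a summing Reduce with the prototype given earlier follows; it is an illustration rather than the original listing. Skipping values that do not parse, and returning None when nothing parses, are assumptions of this sketch.

```fsharp
open System
open System.Globalization

// Reduce : string * string -> seq<string> -> obj option
// Sums the values for one (business type, bank) key.
let Reduce (key: string * string) (values: seq<string>) : obj option =
    let total, count =
        values
        |> Seq.fold (fun (sum, n) value ->
            // Values that are not valid numbers are ignored (an assumption).
            match Decimal.TryParse(value, NumberStyles.Number, CultureInfo.InvariantCulture) with
            | true, amount -> (sum + amount, n + 1)
            | _ -> (sum, n)) (0M, 0)
    // No usable values means there is nothing to emit for this key.
    if count > 0 then Some (box total) else None
```

A fold is used so the values are consumed in a single pass without being buffered.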
The rationale for the Map returning a sequence rather than a single value is that the Map calling code then allows the Map function to return multiple values per XML node.
All simple processing. So how are the Map and Reduce functions called?
The complexity in the Mapper executable, which calls the Map function, comes in processing the XML input stream. In text streaming one gets a line per record to be processed; with XML Streaming one instead gets a stream of XML in which a record will more than likely span multiple lines. As such, the Mapper executable needs to parse the input, in my case line by line, and extract the required nodes for the construction of the XElement type. The requirement here is not that different from the Java XML Input Format class.
In the sample code the construction of the XElement sequence is achieved through the XMLElements sequence definition. The full code listing is as follows:
The premise of the creation of the sequence of XElement types is that a StringBuilder is used to build up the text used to create the XElement. The opening tag is located, after which any content is appended to the StringBuilder. Once the end tag is located and appended to the StringBuilder, its contents are yielded as an XElement, and the process is repeated. One has to remember that after finding the end tag the remainder of the line needs to be fed into the search for the next opening tag.
As this processing is dependent on knowing the element name to process I decided to have the module containing the Map function return the node name. Other methods could be used but this seemed to be the cleanest as the Mapper needs this understanding to process the XML.
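The steps just described can be sketched as follows. This is a simplified illustration, not the original listing: the function name xmlElements is hypothetical, and it assumes the opening tag has no attributes and that elements of the given name do not nest.

```fsharp
open System
open System.IO
open System.Text
open System.Xml.Linq

// Lazily yields one XElement per <nodeName>...</nodeName> block read from the reader.
let xmlElements (nodeName: string) (reader: TextReader) : seq<XElement> =
    let openTag = "<" + nodeName + ">"
    let closeTag = "</" + nodeName + ">"
    seq {
        let builder = StringBuilder()
        let inside = ref false                 // true between an opening and closing tag
        let line = ref (reader.ReadLine())
        while not (isNull line.Value) do
            let rest = ref line.Value          // unprocessed remainder of the current line
            while rest.Value.Length > 0 do
                if inside.Value then
                    let idx = rest.Value.IndexOf(closeTag, StringComparison.Ordinal)
                    if idx >= 0 then
                        // End tag found: complete the element and yield it.
                        let cut = idx + closeTag.Length
                        builder.Append(rest.Value.Substring(0, cut)) |> ignore
                        yield XElement.Parse(builder.ToString())
                        builder.Clear() |> ignore
                        inside.Value <- false
                        // Feed the remainder back into the search for the next opening tag.
                        rest.Value <- rest.Value.Substring(cut)
                    else
                        builder.AppendLine(rest.Value) |> ignore
                        rest.Value <- ""
                else
                    let idx = rest.Value.IndexOf(openTag, StringComparison.Ordinal)
                    if idx >= 0 then
                        inside.Value <- true
                        rest.Value <- rest.Value.Substring(idx)
                    else
                        rest.Value <- ""       // nothing of interest on this line
            line.Value <- reader.ReadLine()
    }
```

Because the result is a sequence expression, elements are only parsed as the caller consumes them, so the whole input never needs to be held in memory. Note that XElement.Parse will throw on malformed content, which the caller would need to handle.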
In addition to handling the XML processing this sample code also demonstrates writing out the composite key from the tuple and generating counters showing the number of elements processed; both success and failures.
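To illustrate, the sketch below writes each tuple key as two tab-separated fields followed by the value, and reports counters using the Hadoop Streaming convention of writing reporter:counter:group,counter,amount lines to standard error. The function names, the counter group and the counter names are hypothetical, chosen only for this example.

```fsharp
open System.IO

// One output record: both key fields and the value, tab-separated.
let formatRecord (((key1, key2), value): (string * string) * obj) : string =
    sprintf "%s\t%s\t%O" key1 key2 value

// Counter update line in the Hadoop Streaming stderr format.
let counterLine (group: string) (name: string) (amount: int) : string =
    sprintf "reporter:counter:%s,%s,%d" group name amount

// Applies map to each item, writing records to output and counters to log.
let runMap (map: 'T -> seq<(string * string) * obj>) (items: seq<'T>)
           (output: TextWriter) (log: TextWriter) =
    for item in items do
        try
            // Materialize first so a failing item writes no partial output.
            let records = map item |> Seq.toList
            for record in records do
                output.WriteLine(formatRecord record)
            log.WriteLine(counterLine "XmlStreaming" "Elements Processed" 1)
        with _ ->
            log.WriteLine(counterLine "XmlStreaming" "Elements Failed" 1)
```

In a real Mapper executable the output and log writers would be Console.Out and Console.Error respectively.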
With the XML previously mentioned the output from the Mapper executable will be:
BM	Guardian Bank	1100000
BM	International Bank	3000000
BM	International Security	2700000
BM	Primary Bank & Reserve	2200000
BM	Primary International	1100000
BM	Reserve Security	1100000
BM	United Security	2900000
BS	International Security	1500000
BS	Primary Bank & Reserve	1500000
BS	Reserve Security	800000
OS	Guardian Bank	6000000
OS	International Bank	4500000
OS	International Security	4500000
OS	Primary Bank & Reserve	10500000
OS	Primary International	6000000
OS	Reserve Security	6000000
OS	United Security	4500000
So onto the Reducer. The Reducer, as in the previous samples, just has to read in this text stream and call the Reduce function. The full code listing is as follows:
The main difference in this code is the support for a tuple as a key. As in previous samples the Reducer has to create a sequence of sequences, as a lazily evaluated groupBy, of the input data based on the key values. In this instance to find a match one needs to ensure both key elements are the same. This is achieved in F# by the fact that tuple types support structural equality (check out Equality and Comparison Constraints in F# by Don Syme for more details).
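As a simplified illustration of grouping on a tuple key, the sketch below groups adjacent input records whose two key fields match. It is not the original listing and is deliberately simpler than a fully lazy sequence of sequences: it buffers the values of one key at a time. The function names are hypothetical, and the input is assumed to arrive sorted by key, as Hadoop provides it to a Reducer.

```fsharp
// Splits "key1<TAB>key2<TAB>value" into ((key1, key2), value); None if malformed.
let parseLine (line: string) : ((string * string) * string) option =
    match line.Split('\t') with
    | [| key1; key2; value |] -> Some ((key1, key2), value)
    | _ -> None

// Groups adjacent records that share a key, yielding each group as soon as the key changes.
let groupAdjacent (records: seq<(string * string) * string>)
                  : seq<(string * string) * string list> =
    seq {
        let current : (string * string) option ref = ref None
        let values = ResizeArray<string>()
        for (key, value) in records do
            match current.Value with
            // Tuple equality is structural, so both key fields must match.
            | Some k when k = key -> values.Add value
            | Some k ->
                yield (k, List.ofSeq values)
                values.Clear()
                current.Value <- Some key
                values.Add value
            | None ->
                current.Value <- Some key
                values.Add value
        // Emit the final group, if any input was seen.
        match current.Value with
        | Some k -> yield (k, List.ofSeq values)
        | None -> ()
    }
```

Each yielded group can then be passed to the Reduce function as a key and its values.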
Of course one could generalize this code to support N key elements; but maybe more on this at another time.
Finally onto running the job. The command to run this MapReduce job is as follows:
The configuration values one has to set are the number of output key fields, “stream.num.map.output.key.fields” with a value of 2, and the XML element name to be processed, “xmlinput.element” with a value of “Store”.
As in the Binary Document Streaming sample, I created a new Streaming JAR called “hadoop-streaming-ms.jar” which contains the new Java classes on top of the base Hadoop streaming classes. The downloadable code has the full listing of the Java classes along with a command file to compile the source code.
As always I have put together a tester application for calling the Mapper and Reducer executable:
Again, the challenge in this code is the processing of the XML. This sample code processes a single XML input file, parsing the XML into the required elements and then streaming these into the Mapper executable. If you review the code you will see a strong similarity between the Mapper executable XElement sequence generation and how the XML is parsed and streamed into the Mapper.
Finally, a required argument into the tester application is the “nodename” value. This is analogous to the job configuration parameter when running the job within a Hadoop cluster.
So this wraps up the Streaming examples for a while. I have enjoyed putting it all together, and I hope it is some useful code.