As mentioned in my previous post, Hadoop Streaming not only supports text streaming, but also supports Binary Streaming. As such, I wanted to put together a sample that supports processing Office documents. As before, the code can be downloaded from:
Putting together this sample involved a bit more work than the text streaming case, as one needs to provide implementations of the Java classes that support binary streaming, namely FileInputFormat and RecordReader. The purpose of these implementations is to support Binary Streaming of the document such that it is not split on the usual line boundaries. More on the Java code later.
The implemented Java classes use a key/value type pairing of <Text, BytesWritable>, and write out the data for the mapper in the following format:
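(filename) (Tab) (document bytes) (Newline)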
The Mapper code will get called with this format for each document in the specified input directories.
The goal of the sample code is to support a Map and Reduce with the following prototypes:
Map : WordprocessingDocument -> seq<string * obj>
Reduce : string -> seq<string> -> obj option
The idea is that the Mapper takes in a Word document and projects it into a sequence of keys and values. The Reducer, as in the text streaming case, takes in a key and a sequence of values and returns an optional reduced value.
The use of the obj type is, as in the text streaming case, to support serializing the output values.
As an example, I have put together a MapReduce that processes Word documents, mapping each document into the number of pages per author; when a document has multiple authors, each author is credited with the full page count.
The sample Mapper code is as follows:
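The full sample is in the download; what follows is only a minimal sketch of what such a Map function could look like. The document properties used come from the Open XML SDK, while the ';' separator for multiple authors is an assumption:

open System
open DocumentFormat.OpenXml.Packaging

let Map (document: WordprocessingDocument) : seq<string * obj> =
    seq {
        // Pull the author(s) and the page count from the document properties
        let creator = document.PackageProperties.Creator
        let pages = Int32.Parse(document.ExtendedFilePropertiesPart.Properties.Pages.Text)
        if not (String.IsNullOrWhiteSpace creator) then
            // Assume multiple authors are ';' delimited; each author is
            // credited with the full page count
            for author in creator.Split(';') do
                yield (author.Trim(), box pages)
    }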
As you can see, the majority of the code is needed to pull out the document properties. If one wanted to process the words within the document, one would use code such as the following:
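A sketch of this, using the InnerText of the document body; the whitespace-splitting rules here are an assumption:

open System
open DocumentFormat.OpenXml.Packaging

// Project a document into its individual words
let documentWords (document: WordprocessingDocument) =
    document.MainDocumentPart.Document.Body.InnerText.Split(
        [| ' '; '\t'; '\r'; '\n' |],
        StringSplitOptions.RemoveEmptyEntries)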
To run this code, it is worth noting that there is a dependency on having the Open XML SDK 2.0 for Microsoft Office installed.
Finally, the Reducer code is as follows:
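Again as a minimal sketch, assuming the values to be reduced are the page counts emitted by the Map function above:

open System

// Sum the page counts credited to each author; returning None would
// suppress output for the key
let Reduce (key: string) (values: seq<string>) : obj option =
    Some(box (values |> Seq.sumBy Int32.Parse))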
Again, as in the text streaming application, both the mapper and the reducer are executables that read input from StdIn and emit output to StdOut. In the case of the mapper, the console application needs to make multiple ReadByte() calls, against the stream returned by Console.OpenStandardInput(), to get the data, and then perform a Console.WriteLine() to emit the output. The reducer does a Console.ReadLine() to get the data and a Console.WriteLine() to emit the output.
The previous post covers the specifics of creating console applications in F#, so I will not cover this again but will assume the same program structure.
As previously mentioned, the purpose of the Mapper code is to perform Input Format Parsing, Projection, and Filtering. In the Mapper for a Word document, the incoming bytes are used to create a WordprocessingDocument (invalid documents are ignored), which is then projected into a key/value sequence using the Map function:
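A sketch of how this outer mapper loop could be structured is shown below; the function name and error handling are illustrative rather than the sample's actual code:

open System
open System.IO
open System.Text
open DocumentFormat.OpenXml.Packaging

let runMapper () =
    use stdin = Console.OpenStandardInput()

    // Read bytes up to the Tab delimiter to recover the UTF8-encoded filename
    let filename =
        let bytes = ResizeArray<byte>()
        let mutable b = stdin.ReadByte()
        while b <> int '\t' && b <> -1 do
            bytes.Add(byte b)
            b <- stdin.ReadByte()
        Encoding.UTF8.GetString(bytes.ToArray())

    // Copy the document bytes into a seekable MemoryStream, dropping the
    // trailing Newline; WordprocessingDocument.Open requires Seek support
    use memory = new MemoryStream()
    stdin.CopyTo(memory)
    if memory.Length > 0L then memory.SetLength(memory.Length - 1L)
    memory.Position <- 0L

    try
        use document = WordprocessingDocument.Open(memory, false)
        Map document
        |> Seq.iter (fun (key, value) -> Console.WriteLine(sprintf "%s\t%O" key value))
    with
    | _ -> ()  // invalid documents are simply ignored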
There are a few things of note in the code.
When pulling out the filename of the document being processed, it is assumed that UTF8 encoding has been used and that a Tab character delimits the filename from the document's bytes.
In building the Stream that is used for creating the WordprocessingDocument, a MemoryStream is used, for two reasons. Firstly, one needs to remove the trailing Newline character from the Stream; secondly, creating a WordprocessingDocument requires a Stream that supports Seek operations, which is unfortunately not the case for StdIn.
At the moment the code does not use the filename; however, in future posts I will extend the code to also support processing PDF documents.
After running the Mapper, the data passed into the Reducer will be key/value pairs delimited with a Tab. Using the above Map, a sample input dataset for the Reducer would be:
Brad Sarsfield	44
Carl Nolan	1
Carl Nolan	9
Marie West	1
Thus, in this case, the code for the Reducer is the same as in the text streaming case (the Reduce function shown above).
Once run, the reduced output would be:
Brad Sarsfield	44
Carl Nolan	10
Marie West	1
Once again, the only complexity in the code is that the Seq.groupBy function cannot be used; the input sequence can only be enumerated once, so groups instead have to be formed by detecting key changes in the sorted input. Also, as in the text streaming case, there is a fair amount of code controlling the input and output streams around calling the Map and Reduce functions; this code can be reused for all Hadoop Binary Streaming jobs.
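A minimal sketch of this reducer stream handling, assuming the Reduce function shown earlier:

open System

let runReducer () =
    // Emit the reduced value for a completed key group, if one is returned
    let emit key (values: seq<string>) =
        match Reduce key values with
        | Some value -> Console.WriteLine(sprintf "%s\t%O" key value)
        | None -> ()

    let mutable currentKey : string = null
    let values = ResizeArray<string>()
    let mutable line = Console.ReadLine()
    while line <> null do
        let parts = line.Split('\t')
        // The input is sorted by key, so a key change closes the previous group
        if currentKey <> null && parts.[0] <> currentKey then
            emit currentKey values
            values.Clear()
        currentKey <- parts.[0]
        values.Add(parts.[1])
        line <- Console.ReadLine()
    if currentKey <> null then emit currentKey values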
In the case of Binary Streaming, the command parameters are a little different from the text streaming case:
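As an illustrative shape only (the paths, executable names, and InputFormat class name below are placeholders rather than the actual names from the download; the command is shown wrapped for readability):

hadoop jar lib/hadoop-streaming-ms.jar
    -input "/office/documents" -output "/office/authorpages"
    -mapper MapperOffice.exe -reducer ReducerOffice.exe -combiner ReducerOffice.exe
    -file "C:\bin\Release\MapperOffice.exe"
    -file "C:\bin\Release\ReducerOffice.exe"
    -inputformat com.example.hadoop.streaming.io.WholeDocumentInputFormat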
The first difference is the use of the hadoop-streaming-ms.jar. This file contains the InputFormat class specified by the inputformat parameter; the later Java Classes section discusses how this is created. This is needed to support Binary Streaming.
One other difference from my previous text streaming case is the use of the Reducer class as a Combiner; this is possible because the output types from the mapper are the same as those from the reducer.
For completeness I have included the base code for the tester application, although the full code is included in the download. The code is very similar to the tester application mentioned in my previous post, with the exception of how the mapper executable is called.
When calling the mapper, it is no longer the case that the mapper executable is called once, with each line being redirected into the executable's StdIn. In the binary case the mapper executable is called once for each document, with the document's data redirected into the executable's StdIn. As mentioned, the defined format is the filename, delimited with a Tab, followed by the document's bytes, and finally the Newline character:
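A sketch of this per-document invocation (the function and variable names are illustrative):

open System.Diagnostics
open System.IO
open System.Text

let runMapperOverDocument (mapperExe: string) (filename: string) =
    let startInfo =
        ProcessStartInfo(
            FileName = mapperExe,
            UseShellExecute = false,
            RedirectStandardInput = true,
            RedirectStandardOutput = true)
    use mapper = Process.Start(startInfo)
    let stdin = mapper.StandardInput.BaseStream

    // Write the filename, the Tab delimiter, the document bytes, and
    // finally the Newline character
    let nameBytes = Encoding.UTF8.GetBytes(filename)
    stdin.Write(nameBytes, 0, nameBytes.Length)
    stdin.WriteByte(byte '\t')
    let docBytes = File.ReadAllBytes(filename)
    stdin.Write(docBytes, 0, docBytes.Length)
    stdin.WriteByte(byte '\n')
    stdin.Close()

    // Collect the key/value lines emitted by the mapper
    let output = mapper.StandardOutput.ReadToEnd()
    mapper.WaitForExit()
    output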
This code is run for each document found in the directory specified in the input argument.
The previous post discussed the threading and stream processing needed for testing in a little more detail.
To complete the post, here is the full listing for the Java class implementations:
Once the Java classes have been compiled they need to be merged into a copy of the hadoop-streaming.jar file; in my case I have created a file called hadoop-streaming-ms.jar. This file is a copy of the base file into which I have copied the compiled classes (a JAR file is just a ZIP file), although one can also use the jar tool. One just has to remember to use the package path:
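For example, assuming a hypothetical package of com.example.hadoop.streaming.io for the compiled classes, the jar tool update would be:

jar uf hadoop-streaming-ms.jar com/example/hadoop/streaming/io

(the jar tool adds the directory contents recursively)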
To use this new streaming package, the file needs to be copied to the lib directory of the Hadoop installation.
If you download the code there is also an implementation of these classes that supports a key/value type pairing of <NullWritable, BytesWritable>, rather than the <Text, BytesWritable> pairing that is used to pass in the document's filename. This can be used if the document's filename is not needed.
Hopefully the code is once again useful if you intend to write any Binary Streaming applications for processing documents. If you download the sample code, in addition to support for processing Microsoft Word documents there is support for processing PDF documents.
Written by Carl Nolan