Carl Nolan’s ramblings on development
I have often been asked how one implements a Join when writing MapReduce code. As such, I thought it would be useful to add an additional sample demonstrating how this is achieved. There are multiple mechanisms one can employ to perform a Join operation, and the one to be discussed here is a Reduce Side 1-to-many join.
As always this sample, amongst others, can be found in the “Generics based Framework for .Net Hadoop MapReduce Job Submission” code download; within the Samples folder.
Say one wants to join two sets of data, A and B, via a common set attribute. Set A could be defined as a collection of tuples of the form (ki, ai, Ai), where k represents the key value on which we want to do the join, a the set item unique identifier, and A the other attributes. For this set each k value would correspond to a unique value of a.
Set B would be similarly defined as a collection of tuples of the form (ki, bx, Bx). In this case each value of k would equate to multiple values of b.
As an example, set A could thus be represented by an OrderHeader type and set B by the corresponding OrderDetail types. The k and a values in this case would represent the sales order identifier. The b value would represent the sales order detail identifier.
The basic concept is that the MapReduce job will create a new set C, which would be defined by the tuple collection (ki, ax, by, Ax, By). For each key identifier k, there would be a single value of a, and multiple values for b.
(k64, a52, b106, A52, B106) (k64, a52, b121, A52, B121) … (k64, a52, b234, A52, B234)
Furthering the sample, this new set C could be represented by a new type that is the aggregate of the OrderHeader and OrderDetail types; say OrderLine. The SalesOrderId would be the common key attribute.
To perform the Reduce Side join the Mapper reads in the data representing both sets. When processing set A data the emitted value would be (ki, ax, Ax, Ø, Ø), and for set B the value (ki, Ø, Ø, by, By). In both cases the emitted key would be the common key attribute value.
The Reducer would receive the set values for each shared key attribute; namely a single (ki, ax, Ax, Ø, Ø) value and multiple (ki, Ø, Ø, by, By) values. It would then emit the full set of (ki, ax, by, Ax, By) values for each key attribute.
So as an example, let's do a join between order header and detail information from the SQL Server AdventureWorks sample database. The sample download also includes BCP scripts to extract sales data from the sample database. For completeness I have also written the sample in both C# (with heavy use of LINQ) and F#.
To start we need to define types that represent the sales header and detail information. These type definitions, listed below, are OrderHeader and OrderDetail. As mentioned we also need an aggregate type, called OrderLine, that aggregates both the header and detail types.
The OrderLine aggregated type is used as the output from the Mapper and the Reducer.
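The original C# and F# type listings are not reproduced here, but the shape of the three types can be sketched as follows. This is a minimal Python illustration only; the field names are assumptions drawn from the AdventureWorks schema, and the actual sample types will differ:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class OrderHeader:
    # Hypothetical subset of AdventureWorks header columns
    sales_order_id: int
    order_date: str
    total_due: float

@dataclass
class OrderDetail:
    # Hypothetical subset of AdventureWorks detail columns
    sales_order_id: int
    sales_order_detail_id: int
    order_qty: int
    unit_price: float

@dataclass
class OrderLine:
    # Aggregate of header and detail; either side may be absent
    # in the Mapper output, hence both default to None
    header: Optional[OrderHeader] = None
    detail: Optional[OrderDetail] = None
```

The key point is that OrderLine carries both halves of the join, with the unused half left empty until the Reducer combines them.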
The purpose of the mapper is to read in both the header and detail data. For each data record the mapper outputs a value of an OrderLine instance, along with a key value of the SalesOrderID value. When processing a sales header data item an OrderLine is output with a null value for the OrderDetail; and of course vice-versa when processing a detail data item.
The code for parsing the data items is as below. In this instance I have made the determination of the data item type by inspecting the second data value. In this case I am looking for an OrderDate; however one can adopt many different approaches based on the input data.
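The C# listing is omitted here, but the parsing decision can be sketched in Python. The tab-delimited field layout below is an assumption for illustration (header: SalesOrderID, OrderDate, …; detail: SalesOrderID, SalesOrderDetailID, …), and values are emitted as (k, a, A, b, B) tuples matching the notation above, with the absent side left as None:

```python
def looks_like_order_date(field):
    """Crude check for an ISO-style date such as 2011-05-31 (assumed format)."""
    return len(field) >= 10 and field[4] == "-" and field[7] == "-"

def map_order(line):
    """Emit (SalesOrderID, (k, a, A, None, None)) for a header record,
    or (SalesOrderID, (k, None, None, b, B)) for a detail record."""
    fields = line.rstrip("\n").split("\t")
    k = int(fields[0])
    if looks_like_order_date(fields[1]):
        # Header record: a is the SalesOrderID itself, A the remaining attributes
        return k, (k, k, fields[1:], None, None)
    # Detail record: b is the SalesOrderDetailID, B the remaining attributes
    return k, (k, None, None, int(fields[1]), fields[2:])
```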
Literally that is it for the Mapper. It determines the type of input line and emits an OrderLine item.
As you can see the key is the SalesOrderID value. More on this later, when talking about a performance optimization.
The purpose of the Reducer is to locate the OrderHeader value for each SalesOrderID key value, and for each corresponding OrderDetail value output a complete OrderLine; consisting of the located header value and the detail values.
In this version of the code, again shown below, use is made of a List of OrderDetail items. Once all the values have been read then the OrderLine sequence of values are returned.
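The in-memory reducer logic can be sketched in Python as follows, assuming each value arrives as a (k, a, A, b, B) tuple with the absent side set to None (a stand-in for the actual OrderLine handling in the sample):

```python
def reduce_order(key, values):
    """In-memory join: cache every detail, locate the single header,
    then emit one joined (k, a, b, A, B) record per detail."""
    header = None
    details = []   # all detail values must be held until iteration completes
    for (k, a, A, b, B) in values:
        if a is not None:
            header = (a, A)          # the single header for this key
        else:
            details.append((b, B))
    a, A = header
    return [(key, a, b, A, B) for (b, B) in details]
```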
As you can see this code has a memory limitation: for each SalesOrderID value all the corresponding detail values need to be cached in memory, as no determination can be made as to when the header value will arrive. This may not be an issue with a small number of details associated with a header, but what about data where this number can be extremely large?
This limitation however is easily overcome through the use of a secondary sort. Basically two key values are used to ensure the header value always arrives first within the Reducer.
When processing the order data for a particular key within the reducer, if one knows that the first value will always be the header data then this can be saved for detail data processing. Thus when reading the subsequent detail lines the aggregated OrderLine instance values can be emitted directly without the intermediate List processing step. This is where a secondary sort comes into play.
If one uses two keys rather than one, it is possible to sort the data such that the first value, for each SalesOrderID, will be the header data. This can be achieved in our case by using a secondary sort key of the SalesOrderDetailID. For the header one can then just use a value of zero to ensure it is the first value, as all detail items have a positive value. Of course, different approaches can be taken depending on the data domain.
Although the data is sorted on two separate keys, the partitioning must be such that the data for each SalesOrderID is sent to a single Reducer; spanning multiple SalesOrderDetailID values.
Using this approach here is the code for the modified Mapper logic:
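The modified Mapper is not listed here, but the key construction can be sketched in Python. The composite key is "SalesOrderID Tab SalesOrderDetailID", with headers given a secondary key of 0 so they sort first; the field layout and the "H"/"D" value tags are assumptions for illustration:

```python
def looks_like_order_date(field):
    """Crude check for an ISO-style date such as 2011-05-31 (assumed format)."""
    return len(field) >= 10 and field[4] == "-" and field[7] == "-"

def map_order_secondary(line):
    """Emit a composite Tab-delimited key "SalesOrderID\\tSalesOrderDetailID";
    headers use 0 as the secondary key so they arrive first for each order."""
    fields = line.rstrip("\n").split("\t")
    order_id = fields[0]
    if looks_like_order_date(fields[1]):
        # Header record: secondary key 0 sorts ahead of any detail id
        return f"{order_id}\t0", ("H", fields[1:])
    # Detail record: secondary key is the SalesOrderDetailID itself
    return f"{order_id}\t{fields[1]}", ("D", fields[1:])
```

Note that the partitioner must hash only on the first key part, so every record for a given SalesOrderID still reaches the same Reducer.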
The multiple key values are delimited with a Tab character.
The Reducer code then becomes a lot simpler as no List caching is involved; and where LINQ enables succinct C# code.
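As a sketch of that simpler Reducer in Python, assuming each value arrives tagged as a header ("H") or detail ("D") and the secondary sort guarantees the header is first for each key:

```python
def reduce_order_sorted(order_id, values):
    """Streaming join: the secondary sort places the single header first,
    so each detail can be joined and yielded immediately - no list cache."""
    it = iter(values)
    tag, header_fields = next(it)      # first value is always the header
    assert tag == "H", "secondary sort should deliver the header first"
    for tag, detail_fields in it:
        # Emit one joined record per detail as it streams past
        yield (order_id, header_fields, detail_fields)
```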
As you can see the code is simpler, especially when it comes to the Reducer, and is also more efficient than the in-memory approach.
For the In-Memory code the job submission is quite simple:
%BASEPATH%\MSDN.Hadoop.MapReduce\Release\MSDN.Hadoop.Submission.Console.exe -input "join/data" -output "join/order" -mapper "MSDN.Hadoop.MapReduceFSharp.OrderJoinMemoryMapper, MSDN.Hadoop.MapReduceFSharp" -reducer "MSDN.Hadoop.MapReduceFSharp.OrderJoinMemoryReducer, MSDN.Hadoop.MapReduceFSharp" -file "%BASEPATH%\MSDN.Hadoop.MapReduceFSharp\Release\MSDN.Hadoop.MapReduceFSharp.dll"
However, for the optimized version one needs to accommodate the fact that the sorting takes place on two keys, and the data partitioning only on one. This is achieved by setting these exact options:
%BASEPATH%\MSDN.Hadoop.MapReduce\Release\MSDN.Hadoop.Submission.Console.exe -input "join/data" -output "join/order" -mapper "MSDN.Hadoop.MapReduceFSharp.OrderJoinMapper, MSDN.Hadoop.MapReduceFSharp" -reducer "MSDN.Hadoop.MapReduceFSharp.OrderJoinReducer, MSDN.Hadoop.MapReduceFSharp" -file "%BASEPATH%\MSDN.Hadoop.MapReduceFSharp\Release\MSDN.Hadoop.MapReduceFSharp.dll" -numberKeys 2 -numberPartitionKeys 1
The reason these options were added to version 1.0.0 of the code was to accommodate this exact type of processing.
To conclude here is some sample output from the join.
Utilizing the output is merely a case of serializing the data back into the OrderLine type.
Hopefully I have demonstrated that MapReduce joins are not that difficult to perform. With a little thought around the types, the code is not much more than data parsing.
A word of caution, compared to standard database joins, MapReduce joins are slow. If you can join the data before running MapReduce processing then this should be your preferred approach.