In a previous post I covered the basics for a Co-occurrence Approach to an Item Based Recommender. As promised, here is the continuation of this work, an implementation of the same algorithm using MapReduce. Before reading this post it will be worth reading the Local version as it covers the sample data and general co-occurrence concepts. Also, the MapReduce example will use the same data as the Local based approach and generate the same recommendations.

As always the complete Local and MapReduce code can be downloaded from:

http://code.msdn.microsoft.com/Co-occurrence-Approach-to-57027db7

As a recap, the approach taken for the item-based recommender is to define a co-occurrence Matrix based on purchased items; that is, the products purchased together in an order. The MapReduce variant, rather than creating a Matrix, will create a series of Sparse Vectors, so once again I will be using the Math.Net Numerics libraries. The actual Mapper and Reducer types will be written in F# and the job submitted using the “Generics based Framework for Composing and Submitting .Net Hadoop MapReduce Jobs”.

The MapReduce Approach

The approach one will have to take with MapReduce is a little different from the Local implementation. The objective of the MapReduce phases will be to construct a series of Sparse Vectors, where each Sparse Vector represents the co-occurrence recommendation values for a single product. One can think of these as the rows of a Sparse Matrix, but constructed independently and possibly output across several files.

To perform the calculation one will have to run two consecutive jobs. The first MapReduce job will take in the order detail lines and for each order output the list of products, with an associated co-occurrence quantity. It is from this data that one can construct the co-occurrence product pairs and hence the necessary vector values. 

The second MapReduce job will use as input the output from the previous job. The Map phase will take each order and corresponding product lists, emitting the co-occurrence product and quantity pairs. The Reduce phase then constructs the Sparse Vectors for each product.

To cover these processes the following F# Record definitions will be required:

type OrderDetail = { OrderId:int; OrderDate:DateTime; ProductId:int; OrderQty:int}

type ProductQuantity = { ProductId:int; Quantity:float}

type ProductQuantityList() =
    inherit List<ProductQuantity>()

type ProductRecommendations = {ProductId:int; Offset:int; Recommendations:SparseVector}

Using these type definitions the MapReduce job phases can be outlined as follows:

Stage    | In Key     | In Type             | Out Key    | Out Type
---------|------------|---------------------|------------|-----------------------
Map 1    |            | OrderDetail         | Order Id   | ProductQuantity
Reduce 1 | Order Id   | ProductQuantity     | Order Id   | ProductQuantityList
Map 2    | (Order Id) | ProductQuantityList | Product Id | ProductQuantity
Reduce 2 | Product Id | ProductQuantity     | Product Id | ProductRecommendations

However, one has to remember that the input/output types are actually sequences (or IEnumerables in C#) of the specified types. Also, the key into the second mapper is derived from the input data as the first tab-separated field, the data being the output from the previous MapReduce job.
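
As a minimal sketch of that key derivation, the following splits a line on its first tab. The splitKeyValue helper and the sample line (including its Json shape) are illustrative assumptions, not part of the downloadable code:

```fsharp
/// Splits a tab separated line into its key and value parts
/// (hypothetical helper, shown for illustration only)
let splitKeyValue (line: string) =
    match line.Split([| '\t' |], 2) with
    | [| key; value |] -> Some (key.Trim(), value.Trim())
    | _ -> None

// A made-up line in the shape the first job might write:
// the order identifier, a tab, then the serialized product list
let sample = "43659\t[{\"ProductId\":776,\"Quantity\":1.0}]"

match splitKeyValue sample with
| Some (key, value) -> printfn "key=%s value=%s" key value
| None -> printfn "line skipped"
```

Returning an option rather than indexing directly into the split array means a malformed line can be counted and skipped instead of failing the task.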

This is the basic version of the type mappings, but for both Map stages there are optimizations one can take. Also, for the second MapReduce job a Combiner can be used that may help performance. 

Note: The ProductQuantityList type exists solely to support Json Serialization, which is now used as the serialization format for all data in and out of the MapReduce jobs.

Order MapReduce Phase

The first, order processing, MapReduce job will take in the order detail lines and output, for each order, the list of products with an associated co-occurrence quantity. The main purpose of the Map phase will be to strip down the data into an order identifier and a ProductQuantity. In this case the quantity being emitted is the value adjusted for recent orders; as in the Local case. The Reduce phase just outputs the input sequence.

The core sequence code for this base Mapper would be:

seq {
    let orderLine = Helpers.ParseInputData value
    if orderLine.IsSome then
        let order = orderLine.Value
        let product = {
            ProductQuantity.ProductId = order.ProductId;
            Quantity = (min (float order.OrderQty) qtyMaximum) * (orderFactor order)
            }
        yield (getOrderKey order.OrderId, product)
}

However, this approach does not take advantage of the fact that the input data is sorted on the order identifier. If one assumes this ordering, an easy optimization is to emit, rather than a single ProductQuantity per line, the list of all the products for each order. This reduces the volume of output data and hence the work for the Shuffle and Sort phase. Of course, where the data is not sorted, or an order is split across mappers, the Reducer will do the final aggregation of the data.

In this optimized version the types mapping becomes:

Stage  | In Key   | In Type             | Out Key  | Out Type
-------|----------|---------------------|----------|--------------------
Map    |          | OrderDetail         | Order Id | ProductQuantityList
Reduce | Order Id | ProductQuantityList | Order Id | ProductQuantityList

This leads to a full Mapper code listing of the following:

Order Mapper
type OrderVectorMapper() =
    inherit MapperBaseText<ProductQuantityList>()

    // Configuration values
    let qtyMaximum = 5.0                            // Maximum rating contribution for an item
    let recentFactor = 2.0                          // Quantity increase factor for recent items
    let baseDate = DateTime.Today.AddMonths(-3)     // Date for a recent item

    // Products collected so far for the order currently being read
    let products = ProductQuantityList()
    let mutable currentOrder = None

    // Converts an order Id to a string key
    let getOrderKey (orderId:int) =
        sprintf "%i" orderId

    /// Map the data from input name/value to output name/value
    override self.Map (value:string) =

        // Adds a quantity factor based on recent orders
        let inline orderFactor (order:OrderDetail) =
            if DateTime.Compare(order.OrderDate, baseDate) > 0 then
                recentFactor
            else
                1.0

        // Process the order
        let orderLine =
            try
                Some(Helpers.ParseInputData value)
            with
            | :? System.ArgumentException -> None

        seq {
            if orderLine.IsSome then
                let order = orderLine.Value
                let product = {
                    ProductQuantity.ProductId = order.ProductId;
                    Quantity = (min (float order.OrderQty) qtyMaximum) * (orderFactor order)}
                // A change of order Id means the previous order is complete
                if currentOrder.IsSome && not (order.OrderId = currentOrder.Value) then
                    yield (getOrderKey currentOrder.Value, products)
                    products.Clear()
                currentOrder <- Some(order.OrderId)
                products.Add product
            else
                Context.IncrementCounter("ORDERS", "Skipped Lines")
            }

    /// Output remaining Map items
    override self.Cleanup() = seq {
        if currentOrder.IsSome then
            yield (getOrderKey currentOrder.Value, products)
        }

To perform this optimization a ProductQuantityList is maintained and the corresponding sequence is emitted whenever the order identifier changes. The final Cleanup() step flushes any remaining values.

The calculation of the co-occurrence quantity is the same as in the Local case. The order quantity is used, capped at a maximum value, and then adjusted based on the order date. During the next MapReduce phase the final quantity for a product pair is taken as the maximum of the two products' quantities.
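
To make the arithmetic concrete, here is a small stand-alone sketch of the same calculation. The adjustedQuantity function and the fixed base date are assumptions made for illustration; the Mapper itself computes the base date from DateTime.Today:

```fsharp
open System

// Stand-alone copies of the Mapper configuration values
let qtyMaximum = 5.0       // cap on the contribution of a single order line
let recentFactor = 2.0     // multiplier applied to recent orders

/// Co-occurrence quantity for an order line, relative to an explicit base date
/// (hypothetical helper; the date is passed in so the result is repeatable)
let adjustedQuantity (baseDate: DateTime) (orderDate: DateTime) (orderQty: int) =
    let capped = min (float orderQty) qtyMaximum
    let factor = if orderDate > baseDate then recentFactor else 1.0
    capped * factor

let baseDate = DateTime(2012, 1, 1)

// An old order of 3 units keeps its quantity
let oldSmall = adjustedQuantity baseDate (DateTime(2011, 6, 1)) 3
// An old order of 8 units is capped at the maximum
let oldLarge = adjustedQuantity baseDate (DateTime(2011, 6, 1)) 8
// A recent order of 8 units is capped and then doubled
let recentLarge = adjustedQuantity baseDate (DateTime(2012, 2, 1)) 8

printfn "%.1f %.1f %.1f" oldSmall oldLarge recentLarge   // prints "3.0 5.0 10.0"
```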

As previously noted, in this case, the Reducer just re-emits the aggregated input data:

Order Reducer
type OrderVectorReducer() =
    inherit ReducerBase<ProductQuantityList, ProductQuantityList>()

    /// Reduce the order data into a product list
    override self.Reduce (key:string) (values:seq<ProductQuantityList>) =

        let products = ProductQuantityList()

        // Flatten the lists received for this order into a single list
        values
        |> Seq.iter (Seq.iter products.Add)

        Seq.singleton (key, products)

To submit this job one would use the following command:

%HOMEPATH%\MSDN.Hadoop.MapReduce\Release\MSDN.Hadoop.Submission.Console.exe
  -input "recommendations/inputdata" -output "recommendations/workingdata"
  -mapper "MSDN.Recommender.MapReduce.OrderVectorMapper, MSDN.Recommender.MapReduce"
  -reducer "MSDN.Recommender.MapReduce.OrderVectorReducer, MSDN.Recommender.MapReduce"
  -file "%HOMEPATH%\MSDN.Recommender\Release\MSDN.Recommender.MapReduce.dll"
  -file "%HOMEPATH%\MSDN.Recommender\Release\MSDN.Recommender.dll"

The data from this MapReduce job is then fed into the next phase.

Product MapReduce Phase

The second, product processing, MapReduce job constructs the Sparse Vectors for each product identifier. The Map phase will take each order, and corresponding products and quantities, and emit the pairs of products along with the final co-occurrence quantity; being the maximum of the two possible values. The Reduce phase will sum all the co-occurrence values for each product, and construct the final product Sparse Vectors.

In this instance a Combine operation can be advantageous to performance. This will result in types mapping of:

Stage   | In Key     | In Type             | Out Key    | Out Type
--------|------------|---------------------|------------|-----------------------
Map     | (Order Id) | ProductQuantityList | Product Id | ProductQuantity
Combine | Product Id | ProductQuantity     | Product Id | ProductQuantity
Reduce  | Product Id | ProductQuantity     | Product Id | ProductRecommendations

If one takes this simple approach the base Map code would be along the lines of the following:

seq {     
    for (product1, product2) in ((deserialize value) |> pairs) do
        let qty = max product1.Quantity product2.Quantity
        yield getProductKey product1.ProductId, {ProductQuantity.ProductId = product2.ProductId; Quantity = qty}
        yield getProductKey product2.ProductId, {ProductQuantity.ProductId = product1.ProductId; Quantity = qty}
    }
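
To see what this emits, consider a worked example for a single order. This is a sketch only: the record is redefined locally, F# lists stand in for the deserialized ProductQuantityList, integer keys stand in for the string keys, and the order contents are made up:

```fsharp
// Local stand-in for the ProductQuantity record defined earlier
type ProductQuantity = { ProductId: int; Quantity: float }

/// All unordered pairs of the items in a list
let pairs (items: 'a list) =
    let indexed = List.indexed items
    [ for (i, a) in indexed do
        for (j, b) in indexed do
            if i < j then yield (a, b) ]

/// Emits both directions of each pair, keyed by product identifier
let emit (order: ProductQuantity list) =
    [ for (p1, p2) in pairs order do
        let qty = max p1.Quantity p2.Quantity
        yield p1.ProductId, { ProductId = p2.ProductId; Quantity = qty }
        yield p2.ProductId, { ProductId = p1.ProductId; Quantity = qty } ]

// A hypothetical order containing three products
let order =
    [ { ProductId = 707; Quantity = 2.0 }
      { ProductId = 708; Quantity = 5.0 }
      { ProductId = 711; Quantity = 1.0 } ]

let emitted = emit order

for (key, value) in emitted do
    printfn "%i\t%i\t%.1f" key value.ProductId value.Quantity
```

Three products give three pairs and hence six emitted values, so the Map output grows roughly with the square of the order size. This is why reducing the emitted volume matters.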

However, as in the first MapReduce job, there is an optimization one can take. The basic approach just parses the input data from the previous job and emits each product pair without aggregating any of the data. The alternative is to aggregate data within a Mapper, thus reducing both the data that needs to be passed to the reducer and the overhead of the Shuffle and Sort phase.

To perform this optimization a Dictionary can be used, with the Key being a Tuple of the product pair, and the Value being the calculated co-occurrence quantity. Obviously one cannot indefinitely build up this Dictionary, so the values are emitted once a predefined Dictionary size is reached. At this point the in-Mapper aggregation reinitializes the Dictionary and starts again. Once again the final Cleanup() step flushes any remaining Dictionary values.

This leads to a full Mapper code listing of:

Product Mapper
type ProductVectorMapper() =
    inherit MapperBaseText<ProductQuantity>()

    // Number of product pairs held in memory before they are emitted
    let maxSize = 1024*1024
    let prodPairs = Dictionary<int*int, float>(maxSize)

    // Converts a product Id to a string key
    let getProductKey (productId:int) =
        sprintf "%i" productId

    // Adds to the table
    let addRow idx qty =
        if prodPairs.ContainsKey idx then
            prodPairs.[idx] <- prodPairs.[idx] + qty
        else
            prodPairs.[idx] <- qty
        ()

    // Defines a sequence of the current pairs; the table is cleared once enumerated
    let currents = seq {
        for item in prodPairs do
            let (product1, product2) = item.Key
            yield getProductKey product1, {ProductQuantity.ProductId = product2; Quantity = item.Value}
        prodPairs.Clear()
        }

    /// Map the data from input name/value to output name/value
    override self.Map (value:string) =

        // Parses an input line of format List<ProductQuantity>
        let deserialize (input:string) =
            let keyValue = input.Split('\t')
            Helpers.ParseProductQuantityList (keyValue.[1].Trim())

        // calculates the pairs for an order
        let rec pairs (items:List<'a>) = seq {
            let count = items.Count
            match count with
            | 0 | 1 -> ()
            | 2 ->
                yield items.[0], items.[1]
            | _ ->
                for idxOut = 0 to (count - 2) do
                    for idxIn = (idxOut + 1) to (count - 1) do
                        yield (items.[idxOut], items.[idxIn])
            }

        // Aggregate the product/product pairs information into the table
        (deserialize value)
        |> pairs
        |> Seq.iter (fun (product1, product2) ->
            let qty = max product1.Quantity product2.Quantity
            addRow (product1.ProductId, product2.ProductId) qty
            addRow (product2.ProductId, product1.ProductId) qty)

        // Only emit once the table has grown past the size limit
        if prodPairs.Count > maxSize then
            currents
        else
            Seq.empty

    /// Output remaining Map items
    override self.Cleanup() =
        currents

A secondary optimization one can make for each Mapper is that of running a Combiner. If one has a large Dictionary object, then the need for a Combiner is diminished. However the code for such a Combiner just performs further quantity aggregations against the output for each Mapper:

Product Combiner
type ProductVectorCombiner() =
    inherit CombinerBase<ProductQuantity>()

    /// Combine the data from input name/value to output name/value
    override self.Combine (key:string) (values:seq<ProductQuantity>) =

        let maxSize = 100000               // Expected number of product correlations
        let products = Dictionary<int, float>(maxSize)

        // Adds to the table
        let addRow idx qty =
            if products.ContainsKey idx then
                products.[idx] <- products.[idx] + qty
            else
                products.[idx] <- qty
            ()

        // Process the combiner input
        values
        |> Seq.iter (fun product -> addRow product.ProductId product.Quantity)

        seq {
            for item in products do
                yield key, {ProductQuantity.ProductId = item.Key; Quantity = item.Value}
            }

Once all the product pairs have been emitted by the Mapper, the Reducer can build a Sparse Vector for each product. This is done by aggregating all co-occurrence values as the element values of the Sparse Vector; much in the same way that the Sparse Matrix is constructed:

Product Reducer
type ProductVectorReducer() =
    inherit ReducerBase<ProductQuantity, ProductRecommendations>()

    /// Reduce the data from input name/value to output name/value
    override self.Reduce (key:string) (values:seq<ProductQuantity>) =

        // Configuration values
        let entryThreshold = 20.0                       // Minimum correlations for matrix inclusion
        let matrixSize = 5000                           // Expected Correlations for Hash Table init

        let minItem = ref Int32.MaxValue
        let maxItem = ref 0
        let rcTable = Dictionary<int, float>(matrixSize)

        // Adds to the table
        let addRow idx qty =
            minItem := min idx !minItem
            maxItem := max idx !maxItem
            if rcTable.ContainsKey idx then
                rcTable.[idx] <- rcTable.[idx] + qty
            else
                rcTable.[idx] <- qty
            ()

        // Process the reducer input
        values
        |> Seq.iter (fun product -> addRow product.ProductId product.Quantity)

        let offset = !minItem
        let size = !maxItem + 1 - !minItem

        let items = seq {
            for item in rcTable do
                if item.Value > entryThreshold then
                    yield (item.Key - offset, item.Value)
            }

        let recommendations = {ProductRecommendations.ProductId = Int32.Parse(key); Offset = offset; Recommendations = SparseVector.ofSeq size items}
        Context.IncrementCounter("PRODUCTS", "Recommendations Written")

        Seq.singleton (key, recommendations)

To submit this job one would use the following command:

%HOMEPATH%\MSDN.Hadoop.MapReduce\Release\MSDN.Hadoop.Submission.Console.exe
  -input "recommendations/workingdata/part-0000[012356789]" -output "recommendations/outputdata"
  -mapper "MSDN.Recommender.MapReduce.ProductVectorMapper, MSDN.Recommender.MapReduce"
  -reducer "MSDN.Recommender.MapReduce.ProductVectorReducer, MSDN.Recommender.MapReduce"
  -file "%HOMEPATH%\MSDN.Recommender\Release\MSDN.Recommender.MapReduce.dll"
  -file "%HOMEPATH%\MSDN.Recommender\Release\MSDN.Recommender.dll"
  -file "%HOMEPATH%\MSDN.Recommender\Release\MathNet.Numerics.dll"
  -file "%HOMEPATH%\MSDN.Recommender\Release\MathNet.Numerics.FSharp.dll"

If you review the previous code you will see that for each product, the Sparse Vector recommendations are accompanied with an Offset. This is the same offset as used in the Local version of the code, and represents the offset for the first product identifier. This is purely an optimization for querying the data.
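
The index arithmetic behind the offset can be sketched as follows. A plain array stands in for the Math.Net SparseVector, the totals are made-up values, and the entry threshold is left out to keep the example short:

```fsharp
// Hypothetical co-occurrence totals for one product, keyed by product identifier
let totals = [ (712, 35.0); (715, 48.0); (708, 27.0) ]

// The offset is the smallest product identifier seen;
// the size spans the smallest to the largest identifier inclusive
let offset = totals |> List.map fst |> List.min
let size = (totals |> List.map fst |> List.max) + 1 - offset

// A plain array stands in for the SparseVector here
let vector = Array.zeroCreate<float> size
for (productId, qty) in totals do
    vector.[productId - offset] <- qty

// Reading back: vector index + offset gives the product identifier
let recovered =
    vector
    |> Array.mapi (fun idx qty -> (idx + offset, qty))
    |> Array.filter (fun (_, qty) -> qty > 0.0)
    |> Array.toList

printfn "offset=%i size=%i" offset size   // prints "offset=708 size=8"
```

Without the offset the vector would need one slot for every identifier from zero up to the largest product identifier, even though only the range actually seen is of interest.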

The output from this job can then be saved and loaded into a Query engine to produce product recommendations.

Product Recommendations

Once the co-occurrence Sparse Vectors have been constructed they can be loaded into memory and queried in a very similar fashion to the Local case. To perform the loading, a Dictionary of objects is constructed where the Key is the product identifier and the Value is the ProductRecommendations type, containing the co-occurrence Sparse Vector:

Vector Builder
module VectorLoader =

    // Defines a sequence of product vectors
    let private vectorFile (mappedfile:string) = seq {
            use reader = new StreamReader(Path.GetFullPath(mappedfile))
            while not reader.EndOfStream do
                let line = reader.ReadLine()
                let keyValue = line.Split('\t')
                let (_, value) = (Int32.Parse(keyValue.[0].Trim()), keyValue.[1].Trim())
                yield (Helpers.ParseVectorData value)
            reader.Close()
            }

    /// Loads a collection Product Vector file
    let GetProductVectors (filenames:string array) =

        let products = ConcurrentDictionary<int, ProductRecommendations>()

        filenames
        |> Array.iter (fun filename ->
            vectorFile filename
            |> Seq.iter (fun product -> products.TryAdd(product.ProductId, product) |> ignore))

        products

    /// Loads a single Product Vector file
    let GetProductVector (filename:string) =

        let products = ConcurrentDictionary<int, ProductRecommendations>()

        (vectorFile filename)
        |> Seq.iter (fun product -> products.TryAdd(product.ProductId, product) |> ignore)

        products

Once the Vectors have been loaded they can be queried in the same way as for the Local version. Basically, the Sparse Vector values for the products for which a recommendation is required are loaded into a PriorityQueue. The top X items are then de-queued and returned as the recommendations:
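
The selection logic can be sketched without the PriorityQueue by sorting a list instead. This is an illustration only, assuming that a larger co-occurrence value means a better recommendation; the topRecommendations function and the candidate values are made up:

```fsharp
/// Picks the identifiers of the n highest scoring candidates,
/// skipping zero entries and products already in the basket
/// (hypothetical helper; a sorted list stands in for the PriorityQueue)
let topRecommendations (basket: int list) (n: int) (candidates: (int * float) list) =
    let inBasket = Set.ofList basket
    candidates
    |> List.filter (fun (id, score) -> score > 0.0 && not (inBasket.Contains id))
    |> List.sortByDescending snd
    |> List.truncate n
    |> List.map fst

// Made-up co-occurrence values for the products related to product 850
let candidates = [ (851, 40.0); (850, 99.0); (852, 0.0); (853, 75.0); (854, 60.0) ]

let top2 = topRecommendations [ 850 ] 2 candidates

printfn "%A" top2   // prints "[853; 854]"
```

A full sort is the simplest thing that works for a sketch; a priority queue avoids ordering every candidate when only the first few are wanted.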

Vector Query
type VectorQuery (filenames:string array) =

    let defaultRecommendations = 12

    let coVectors =
        match filenames with
        | [||] -> invalidArg "filenames" "Filenames cannot be an empty Array"
        | [| filename |] -> VectorLoader.GetProductVector(filename)
        | _ -> VectorLoader.GetProductVectors(filenames)

    let getQueueSV (products:int array) =
        // Define the priority queue and lookup table
        let queue = PriorityQueue(coVectors.Count)
        let lookup = HashSet(products)

        // Add the items into a priority queue
        products
        |> Array.iter (fun item ->
            if item >= 0 && coVectors.ContainsKey(item) then
                let product = coVectors.[item]
                let recommendations = product.Recommendations
                seq {
                    for idx = 0 to (recommendations.Count - 1) do
                        // Vector index plus offset gives the product identifier
                        let productIdx = idx + product.Offset
                        if (not (lookup.Contains(productIdx))) && (recommendations.[idx] > 0.0) then
                            yield KeyValuePair(recommendations.[idx], productIdx)
                }
                |> queue.Merge)
        // Return the queue
        queue

    let getItems (queue:PriorityQueue<float, int>) (items:int) =
        let toDequeue = min items queue.Count
        seq { for i in 1 .. toDequeue do yield queue.Dequeue().Value }

    new(filename:string) =
        VectorQuery([| filename |])

    /// Get the requested number of Recommendations for a Product
    member self.GetRecommendations(product:int, items:int) =
        let queue = getQueueSV([| product |])
        getItems queue items

    /// Get the requested number of Recommendations for a Product Array
    member self.GetRecommendations(products:int array, items:int) =
        let queue = getQueueSV(products)
        getItems queue items

    /// Get the default number of Recommendations for a Product
    member self.GetRecommendations(product:int) =
        self.GetRecommendations(product, defaultRecommendations)

    /// Get the default number of Recommendations for a Product Array
    member self.GetRecommendations(products:int array) =
        self.GetRecommendations(products, defaultRecommendations)

Usage of the VectorQuery type is simply a matter of specifying which files to load and then calling the GetRecommendations() method, in exactly the same way as for the Local case.

let itemQuery = VectorQuery(vectorFiles)
let recommendations = itemQuery.GetRecommendations(850, 100)

The API also supports getting recommendations for a product array; namely a shopping basket.

Conclusion

One thing that is worth pointing out to conclude these blog posts, is that I have skipped over how to manage the caching of the Sparse Matrix and Vector values. The reality is one would need to only build/load the data once and cache the results. This cached value would then be used by the recommendations API rather than the filenames. The cache would also need to allow multiple threads to access the loaded data, and also manage refreshing of the data.
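
As a rough sketch of the load-once part of such a cache, one option is System.Lazy, which guarantees a single execution of the loader even when several threads ask for the value at the same time. The loadVectors function below is a stand-in for the real loader and simply counts how often it runs; refreshing the data is not covered:

```fsharp
open System
open System.Threading

// Counts how many times the (pretend) load is executed
let loadCount = ref 0

// Stand-in for loading the product vectors from file
let loadVectors () =
    Interlocked.Increment(loadCount) |> ignore
    dict [ 850, "vector for 850"; 851, "vector for 851" ]

// The loader runs once, on first access, however many threads ask
let cachedVectors =
    Lazy<_>(Func<_>(fun () -> loadVectors ()), LazyThreadSafetyMode.ExecutionAndPublication)

// Simulate several concurrent callers reading the cached data
let results =
    [| 1 .. 8 |]
    |> Array.Parallel.map (fun _ -> cachedVectors.Value.Count)

printfn "loads=%i" loadCount.Value   // prints "loads=1"
```

A refresh could then be handled by building a new Lazy value and swapping it in for the old one, so that readers never see a partially loaded set of vectors.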

As a final reminder, to install the libraries from NuGet one can use the Manage NuGet Packages browser, or run these commands in the Package Manager Console:

Install-Package MathNet.Numerics
Install-Package MathNet.Numerics.FSharp

To conclude these posts, during the coming week, I will also implement the C# version of the code.