For a while I have wanted to tackle the problem of creating an item-based recommender. I will start with a local variant before moving on to a MapReduce version. The current version of the code can be found at:

http://code.msdn.microsoft.com/Co-occurrence-Approach-to-57027db7

The approach taken for the item-based recommender will be to define a co-occurrence matrix based on purchased items; specifically, products purchased together on an order. As an example I will use the sales data from the Adventure Works SQL Server sample database. The algorithm/approach is best explained, and is implemented, using a matrix.

For the matrix implementation I will be using the Math.Net Numerics libraries. To install the libraries from NuGet one can use the Manage NuGet Packages browser, or run these commands in the Package Manager Console:

Install-Package MathNet.Numerics
Install-Package MathNet.Numerics.FSharp

### Co-occurrence Sample

| Order ID | Product ID1 | Product ID2 | Product ID3 |
| --- | --- | --- | --- |
| 500001 | 1001 | 1003 | 1005 |
| 500002 | 1001 | 1002 | 1003 |
| 500003 | 1002 | 1003 | |
| 500004 | 1003 | 1004 | 1005 |
| 500005 | 1003 | 1005 | |
| 500006 | 1003 | 1004 | |
| 500007 | 1002 | 1003 | |
| 500008 | 1001 | 1003 | 1004 |
| 500009 | 1003 | 1004 | 1005 |

If you look at the orders containing product 1002, you will see that there is 1 occurrence of product 1001 and 3 occurrences of product 1003. From this, one can deduce that if someone purchases product 1002 then it is likely that they will want to purchase product 1003, and to a lesser extent product 1001. This concept forms the crux of the item-based recommender.

So if we computed the co-occurrence for every pair of products and placed them into a square matrix we would get the following:

| Product ID | 1001 | 1002 | 1003 | 1004 | 1005 |
| --- | --- | --- | --- | --- | --- |
| 1001 | | 1 | 3 | 1 | 1 |
| 1002 | 1 | | 3 | | |
| 1003 | 3 | 3 | | 4 | 4 |
| 1004 | 1 | | 4 | | 2 |
| 1005 | 1 | | 4 | 2 | |

There are a few things one can say about such a matrix, beyond the fact that it is square: the number of rows and columns is equal to the number of items, so for a large product set the matrix will more than likely be sparse. Also, each row (and column, as the matrix is symmetric) expresses the similarities between one item and all others.

Due to the symmetric nature of the matrix (a(x,y) = a(y,x)) one can think of the rows and columns as vectors, where the similarity between items X and Y is the same as the similarity between items Y and X.

The algorithm/approach I will be outlining is essentially a means to build and query this co-occurrence matrix. Co-occurrence is like similarity: the more two items occur together (in this case in a shopping basket), the more they are probably related.
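As a minimal standalone sketch of this counting (using the sample orders above, and counting 1 per co-occurrence rather than the quantity-based weighting introduced later), the matrix entries can be derived as follows:

```fsharp
open System.Collections.Generic

// Sample orders from the table above; each inner list is one order's products
let orders =
    [ [1001; 1003; 1005]; [1001; 1002; 1003]; [1002; 1003]
      [1003; 1004; 1005]; [1003; 1005]; [1003; 1004]
      [1002; 1003]; [1001; 1003; 1004]; [1003; 1004; 1005] ]

// All unordered pairs of products within a single order
let rec pairs l = seq {
    match l with
    | h :: t ->
        for e in t do yield h, e
        yield! pairs t
    | _ -> () }

// Count each pair symmetrically so that rows and columns agree
let counts = Dictionary<int * int, int>()
let increment key =
    if counts.ContainsKey key then counts.[key] <- counts.[key] + 1
    else counts.[key] <- 1

orders
|> List.iter (fun order ->
    pairs order
    |> Seq.iter (fun (a, b) ->
        increment (a, b)
        increment (b, a)))

printfn "1002/1003 co-occurrence: %d" counts.[(1002, 1003)]   // 3
```

Running this reproduces the matrix values above, for example 3 for the 1002/1003 pair and 2 for the 1004/1005 pair.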

### Working Data

Before talking about the code a word is warranted on how I have created the sample data.

As mentioned before I am using the Adventure Works database sales information. The approach I have taken is to export the sales detail lines, ordered by the sales order identifier, into flat files. The rationale for this being that the processing of the matrix can then occur with a low impact to the OLTP system. Also, the code will support the parallel processing of multiple files. Thus one can take the approach of just exporting more recent data and using the previously exported archived data to generate the matrix.

To support this process I have created a simple view for exporting the data:

CREATE VIEW [Sales].[vSalesSummary]
AS
SELECT SOH.[SalesOrderID], CAST(SOH.[OrderDate] AS date) [OrderDate], SOD.[ProductID], SOD.[OrderQty]
FROM [Sales].[SalesOrderHeader] SOH
INNER JOIN [Sales].[SalesOrderDetail] SOD ON SOH.[SalesOrderID] = SOD.[SalesOrderID]
GO

I have then used the BCP command to export the data into a Tab delimited file:

bcp "SELECT [SalesOrderID], [OrderDate], [ProductID], [OrderQty] FROM [AdventureWorks2012].[Sales].[vSalesSummary] ORDER BY [SalesOrderID], [ProductID]" queryout SalesOrders.dat -T -S(local) -c

One could easily define a similar process where a file is generated for each month, with the latest month being updated on a nightly basis. One could then easily ensure only orders for the past X months are included in the metric. The only important aspect of how the data is generated is that the output must be ordered on the sales order identifier. As you will see later, this grouping is necessary to allow the co-occurrence data to be derived.

A sample output from the Adventure Works sales data is as follows, with the fields being tab delimited:

43659    2005-07-01    776    1
43659    2005-07-01    777    3
43659    2005-07-01    778    1
43660    2005-07-01    758    1
43660    2005-07-01    762    1
43661    2005-07-01    708    5
43661    2005-07-01    711    2
43661    2005-07-01    712    4
43661    2005-07-01    715    4
43661    2005-07-01    716    2
43661    2005-07-01    741    2
43661    2005-07-01    742    2
43661    2005-07-01    743    1
43661    2005-07-01    745    1
43662    2005-07-01    730    2
43662    2005-07-01    732    1
43662    2005-07-01    733    1
43662    2005-07-01    738    1

Once the data has been exported building the matrix becomes a matter of counting the co-occurrence of products associated with each sales order.

### Building the Matrix

The problem of building a co-occurrence matrix is simply one of counting. The basic process is as follows:

1. Read the file and group each sales order along with the corresponding list of product identifiers
2. For each sales order product listing, calculate all the corresponding pairs of products
3. Maintain/Update the running total of each pair of products found
4. At the end of the file, output a sparse matrix based on the running totals of product pairs

To support parallelism, steps 1-3 are run in parallel, with the output of these steps being a collection of product pairs with their co-occurrence counts. These collections are then combined to create a single sparse matrix.

In performing this processing it is important that the file data is read such that a sequence of products can be efficiently created for each sales order identifier; the grouping key. If you have been following my Hadoop Streaming blog entries you will see that this is the same process undertaken when processing data within a reducer step.

The matrix building code, in full, is as follows:

open System
open System.Collections.Generic
open System.IO
open MathNet.Numerics.LinearAlgebra.Double

module MatrixBuilder =

    // Defines the structure being input
    type OrderHeader = { OrderId:int; OrderDate:DateTime }
    type OrderDetail = { OrderId:int; OrderDate:DateTime; ProductId:int; OrderQty:int }

    // Configuration values
    let qtyMaximum = 10.0                           // Maximum rating between 2 items
    let entryThreshold = 10.0                       // Minimum correlations for matrix inclusion

    let recentFactor = 3.0                          // Quantity increase factor for recent items
    let baseDate = DateTime.Today.AddMonths(-3)     // Date for a recent item

    let matrixSize = 10000*500                      // Products*Correlations for hash table init

    // Gets the base data for building the sparse matrix
    let private getMatrixData filename =

        let minItem = ref Int32.MaxValue
        let maxItem = ref 0
        let rcTable = Dictionary<(int*int), float>(matrixSize)

        // Parses an input line of format OrderID/Date/ProductID/Quantity
        let inline parseLine (input:string) =
            if not (input = null) then
                let values = input.Split('\t')
                Some({OrderDetail.OrderId = Int32.Parse(values.[0].Trim());
                    OrderDate = DateTime.Parse(values.[1].Trim());
                    ProductId = Int32.Parse(values.[2].Trim());
                    OrderQty = Int32.Parse(values.[3].Trim())})
            else
                None

        // Define file reader properties
        use reader = new StreamReader(File.OpenRead(filename))
        let getInput() = parseLine (reader.ReadLine())

        // calculates the pairs for an order
        let rec pairs l = seq {
            match l with
            | h::t -> for e in t do yield h, e
                      yield! pairs t
            | _ -> ()
            }

        // Add a factor based on order properties - example recent orders
        let orderFactor (order:OrderHeader) =
            if DateTime.Compare(order.OrderDate, baseDate) > 0 then
                recentFactor
            else
                1.0

        // Adds to the table
        let addRowColumn idx1 idx2 qty =
            minItem := min (min idx1 idx2) !minItem
            maxItem := max (max idx1 idx2) !maxItem
            if rcTable.ContainsKey (idx1, idx2) then
                rcTable.[(idx1, idx2)] <- rcTable.[(idx1, idx2)] + qty
            else
                rcTable.[(idx1, idx2)] <- qty

        // The main processor for the sequence for an order
        let processOrder (header:OrderHeader) (orders:OrderDetail seq) =
            let orderList = Seq.toList orders
            // if order only has one product then no correlation can be determined
            if (orderList.Length > 1) then
                pairs orderList
                |> Seq.iter (fun (order1, order2) ->
                    let qty = (min (float (order1.OrderQty * order2.OrderQty)) qtyMaximum) * (orderFactor header)
                    addRowColumn order1.ProductId order2.ProductId qty
                    addRowColumn order2.ProductId order1.ProductId qty)

        // Creates a sequence of the input based on the provided orderId
        let lastInput = ref None
        let continueDo = ref false
        let inputsByKey key firstValue = seq {
            // Yield any value from previous read
            yield firstValue

            continueDo := true
            while !continueDo do
                match getInput() with
                | Some(orderDetail) when orderDetail.OrderId = key ->
                    // Yield found value and remainder of sequence
                    yield orderDetail
                | Some(orderDetail) ->
                    // Have a value but different key
                    lastInput := Some(orderDetail)
                    continueDo := false
                | None ->
                    // Have no more entries
                    lastInput := None
                    continueDo := false
        }

        // Controls the calling of the matrix maker
        let rec processInput (input:OrderDetail option) =
            match input with
            | Some(orderDetail) ->
                let header = {OrderHeader.OrderId = orderDetail.OrderId; OrderDate = orderDetail.OrderDate}
                processOrder header (inputsByKey orderDetail.OrderId orderDetail)
                processInput lastInput.contents
            | _ -> ()

        // Build the matrix/table from the input data
        processInput (getInput())

        // return the table definition along with the size
        (!minItem, !maxItem, rcTable)


    // Build a Sparse matrix from the file data
    let GetMatrixParallel (filenames:string array) =

        // In Parallel gets the RC tables
        let results =
            filenames
            |> Array.Parallel.map (fun filename -> getMatrixData filename)

        // define the sparse array bounds
        let (minItem, maxItem) =
            results
            |> Array.fold (fun acc (idxMin, idxMax, _) ->
                (min idxMin (fst acc), max idxMax (snd acc))) (Int32.MaxValue, 0)

        let offset = minItem
        let size = maxItem + 1 - minItem

        // convert to a sparse matrix and return
        let items = seq {
            for (_, _, rcTable) in results do
                for item in rcTable do
                    if item.Value > entryThreshold then
                        yield ((fst item.Key) - offset, (snd item.Key) - offset, item.Value)
            }

        offset, (SparseMatrix.ofSeq size size items)

    // Interface for a single file
    let GetMatrix (filename:string) =

        let (minItem, maxItem, rcTable) = getMatrixData filename

        let offset = minItem
        let size = maxItem + 1 - minItem

        // convert to a sparse matrix and return
        let items = seq {
            for item in rcTable do
                if item.Value > entryThreshold then
                    yield ((fst item.Key) - offset, (snd item.Key) - offset, item.Value)
            }

        offset, SparseMatrix.ofSeq size size items

In this instance the file data is processed such that order data is grouped and exposed as an OrderHeader type, along with a collection of OrderDetail types. From this the pairs of products are easily calculated using the pairs function.

To maintain a co-occurrence count for each product pair a Dictionary is used. The key for the Dictionary is a tuple of the pair of products, with the value being the co-occurrence count. The rationale for using a Dictionary is that it supports O(1) lookups; whereas scanning an array for a matching pair would incur an O(n) lookup.

The final decision to be made is what quantity should be added into the running total. One could use a value of 1 for all co-occurrences, but other options are available. The first is the order quantity. If one has a co-occurrence for products with quantities x and y, I have defined the pair quantity as x·y. The rationale for this is that having product quantities is logically equivalent to having multiple product lines in the same order; if this were the case then the co-occurrence counting would arrive at the same value. I have also capped the maximum quantity amount such that small products ordered in large volumes do not skew the numbers; akin to restricting a rating to 1-10.
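To make this weighting concrete, here is the pair quantity calculation in isolation; a sketch that mirrors the qty expression in the listing above (the `pairQuantity` name is mine, not part of the sample code):

```fsharp
let qtyMaximum = 10.0    // cap, so bulk orders do not skew the counts
let recentFactor = 3.0   // scaling applied to recent orders

// Quantity added to the running total for one co-occurring pair
let pairQuantity (qty1: int) (qty2: int) (isRecent: bool) =
    let factor = if isRecent then recentFactor else 1.0
    (min (float (qty1 * qty2)) qtyMaximum) * factor

pairQuantity 2 3 false   // 6.0  : 2 x 3 is under the cap
pairQuantity 5 4 false   // 10.0 : 20 is capped at qtyMaximum
pairQuantity 1 1 true    // 3.0  : recent orders are boosted
```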

One optional quantity factor I have included is one based on the order header. The sample code applies a quantity scaling factor for recent orders. Again the rationale for this is that recent orders then have a greater effect on the co-occurrence values than older ones. All these scaling factors are optional and should be configured to give your desired results.

As mentioned, to achieve parallelism in the matrix building code the collection of input files is processed in parallel, with each parallel step independently outputting its collection of product pairs and co-occurrence counts. This is achieved using an Array.Parallel.map function call, which maps each input file into its own Dictionary. Once all the Dictionary elements have been created they are then used to create a single sequence of elements from which the sparse matrix is built.

One other critical element returned from building the Dictionary is the range of product identifiers; the minimum and maximum values. It is assumed that the product identifier is an integer which is used as an index into the matrix, for both row and column values. Thus the maximum value is needed to define the row and column dimensions of the sparse matrix.

Whereas this approach works well for product ranges starting from 1, what if product identifiers are defined from a large integer base, such as 100,000, or are Guids? In the former case one has the option of calculating an offset such that the index into the matrix is the product identifier minus the starting offset; this being the approach I have taken. The other option is that the exported data is mapped such that the product numbers are defined from 1 onwards; again this can be a simple offset calculation. In the latter case, for Guids, one would need to map the Guid keys to sequential integers.
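For the Guid case, a minimal sketch of such a mapping might look as follows (the `indexOf` and `productOf` names are mine, not part of the sample code; each Guid is assigned a sequential matrix index on first sight, with the reverse mapping kept to translate recommendations back):

```fsharp
open System
open System.Collections.Generic

// Forward mapping: Guid product key -> sequential matrix index
let indexByProduct = Dictionary<Guid, int>()
// Reverse mapping: matrix index -> Guid product key
let productByIndex = ResizeArray<Guid>()

let indexOf (productId: Guid) =
    if indexByProduct.ContainsKey productId then
        indexByProduct.[productId]
    else
        let idx = productByIndex.Count
        indexByProduct.[productId] <- idx
        productByIndex.Add productId
        idx

let productOf (idx: int) = productByIndex.[idx]
```

`indexOf` would be applied as the export file is parsed, and `productOf` when the recommended indices are translated back into product keys.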

### Querying for Item Similarity

So once the sparse matrix has been built, how does one make recommendations? There are basically two options: recommendations based on a single product selection, or recommendations for multiple products, say based on the contents of a shopping basket.

The basic process for performing the recommendations, in either case, is as follows:

1. Select the sparse row vectors that correspond to the products for which a recommendation is required
2. Place the row values into a PriorityQueue where the key is the co-occurrence count and the value the product identifier
3. Dequeue and return, as a sequence, the required number of recommendations from the PriorityQueue

For the case of a single product the recommendations are just a case of selecting the correct single vector and returning the products with the highest co-occurrence count. The process for a selection of products is almost the same, except that multiple vectors are added into the PriorityQueue. One just needs to ensure that products that are already in the selection on which the recommendation is being made are excluded from the returned values. This is easily achieved with a HashSet lookup.

So the full code that wraps building the recommendation is as follows:

type MatrixQuery (filenames:string array) =

    let (offset, coMatrix) =
        match filenames with
        | [||] -> invalidArg "filenames" "Filenames cannot be an empty Array"
        | [| filename |] -> MatrixBuilder.GetMatrix(filename)
        | _ -> MatrixBuilder.GetMatrixParallel(filenames)

    let getQueueSM (products:int array) =
        // Define the priority queue and lookup table
        let queue = PriorityQueue(coMatrix.ColumnCount)
        let lookup = HashSet(products)

        // Add the items into a priority queue
        products
        |> Array.iter (fun item ->
            let itemIdx = item - offset
            if itemIdx >= 0 && itemIdx < coMatrix.ColumnCount then
                seq {
                    for idx = 0 to (coMatrix.ColumnCount - 1) do
                        if not (lookup.Contains(idx + offset)) then
                            yield KeyValuePair(coMatrix.[itemIdx, idx], idx + offset)
                }
                |> queue.Merge)
        // Return the queue
        queue

    let getItems (queue:PriorityQueue<float, int>) (items:int) =
        let toDequeue = min items queue.Count
        seq { for i in 1 .. toDequeue do yield queue.Dequeue().Value }

    new(filename:string) =
        MatrixQuery([| filename |])


    member self.GetRecommendations(product:int, items:int) =
        let queue = getQueueSM([| product |])
        getItems queue items

    member self.GetRecommendations(products:int array, items:int) =
        let queue = getQueueSM(products)
        getItems queue items

    member self.GetRecommendations(product:int) =
        self.GetRecommendations(product, 10)

    member self.GetRecommendations(products:int array) =
        self.GetRecommendations(products, 10)

Using the code is simply a matter of creating the MatrixQuery type, with the files to load, and then calling the GetRecommendations() operator for the required products (shopping basket):

let filenames = [|
@"C:\DataExport\SalesOrders201203.dat";
@"C:\DataExport\SalesOrders201204.dat";
@"C:\DataExport\SalesOrders201205.dat";
@"C:\DataExport\SalesOrders201206.dat" |]

let itemQuery = MatrixQuery(filenames)
let products = itemQuery.GetRecommendations([| 860; 870; 873 |], 25)

In extracting the row vector associated with the required product one could just have used coMatrix.Row(item); but this creates a copy of the vector. To avoid this the code just enumerates the required matrix row. Internally the sparse matrix maintains three separate arrays for the row offsets, column indices, and non-zero values. Using these internal arrays and the associated element-based operations would speed up obtaining recommendations; but currently these properties are not exposed. If one uses the sparse matrix from the F# PowerPack then one can operate in this fashion using the following code:

let getQueueSM (products:int array) =
    // Define the priority queue and lookup table
    let queue = PriorityQueue(coMatrix.NumCols)
    let lookup = HashSet(products)

    // Add the items into a priority queue
    products
    |> Array.iter (fun item ->
        let last = coMatrix.InternalSparseRowOffsets.Length - 1
        if item >= 0 && item <= last then
            let (startI, endI) =
                if item = last then (coMatrix.InternalSparseRowOffsets.[item], coMatrix.InternalSparseColumnValues.Length - 1)
                else (coMatrix.InternalSparseRowOffsets.[item], coMatrix.InternalSparseRowOffsets.[item + 1] - 1)
            seq {
                for idx = startI to endI do
                    let product = coMatrix.InternalSparseColumnValues.[idx]
                    if not (lookup.Contains(product)) then
                        yield KeyValuePair(-coMatrix.InternalSparseValues.[idx], product)
            }
            |> queue.Merge
        )
    // Return the queue
    queue

In considering a shopping basket I have assumed that each product has been selected only once. You could have the situation where one wants to take into consideration the quantity of each product selected. In this instance one would perform a scalar multiplication of the corresponding product vectors. For recommendations, this would prioritize products related to those of which a user has purchased multiple items.

Although this code does a lot of element-wise operations, as mentioned earlier, one can think of the operations in pure matrix terms. In this case one would just multiply the sparse matrix by a sparse column vector representing the items from which to make the recommendations.

Consider the previous sparse matrix example, and say one had a basket consisting of two of product 1002 and one of product 1004:

Product ID   1001  1002  1003  1004  1005       Product ID  Qty        Product ID  Rec
1001                  1     3     1     1       1001                   1001          3
1002            1           3               ×   1002          2    =   1002
1003            3     3           4     4       1003                   1003         10
1004            1           4           2       1004          1        1004
1005            1           4     2             1005                   1005          2

In this case it should be easy to see that the recommended products would be 1003, then 1001, and lastly 1005.
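This worked example can be checked with a few lines of Math.Net code; a standalone sketch, noting that the exact module paths of the F# helpers (`SparseMatrix.ofSeq`, `DenseVector.ofList`) may differ between Math.Net versions, and that index 0..4 maps to products 1001..1005:

```fsharp
open MathNet.Numerics.LinearAlgebra.Double

// Co-occurrence matrix from the example; index i maps to product 1001 + i
let coMatrix = SparseMatrix.ofSeq 5 5 [
    (0,1,1.0); (0,2,3.0); (0,3,1.0); (0,4,1.0)
    (1,0,1.0); (1,2,3.0)
    (2,0,3.0); (2,1,3.0); (2,3,4.0); (2,4,4.0)
    (3,0,1.0); (3,2,4.0); (3,4,2.0)
    (4,0,1.0); (4,2,4.0); (4,3,2.0) ]

// Basket: two of product 1002 and one of product 1004
let basket = DenseVector.ofList [0.0; 2.0; 0.0; 1.0; 0.0]

// Recommendation scores per product index
let scores = coMatrix * basket
// scores = [3; 0; 10; 0; 2] -> recommend 1003, then 1001, then 1005
```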

### Conclusion

The approach mentioned here will probably work well for cases where one has hundreds of thousands of products, with similarities between thousands of items, and tens of millions of orders to be considered. For most situations this should suffice. However, if you are not in this bracket, in my next post I will show how this approach can be mapped over to Hadoop and MapReduce, allowing for even greater scale.

Written by Carl Nolan