Microsoft Codename "Cloud Numerics"

Model and analyze data at scale

“Cloud Numerics” Example: Using the IParallelReader Interface

To get your big-data applications to work, you need to read in the data, but what if your data is too big? What if the dataset you are reading requires more memory than is available on any single machine? This is a common constraint in every discipline that handles large datasets. With “Cloud Numerics” you can read data in parallel across several machines, such as the nodes of a Windows Azure cluster.

This post walks you through the “Cloud Numerics” IParallelReader interface, which is part of an input framework that facilitates the parallel loading of distributed arrays from storage.

The IParallelReader interface supports the following general workflow:

  1. The master process determines how the work is partitioned across all of the processes, based on information about the dataset that the calling routine provides.
  2. Next, the master process and each worker process load a portion of the distributed array according to that partitioning scheme. Each process returns a Local.NumericDenseArray.
  3. Finally, the Local.NumericDenseArrays created in the previous step are assembled to form the Distributed.NumericDenseArray that is the output of the parallel reader.
Note!
The IParallelReader interface leverages the distributed runtime. For more details, please see this post.
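To make the division of labor concrete, here is a minimal single-process sketch of the three steps in plain C#. It does not use the “Cloud Numerics” runtime: IToyReader, ToyLoader, and RangeReader are hypothetical names, the local pieces are ordinary double[,] arrays rather than Local.NumericDenseArrays, only row-wise assembly is shown, and the “ranks” run one after another in a loop instead of on separate nodes.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for IParallelReader<T>: pieces are double[,] arrays and
// they are always assembled along dimension 0 (rows).
public interface IToyReader
{
    object[] ComputeAssignment(int totalLocales);
    double[,] ReadWorker(object assignment);
}

public static class ToyLoader
{
    // Runs the three workflow steps sequentially in one process.
    public static double[,] LoadData(IToyReader reader, int totalLocales)
    {
        // Step 1: only the "master" computes the partitioning.
        object[] assignments = reader.ComputeAssignment(totalLocales);

        // Step 2: every rank i reads its piece using assignments[i].
        var pieces = new List<double[,]>();
        for (int rank = 0; rank < totalLocales; rank++)
        {
            var piece = reader.ReadWorker(assignments[rank]);
            if (piece != null) pieces.Add(piece);
        }

        // Step 3: stack the local pieces along dimension 0 (assumes at least one piece).
        int cols = pieces[0].GetLength(1);
        int rows = pieces.Sum(p => p.GetLength(0));
        var result = new double[rows, cols];
        int offset = 0;
        foreach (var p in pieces)
        {
            for (int i = 0; i < p.GetLength(0); i++)
                for (int j = 0; j < cols; j++)
                    result[offset + i, j] = p[i, j];
            offset += p.GetLength(0);
        }
        return result;
    }
}

// Example reader: rank i produces a contiguous block of rows of a matrix
// whose entry (r, c) is 10 * r + c.
public class RangeReader : IToyReader
{
    private readonly int totalRows, cols;

    public RangeReader(int totalRows, int cols)
    {
        this.totalRows = totalRows;
        this.cols = cols;
    }

    public object[] ComputeAssignment(int totalLocales)
    {
        var result = new object[totalLocales];
        for (int i = 0; i < totalLocales; i++)
        {
            int start = (int)((long)totalRows * i / totalLocales);
            int end = (int)((long)totalRows * (i + 1) / totalLocales);
            result[i] = new int[] { start, end }; // an int[] is serializable
        }
        return result;
    }

    public double[,] ReadWorker(object assignment)
    {
        var range = (int[])assignment;
        var piece = new double[range[1] - range[0], cols];
        for (int r = range[0]; r < range[1]; r++)
            for (int c = 0; c < cols; c++)
                piece[r - range[0], c] = 10 * r + c;
        return piece;
    }
}
```

For example, ToyLoader.LoadData(new RangeReader(5, 3), 2) gives rank 0 rows 0 and 1 and rank 1 rows 2 through 4, and returns the full 5 x 3 matrix with the rows back in order.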

This data input framework takes care of all communication and assembly, which means you only need to supply code for the first two steps of the workflow and specify the dimension along which the Distributed.NumericDenseArray is assembled. You provide this through an instance of a class that implements the IParallelReader<T> interface, which must contain the following:

  object[] ComputeAssignment(int totalLocales)
    This method runs only on the master process and determines how the work is partitioned across the ranks. It must return an array (call it “result”) of length totalLocales, where result[i] holds the instructions that will be sent to rank i.
    Note! result must be an array of a serializable type.
  Microsoft.Numerics.Local.NumericDenseArray<T> ReadWorker(object assignment)
    This method runs on every ‘locale’ (rank) using the instructions produced by ComputeAssignment: rank i is called with result[i] as its argument. It loads that rank’s portion of the stored data and returns it as a Local.NumericDenseArray.
  An integer property called DistributedDimension
    DistributedDimension tells the framework how to assemble the local pieces into the final distributed array. For DistributedDimension = j, the Local.NumericDenseArrays are concatenated along dimension j (counting from 0) to construct the Distributed.NumericDenseArray.
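For two-dimensional data, DistributedDimension = 0 means the local pieces are stacked on top of each other, and DistributedDimension = 1 means they are placed side by side. The sketch below shows that assembly rule on plain double[,] arrays; PieceAssembler is a hypothetical helper, not part of the “Cloud Numerics” API, and it only handles dimensions 0 and 1.

```csharp
using System;
using System.Collections.Generic;

public static class PieceAssembler
{
    // Concatenates 2-D pieces along dimension dim:
    // 0 = stack the pieces vertically (same column count required),
    // 1 = place them side by side (same row count required).
    // Assumes at least one piece.
    public static double[,] Assemble(IList<double[,]> pieces, int dim)
    {
        if (dim != 0 && dim != 1)
            throw new ArgumentException("This sketch only handles dim = 0 or dim = 1");

        int other = 1 - dim;
        int fixedSize = pieces[0].GetLength(other);
        int total = 0;
        foreach (var p in pieces)
        {
            if (p.GetLength(other) != fixedSize)
                throw new ArgumentException("Pieces must agree in every dimension except dim");
            total += p.GetLength(dim);
        }

        var result = dim == 0 ? new double[total, fixedSize] : new double[fixedSize, total];
        int offset = 0;
        foreach (var p in pieces)
        {
            for (int i = 0; i < p.GetLength(0); i++)
                for (int j = 0; j < p.GetLength(1); j++)
                {
                    if (dim == 0) result[offset + i, j] = p[i, j];
                    else result[i, offset + j] = p[i, j];
                }
            offset += p.GetLength(dim);
        }
        return result;
    }
}
```

The SampleReader later in this post uses DistributedDimension = 0, so in that case every rank’s piece needs the same number of columns.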

Once the required class is defined, create an instance of it and pass it to the LoadData<T> method. For example:

var reader = new CustomReader(<args>); 
var result = Microsoft.Numerics.Distributed.IO.Loader.LoadData<T>(reader);

Note!
Information that is common to all the ranks does not need to be placed in the result of ComputeAssignment. It can be stored in fields of your IParallelReader<T> class instead, because the framework sends the instance passed to LoadData to all the processes. This happens before ComputeAssignment is called, so any changes ComputeAssignment makes to the instance are seen only by the master process.
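The ordering in the note above can be mimicked in a single process. In this sketch CommonInfoReader is a hypothetical class, and CopyForRank (a MemberwiseClone call) stands in for the framework sending the instance to each process; that copy step is an assumption made for illustration, not how “Cloud Numerics” actually serializes the reader.

```csharp
using System;

// Hypothetical reader: common data lives in fields, so ComputeAssignment
// only has to hand out each rank's (start, end) offsets.
public class CommonInfoReader
{
    public string FileName;
    public long FileLength;
    public int AssignmentsComputed; // written inside ComputeAssignment

    public CommonInfoReader(string fileName, long fileLength)
    {
        FileName = fileName;
        FileLength = fileLength;
    }

    // Stand-in for the copy each process receives before ComputeAssignment runs.
    public CommonInfoReader CopyForRank() => (CommonInfoReader)MemberwiseClone();

    public object[] ComputeAssignment(int totalLocales)
    {
        AssignmentsComputed = totalLocales; // changes only this (master) instance
        var result = new object[totalLocales];
        for (int i = 0; i < totalLocales; i++)
            result[i] = new long[] { FileLength * i / totalLocales, FileLength * (i + 1) / totalLocales };
        return result;
    }
}
```

Calling CopyForRank() and then ComputeAssignment(4) on the original leaves the copy with the right FileName and FileLength but with AssignmentsComputed still 0, while the original’s becomes 4. Anything a worker needs must therefore be set before LoadData is called, or travel in that rank’s entry of the assignment array.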

We’ve provided a sample CSV file reader that implements the interface described above. Its IParallelReader<T> class holds the name of the file and other common information, such as the file’s length, in fields. ComputeAssignment uses the length to create an array of (start, end) offset pairs that serve as the instructions for the ranks. Each rank’s ReadWorker call then has the file name (common to all the ranks) and its own (start, end) offset pair. With this information each rank opens the file, reads the lines between its starting and ending offsets, and returns the local array. Some care is needed when a starting or ending offset lands in the middle of a line, so that every line is read by exactly one rank. Finally, the framework assembles the resulting distributed array using the DistributedDimension property.

The sample CSV file reader can be downloaded from the “Cloud Numerics” page on the Microsoft Connect site. To request access to the site, use this link: sign-up here. Once you are registered for the Microsoft codename “Cloud Numerics” lab, you can download the software with this link: download here.
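The offset partitioning and mid-line handling in the CSV reader described above can be sketched as follows. One common convention, assumed here rather than taken from the sample, is that a rank owns every line whose first character falls inside its [start, end) range: if start lands mid-line, the rank skips past the next newline, and if its last line runs past end, it reads that line to completion. To stay self-contained, CsvRangeSketch (a hypothetical name) works on an in-memory string, so offsets are character positions rather than file byte offsets.

```csharp
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

public static class CsvRangeSketch
{
    // Master side: split [0, length) into totalRanks contiguous (start, end) pairs.
    public static long[][] ComputeOffsets(long length, int totalRanks)
    {
        var result = new long[totalRanks][];
        for (int i = 0; i < totalRanks; i++)
            result[i] = new[] { length * i / totalRanks, length * (i + 1) / totalRanks };
        return result;
    }

    // Worker side: parse every line whose first character lies in [start, end).
    public static List<double[]> ReadRange(string text, long start, long end)
    {
        int pos = (int)start;

        // start is mid-line: that line belongs to the previous rank, so skip it.
        if (pos > 0 && text[pos - 1] != '\n')
        {
            int nl = text.IndexOf('\n', pos);
            pos = nl < 0 ? text.Length : nl + 1;
        }

        var rows = new List<double[]>();
        while (pos < end && pos < text.Length)
        {
            // The line starts inside our range; read it to its end even past 'end'.
            int nl = text.IndexOf('\n', pos);
            int lineEnd = nl < 0 ? text.Length : nl;
            string line = text.Substring(pos, lineEnd - pos);
            if (line.Length > 0)
                rows.Add(line.Split(',')
                             .Select(s => double.Parse(s, CultureInfo.InvariantCulture))
                             .ToArray());
            pos = lineEnd + 1;
        }
        return rows;
    }
}
```

Because every line starts in exactly one range, concatenating the rows returned for all ranks in rank order yields each line once, with none duplicated or missing, whatever the number of ranks.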

Here is an example that reads a set of files:

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Text;
using Microsoft.Numerics.Distributed;
using Microsoft.Numerics.Distributed.IO;
using Microsoft.Numerics.Local;

public class SampleReader : IParallelReader<double>
{
    private string[] fileNamesArray = null; // Caching the list of file names
    private int distDim = 0; // Stack the local pieces along dimension 0 (by rows)

    // Simple constructor: just store the file list
    public SampleReader(string[] inList)
    {
        fileNamesArray = inList;
    }

    // The required property for the distributed dimension
    public int DistributedDimension
    {
        get { return distDim; }
        set { distDim = value; }
    }

    // Parcel out sets of files to the ranks
    public object[] ComputeAssignment(int totalRanks)
    {
        int numFiles = fileNamesArray.Length;

        // How many files to give to each rank?
        int chunkSize = Math.Max(1, numFiles / totalRanks);

        string[][] result = new string[totalRanks][];

        // Go through the list of files and allocate chunkSize pieces
        int loc = 0;
        for (int i = 0; i < totalRanks; i++)
        {
            if (loc < numFiles)
            {
                // There are still files to process
                int num = 0;
                if (i == totalRanks - 1)
                {
                    // The last rank gets the rest
                    num = numFiles - loc;
                }
                else
                {
                    // Take as many as I can
                    num = Math.Min(chunkSize, numFiles - loc);
                }

                // Copy the names to the result array
                result[i] = new string[num];
                for (int j = 0; j < num; j++)
                {
                    result[i][j] = fileNamesArray[loc];
                    loc++;
                }
            }
            else
            {
                // No files left: this rank gets an empty list
                result[i] = new string[0];
            }
        }

        // string[][] converts to object[] through array covariance
        return (object[])result;
    }

    // Read your set of files to determine your contribution to the final result
    public Microsoft.Numerics.Local.NumericDenseArray<double> ReadWorker(object arg)
    {
        string[] myFiles = (string[])arg;

        if (myFiles.Length == 0)
        {
            return null;
        }

        // Read my files, accumulating them into myResult
        Microsoft.Numerics.Local.NumericDenseArray<double> myResult = null;

        for (int i = 0; i < myFiles.Length; i++)
        {
            // Have to write ReadSingleFile yourself
            var fileResult = ReadSingleFile(myFiles[i]);

            // Have to write the combine function yourself for now
            // We will soon have this function available
            myResult = Concatenate(myResult, fileResult, 0);
        }

        // Return my local piece and the framework will stitch them up
        return myResult;
    }

    // Stack two matrices vertically (along dimension 0)
    private static Microsoft.Numerics.Local.NumericDenseArray<double> Concatenate(
        Microsoft.Numerics.Local.NumericDenseArray<double> mat1,
        Microsoft.Numerics.Local.NumericDenseArray<double> mat2,
        int dim)
    {
        // Only concatenation along dimension 0 is supported
        if (dim != 0)
        {
            throw new ArgumentException("dim must be 0");
        }

        // If either argument is null, just return the other one
        // (comparing as object sidesteps any == overload on the array type)
        if (mat1 as object == null)
        {
            return mat2;
        }

        if (mat2 as object == null)
        {
            return mat1;
        }

        int i, j;
        var shape1 = mat1.Shape.ToArray();
        var shape2 = mat2.Shape.ToArray();

        if (shape1.Length != 2)
        {
            throw new ArgumentException("mat1 must be two dimensional");
        }

        if (shape2.Length != 2)
        {
            throw new ArgumentException("mat2 must be two dimensional");
        }

        if (shape1[1] != shape2[1])
        {
            throw new ArgumentException("Both inputs must have the same number of columns");
        }

        var result = Microsoft.Numerics.Local.NumericDenseArrayFactory.Zeros<double>(shape1[0] + shape2[0], shape1[1]);

        // Copy the first
        for (i = 0; i < shape1[0]; i++)
        {
            for (j = 0; j < shape1[1]; j++)
            {
                result[i, j] = mat1[i, j];
            }
        }

        // Copy the second below the first
        for (i = 0; i < shape2[0]; i++)
        {
            for (j = 0; j < shape2[1]; j++)
            {
                result[shape1[0] + i, j] = mat2[i, j];
            }
        }

        return result;
    }

    private static Microsoft.Numerics.Local.NumericDenseArray<double> ReadSingleFile(string fileName)
    {
        // Read your file into a local array here
        ...
    }
}
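One design note on SampleReader.ComputeAssignment: because chunkSize is numFiles / totalRanks rounded down, the last rank absorbs the whole remainder, so 10 files on 4 ranks are split 2, 2, 2, 4. If the files are of similar size, a more even split gives each rank either the rounded-down or the rounded-up share. The sketch below is an alternative we are suggesting, not part of the sample; BalancedAssignment is a hypothetical name.

```csharp
using System;

public static class BalancedAssignment
{
    // Each rank gets floor(n / p) file names; the first n % p ranks get one extra.
    // Ranks beyond the number of files receive an empty array.
    public static string[][] Partition(string[] fileNames, int totalRanks)
    {
        int n = fileNames.Length;
        int baseCount = n / totalRanks;
        int extra = n % totalRanks;

        var result = new string[totalRanks][];
        int loc = 0;
        for (int i = 0; i < totalRanks; i++)
        {
            int count = baseCount + (i < extra ? 1 : 0);
            result[i] = new string[count];
            Array.Copy(fileNames, loc, result[i], 0, count); // keeps file order
            loc += count;
        }
        return result;
    }
}
```

Inside ComputeAssignment you could then write return BalancedAssignment.Partition(fileNamesArray, totalRanks);, since a string[][] converts to object[] through array covariance. For 10 files on 4 ranks this gives 3, 3, 2, 2.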
