To get your big-data applications to work, you first need to read in the data. But what if your data is too big? What if the dataset you are reading requires more memory than is available on any single machine? This is a common constraint in every discipline that handles large datasets. With “Cloud Numerics” you can read data in parallel across several machines, such as the nodes of a Windows Azure cluster.
This post walks you through the “Cloud Numerics” IParallelReader interface, which is part of an input framework that facilitates the parallel loading of distributed arrays from storage.
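The IParallelReader interface supports the following general workflow:
1. Compute an assignment that tells each rank which part of the data to read.
2. On each rank, read the assigned part of the data into a local array.
3. Assemble the local arrays into a single distributed array.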
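This data input framework takes care of all communication and assembly, which means you only need to supply the source code for the first two steps of the workflow and then specify the dimension along which the Distributed.NumericDenseArray is assembled. This information is provided by an instance of a class that implements the IParallelReader<T> interface. The class must contain the following members:
- ComputeAssignment(int totalRanks), which returns an object[] with one element per rank; each element holds the instructions for that rank.
- ReadWorker(object arg), which runs on each rank, receives that rank's element of the assignment array, and returns the rank's local piece as a Microsoft.Numerics.Local.NumericDenseArray<T>.
- DistributedDimension, an int property that specifies the dimension along which the local pieces are joined to form the distributed array.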
Once the required class is defined, it can be used by creating a new instance and calling the LoadData<T> method. For example:
var reader = new CustomReader(<args>);
var result = Microsoft.Numerics.Distributed.IO.Loader.LoadData<T>(reader);
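To make the division of labor concrete, here is a single-process sketch of the workflow in plain C#. It does not use the “Cloud Numerics” library: IParallelReaderSketch<T>, LoaderSketch and RangeReader are hypothetical names, a local array is modeled as a jagged array of rows (T[][]) instead of a NumericDenseArray<T>, and only row-wise assembly (DistributedDimension 0) is simulated. It imitates what the loader does under those assumptions; it is not the loader's actual implementation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for IParallelReader<T>; a local array is modeled as rows (T[][]).
public interface IParallelReaderSketch<T>
{
    int DistributedDimension { get; set; }
    object[] ComputeAssignment(int totalRanks);
    T[][] ReadWorker(object assignment);
}

// Hypothetical single-process imitation of the loader: one assignment per rank,
// one ReadWorker call per rank, then the pieces are stacked in rank order.
public static class LoaderSketch
{
    public static T[][] LoadData<T>(IParallelReaderSketch<T> reader, int totalRanks)
    {
        if (reader.DistributedDimension != 0)
            throw new NotSupportedException("This sketch only stacks rows (dimension 0).");

        object[] assignments = reader.ComputeAssignment(totalRanks);
        if (assignments.Length != totalRanks)
            throw new InvalidOperationException("Expected one assignment per rank.");

        var rows = new List<T[]>();
        for (int rank = 0; rank < totalRanks; rank++)
        {
            // In the real framework each rank makes this call on its own node.
            T[][] piece = reader.ReadWorker(assignments[rank]);
            if (piece != null)
                rows.AddRange(piece); // a rank with no work may return null
        }
        return rows.ToArray();
    }
}

// Hypothetical toy reader: rank r gets a contiguous block of row indices.
public class RangeReader : IParallelReaderSketch<double>
{
    private readonly int numRows;
    public RangeReader(int numRows) { this.numRows = numRows; }

    public int DistributedDimension { get; set; } = 0; // stack rows

    // Rank r is told to produce rows [start, end).
    public object[] ComputeAssignment(int totalRanks)
    {
        var result = new object[totalRanks];
        for (int r = 0; r < totalRanks; r++)
        {
            int start = (int)((long)numRows * r / totalRanks);
            int end = (int)((long)numRows * (r + 1) / totalRanks);
            result[r] = new[] { start, end };
        }
        return result;
    }

    // "Reads" row i as {i, i*i}; returns null when the range is empty.
    public double[][] ReadWorker(object assignment)
    {
        var range = (int[])assignment;
        if (range[0] == range[1]) return null;
        var rows = new double[range[1] - range[0]][];
        for (int i = range[0]; i < range[1]; i++)
            rows[i - range[0]] = new double[] { i, (double)i * i };
        return rows;
    }
}
```

For example, LoaderSketch.LoadData<double>(new RangeReader(5), 3) assigns the row ranges [0, 1), [1, 3) and [3, 5) to the three ranks and returns the five rows in their original order, because the pieces are stacked in rank order.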
We’ve provided a sample CSV file reader that implements the interface described above. The reader class, which implements IParallelReader<T>, has fields that hold the name of the file and other common information such as its length. ComputeAssignment uses the length to create an array of (start, end) offset pairs that serve as instructions for the ranks. ReadWorker then has access to the file name (common to all ranks) and receives its own rank's (start, end) offset pair. With this information each rank opens the file, reads the lines between its starting and ending offsets, and returns its local array. Note that we have to be a little careful in cases where the starting or ending offset lands in the middle of a line. Finally, the framework assembles the resulting distributed array using the DistributedDimension property. The sample CSV file reader can be downloaded from the “Cloud Numerics” page on the Microsoft Connect site. To request access, sign up on the Microsoft Connect site; once you are registered for the Microsoft codename “Cloud Numerics” lab, you can download the software from there.
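The offset bookkeeping described above can be sketched as follows. This is not the downloadable sample: CsvRangeSketch, ComputeOffsets and ReadLinesInRange are hypothetical names, parsing each line into numbers is left out, and the sketch assumes '\n' line endings (a trailing '\r' is trimmed). It uses one common convention for lines that straddle a boundary: a rank owns every line whose first byte falls in its [start, end) range, so it finishes a line that runs past end and skips a partial line at start, which belongs to the previous rank.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

// Hypothetical helpers; not the downloadable "Cloud Numerics" CSV sample.
public static class CsvRangeSketch
{
    // Split the byte range [0, fileLength) into one contiguous (start, end) pair per rank.
    public static long[][] ComputeOffsets(long fileLength, int totalRanks)
    {
        var result = new long[totalRanks][];
        for (int r = 0; r < totalRanks; r++)
            result[r] = new[] { fileLength * r / totalRanks, fileLength * (r + 1) / totalRanks };
        return result;
    }

    // Return the lines whose first byte lies in [start, end).
    // A line that runs past 'end' is read to completion here; a partial line
    // at 'start' is skipped because the previous rank owns it.
    public static List<string> ReadLinesInRange(Stream stream, long start, long end)
    {
        var lines = new List<string>();
        long pos = start;
        int b;
        if (start > 0)
        {
            // The byte before 'start' tells us whether 'start' is mid-line.
            stream.Seek(start - 1, SeekOrigin.Begin);
            if (stream.ReadByte() != '\n')
            {
                // Skip to just past the next newline.
                while ((b = stream.ReadByte()) != -1)
                {
                    pos++;
                    if (b == '\n') break;
                }
            }
        }
        else
        {
            stream.Seek(0, SeekOrigin.Begin);
        }

        var buffer = new List<byte>();
        while (pos < end) // only start lines that begin inside our range
        {
            buffer.Clear();
            while ((b = stream.ReadByte()) != -1)
            {
                pos++;
                if (b == '\n') break;
                buffer.Add((byte)b);
            }
            if (b == -1 && buffer.Count == 0) break; // reached end of file
            lines.Add(Encoding.UTF8.GetString(buffer.ToArray()).TrimEnd('\r'));
        }
        return lines;
    }
}
```

Because every line starts in exactly one range, the ranks together return each line exactly once, whatever the number of ranks. Splitting on the '\n' byte is safe for UTF-8 because that byte never occurs inside a multibyte character. Reading byte by byte keeps the position arithmetic simple; a production reader would more likely read larger blocks.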
Here is an example that reads a set of files:
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Text;
using Microsoft.Numerics.Distributed;
using Microsoft.Numerics.Distributed.IO;
using Microsoft.Numerics.Local;

public class SampleReader : IParallelReader<double>
{
    private string[] fileNamesArray = null; // Caching the list of file names
    private int distDim = 0;                // Concatenate across the rows

    // Simple constructor: just store the file list
    public SampleReader(string[] inList)
    {
        fileNamesArray = inList;
    }

    // The required property for the distributed dimension
    public int DistributedDimension
    {
        get { return distDim; }
        set { distDim = value; }
    }

    // Parcel out sets of files to the ranks
    public object[] ComputeAssignment(int totalRanks)
    {
        int numFiles = fileNamesArray.Length;

        // How many files to give to each rank?
        int chunkSize = Math.Max(1, numFiles / totalRanks);
        string[][] result = new string[totalRanks][];

        // Go through the list of files and allocate chunkSize pieces
        int loc = 0;
        for (int i = 0; i < totalRanks; i++)
        {
            if (loc < numFiles)
            {
                // There are still files to process
                int num = 0;
                if (i == totalRanks - 1)
                {
                    // The last rank gets the rest
                    num = numFiles - loc;
                }
                else
                {
                    // Take as many as I can
                    num = Math.Min(chunkSize, numFiles - loc);
                }

                // Copy the names to the result array
                result[i] = new string[num];
                for (int j = 0; j < num; j++)
                {
                    result[i][j] = fileNamesArray[loc];
                    loc++;
                }
            }
            else
            {
                result[i] = new string[0];
            }
        }
        return (object[])result;
    }

    // Read your set of files to determine your contribution to the final result
    public Microsoft.Numerics.Local.NumericDenseArray<double> ReadWorker(object arg)
    {
        string[] myFiles = (string[])arg;
        if (myFiles.Length == 0)
        {
            return null;
        }

        // Read my files, accumulating them into result
        Microsoft.Numerics.Local.NumericDenseArray<double> myResult = null;
        for (int i = 0; i < myFiles.Length; i++)
        {
            // Have to write ReadSingleFile yourself
            var fileResult = ReadSingleFile(myFiles[i]);

            // Have to write the combine function yourself for now
            // We will soon have this function available
            myResult = Concatenate(myResult, fileResult, 0);
        }

        // Return my local piece and the framework will stitch them up
        return myResult;
    }

    // Combine two matrices by row
    private static Microsoft.Numerics.Local.NumericDenseArray<double> Concatenate(
        Microsoft.Numerics.Local.NumericDenseArray<double> mat1,
        Microsoft.Numerics.Local.NumericDenseArray<double> mat2,
        int dim)
    {
        // Ensure distributed dimension is 0
        if (dim != 0)
        {
            throw new ArgumentException("dim must be 0");
        }

        // If you pass in null, then just return the other argument
        if (mat1 as object == null)
        {
            return mat2;
        }
        if (mat2 as object == null)
        {
            return mat1;
        }

        int i, j;
        var shape1 = mat1.Shape.ToArray();
        var shape2 = mat2.Shape.ToArray();
        if (shape1.Length != 2)
        {
            throw new ArgumentException("mat1 must be two dimensional");
        }
        if (shape2.Length != 2)
        {
            throw new ArgumentException("mat2 must be two dimensional");
        }
        if (shape1[1] != shape2[1])
        {
            throw new ArgumentException("Both inputs must have the same number of columns");
        }

        var result = Microsoft.Numerics.Local.NumericDenseArrayFactory.Zeros<double>(shape1[0] + shape2[0], shape1[1]);

        // Copy the first
        for (i = 0; i < shape1[0]; i++)
        {
            for (j = 0; j < shape1[1]; j++)
            {
                result[i, j] = mat1[i, j];
            }
        }

        // Copy the second below the first
        for (i = 0; i < shape2[0]; i++)
        {
            for (j = 0; j < shape2[1]; j++)
            {
                result[shape1[0] + i, j] = mat2[i, j];
            }
        }
        return result;
    }

    private static Microsoft.Numerics.Local.NumericDenseArray<double> ReadSingleFile(string fileName)
    {
        // Here you read your file into a local array ...
        throw new NotImplementedException("ReadSingleFile must be supplied by you.");
    }
}
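The chunking rule in ComputeAssignment gives every rank except the last chunkSize files and hands whatever remains to the last rank. The hypothetical helpers below only count files per rank: SampleCounts mirrors that rule, and BalancedCounts is an alternative (a suggestion, not part of the sample) that spreads the remainder one file at a time so no two ranks differ by more than one file.

```csharp
using System;

// Hypothetical helpers that only count how many files each rank receives.
public static class AssignmentSketch
{
    // Mirrors SampleReader.ComputeAssignment:
    // chunkSize = max(1, numFiles / totalRanks); the last rank takes the rest.
    public static int[] SampleCounts(int numFiles, int totalRanks)
    {
        int chunkSize = Math.Max(1, numFiles / totalRanks);
        var counts = new int[totalRanks];
        int loc = 0;
        for (int i = 0; i < totalRanks; i++)
        {
            if (loc >= numFiles) break; // remaining ranks get no files
            int num = (i == totalRanks - 1) ? numFiles - loc : Math.Min(chunkSize, numFiles - loc);
            counts[i] = num;
            loc += num;
        }
        return counts;
    }

    // Alternative: the first (numFiles % totalRanks) ranks get one extra file.
    public static int[] BalancedCounts(int numFiles, int totalRanks)
    {
        var counts = new int[totalRanks];
        for (int i = 0; i < totalRanks; i++)
            counts[i] = numFiles / totalRanks + (i < numFiles % totalRanks ? 1 : 0);
        return counts;
    }
}
```

With 10 files on 4 ranks, SampleCounts returns 2, 2, 2, 4 and BalancedCounts returns 3, 3, 2, 2. Both hand out contiguous runs of the file list in rank order, so, assuming the framework stacks the pieces in rank order, either could drive ComputeAssignment without changing the row order of the assembled array. Equal file counts only balance the work if the files are of similar size.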