Azure Storage Analytics is a great way to gain insight into how your Azure storage service is being used.

If you have not been using it, here are the details about what it provides. To summarize, it provides two kinds of analytics:

- Metrics: provide an hourly summary of bandwidth usage, errors, average client time, server time, etc., and a daily summary of blob storage capacity.

- Logs: provide traces of the requests executed against your storage accounts; these are stored in blob storage.

Enabling them doesn't reduce your storage account's target limits in any way, but you do pay for the transactions and capacity consumed by the analytics data. Metrics provide most of the data about storage usage, but sometimes you need more granular usage statistics, such as:

  1. Bandwidth/s/blob
  2. Bandwidth/s/storage-account
  3. Transactions/s/blob
  4. Transactions/s/storage-account
  5. Hot blobs with respect to transactions, bandwidth, etc.
  6. IP addresses of the clients accessing the storage account the most.
  7. Etc.

Getting these requires processing the analytics logs and then finding the answers to these questions. If you are a heavy consumer of a storage account, you could be generating hundreds of GBs of logs and hundreds of millions of transactions a day, which makes the logs an ideal candidate for processing with Hadoop.

Let's see how we can get these statistics using HadoopOnAzure.

At a very high level, here is what I will be doing to achieve this: copy the relevant data from Azure blob storage to HDFS using a map-only MapReduce job, import the data into a Hive table, and then run HiveQL queries to get these statistics. Note that if you just want a single answer, you can run directly against blob storage without copying data to HDFS (in a single Pig script, for example), but when running multiple jobs, having the data already on HDFS is much faster because of data locality.

In Hadoop, the code moves close to the data, instead of the traditional approach of fetching the data to the code. The MapReduce engine tries to assign workloads to the servers where the data to be processed is stored, avoiding network overhead. This is known as data locality.

Let's get into the details now.

1. Set up the Azure blob storage account to be processed. Detailed steps are available here.

2. Write a map-only MapReduce job to pre-process the data into the required format and copy it to HDFS.

Here is what my MapReduce script looks like:

StorageAnalytics.js

var myMap = function (key, value, context) {
    // Field indexes in the Storage Analytics log entry (';' delimited).
    var ipAddressIndex = 15;
    var blobURLIndex = 11;
    var requestHeaderIndex = 17;
    var requestPacketIndex = 18;
    var responseHeaderIndex = 19;
    var responsePacketIndex = 20;

    // Splitting on double quotes puts the quoted fields at the odd indexes.
    var quotedwords = value.split('"');

    // Encode the ';' character inside quoted fields so it doesn't clash
    // with the ';' field delimiter used by the log format.
    for (var i = 1; i < quotedwords.length; i += 2) {
        quotedwords[i] = quotedwords[i].replace(/;/g, "%59");
    }
    var wordsstr = quotedwords.join('');

    // Get the URL after removing all query-string parameters.
    var words = wordsstr.split(";");
    var url = words[blobURLIndex];
    var index = url.indexOf("?");
    if (index != -1) {
        url = url.slice(0, index);
    }

    // Get the date and time strings from the request start time.
    var datestr = words[1];
    var seckey = datestr.substring(datestr.indexOf('T') + 1, datestr.lastIndexOf('.'));
    var daykey = datestr.substring(0, datestr.indexOf('T'));

    // Get the IP address after removing the port.
    var ipaddress = words[ipAddressIndex].substring(0, words[ipAddressIndex].indexOf(':'));

    // Default empty size fields to "0".
    var reqheader = words[requestHeaderIndex];
    if (reqheader == "") { reqheader = "0"; }

    var reqpacket = words[requestPacketIndex];
    if (reqpacket == "") { reqpacket = "0"; }

    var resheader = words[responseHeaderIndex];
    if (resheader == "") { resheader = "0"; }

    var respacket = words[responsePacketIndex];
    if (respacket == "") { respacket = "0"; }

    context.write(daykey + '\t' + seckey + '\t' + url + '\t' + reqheader + '\t' + reqpacket + '\t' + resheader + '\t' + respacket + '\t' + ipaddress, 1);
};

var myReduce = function (key, values, context) {
    var sum = 0;
    while (values.hasNext()) {
        sum += parseInt(values.next());
    }
    context.write(key, sum);
};

var main = function (factory) {
    var job = factory.createJob("myAnalytics", "myMap", "myReduce");
    // Make it a map-only job by setting the number of reduce tasks to zero.
    job.setNumReduceTasks(0);
    job.submit();
    job.waitForCompletion(true);
};

A few important things to note from the script:

i. The semicolon (';') character that can appear inside the blob URL or other entries is encoded, using the fact that any entry containing it will be quoted. Note that ';' is the field delimiter in log entries, which is why it has to be quoted when it appears inside an entry.

ii. Only the relevant data is picked from each analytics log entry (URL, date, time, request/response header/packet sizes, and IP address), but you can take everything the entry provides.

iii. It is a map-only MapReduce job, since the aggregation/summarization will be done with Hive queries on the generated data.

 

3. Go to the JavaScript console and run the above job.

js> runJs("StorageAnalytics.js", "asv://$logs/blob/2012/06/", "output");

Note that I am processing data for the month of June only; you can change the path above to 'asv://$logs/blob/' to process the complete data. Once the job completes, the generated data is stored in the output folder in HDFS in part-m-0000.. files.

js> #ls output
Found 111 items
-rw-r--r-- 3 suren supergroup 0 2012-06-25 01:43 /user/suren/output/_SUCCESS
drwxr-xr-x - suren supergroup 0 2012-06-25 01:42 /user/suren/output/_logs
-rw-r--r-- 3 suren supergroup 20816 2012-06-25 01:42 /user/suren/output/part-m-00000
-rw-r--r-- 3 suren supergroup 19631 2012-06-25 01:42 /user/suren/output/part-m-00001
  ...... 

Delete the _logs and _SUCCESS entries so that all files in the output folder conform to the same schema.

js> #rmr output/_logs
Deleted hdfs://.../output/_logs

js> #rmr output/_SUCCESS
Deleted hdfs://.../output/_SUCCESS

4. Create a Hive table for this schema. Go to the interactive Hive console and create this table:

create table StorageAnalyticsStats(DayStr STRING, TimeStr STRING, urlStr STRING, ReqHSize BIGINT, ReqPSize BIGINT, ResHSize BIGINT, ResPSize BIGINT, IpAddress STRING, count BIGINT) PARTITIONED BY (account STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY '\n';

Note that the table is partitioned by account, so that you can insert data from multiple storage accounts and then run HiveQL queries once across all accounts. A partition in Hive is a logical column whose value is derived from the directory the data lives under rather than from the data inside the files. You can read more about partitions and other data-unit concepts on the Hive wiki here.
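To see how the partition column behaves once data is loaded (step 5 below), here is a quick sketch; the partition value 'account1' is just the one used in the load step:

show partitions StorageAnalyticsStats;

select count(*) from storageanalyticsstats where account = 'account1';

Filtering on the partition column in the where clause makes Hive read only that partition's files instead of scanning the whole table.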

5. Insert data into the table created above.

LOAD DATA INPATH 'output' INTO TABLE StorageAnalyticsStats PARTITION(account='account1');

You can repeat steps 1 to 5 to load data from multiple storage accounts and then run the queries. Since we are not overwriting data while loading, the data for all storage accounts is retained, and since each storage account's data goes into a different partition, you can run queries scoped to a single storage account or across all storage accounts. After this step, the data from the output folder has been moved inside Hive, and when you drop the table that data will eventually be deleted. If you want to retain the data after dropping the table, you can use external tables instead (a sketch follows below). You can read more about external tables and loading data on the Hive wiki here.
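If you do go the external-table route, a minimal sketch looks like the following; the table name, HDFS location, and partition path here are illustrative and not something created earlier in this walkthrough:

create external table StorageAnalyticsStatsExt(DayStr STRING, TimeStr STRING, urlStr STRING, ReqHSize BIGINT, ReqPSize BIGINT, ResHSize BIGINT, ResPSize BIGINT, IpAddress STRING, count BIGINT) PARTITIONED BY (account STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/user/suren/analytics';

alter table StorageAnalyticsStatsExt add partition (account='account1') location '/user/suren/analytics/account1';

Dropping an external table removes only the table metadata; the files under the specified location stay in HDFS.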

Now we have a Hive table with the complete data. Note that Hive doesn't process the data while loading it; the actual processing happens when you run your queries, which is why the LOAD DATA command executes pretty quickly. Let's get the required statistics now:

1. Find the top 10 blobs by bandwidth usage per second.

select daystr, timestr, urlstr, sum(reqhsize+reqpsize+reshsize+respsize) as bandwidth, sum (reqhsize+reqpsize) as Ingress, sum(reshsize+respsize) as Egress from storageanalyticsstats group by daystr, timestr, urlstr order by bandwidth desc limit 10;

2. Find the top 10 blobs by transactions per second.

select daystr, timestr, urlstr, count(*) as txns from storageanalyticsstats group by daystr, timestr, urlstr order by txns desc limit 10;

3. Find the IP addresses of the clients accessing the storage service the most.

select IpAddress, count(*) as hits from storageanalyticsstats group by IpAddress order by hits desc limit 10;
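The query above counts hits across the whole data set; to scope it to a particular time window, add a filter on daystr (and timestr if needed). The date below is just an example value from the range processed here:

select IpAddress, count(*) as hits from storageanalyticsstats where daystr = '2012-06-15' group by IpAddress order by hits desc limit 10;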

4. Find the top 10 transactions per second per storage account.

select account, daystr, timestr, count(*) as txns from storageanalyticsstats group by account, daystr, timestr order by txns desc limit 10;
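Along the same lines, here is a sketch for bandwidth per second per storage account (item 2 in the list at the beginning), again using the partition column to keep accounts separate:

select account, daystr, timestr, sum(reqhsize+reqpsize+reshsize+respsize) as bandwidth from storageanalyticsstats group by account, daystr, timestr order by bandwidth desc limit 10;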