imageimage

 Ce billet est rédigé par Romain Casteres et Jérôme Christ (pour la section § Récupération des Tweets), tous deux consultants de la société DCube. Il documente les démonstrations réalisées lors de la session #BigData dans le cadre des Journées SQL Server 2012.

Je vous en souhaite une bonne lecture.

--Philippe


Cet article traite des processus pour capturer, transformer, enrichir et visualiser des données issues du réseau social Twitter depuis Windows Azure, Hadoop (Windows Azure HD Insight), SSIS (SQL Server Integration Services), SSAS (SQL Server Analysis Services), et Excel. Il aborde pour cela différents modes d’analyses.

Dans une première partie, nous avons illustré l’analyse des 5 dernières secondes via Stream Insight. Nous nous intéressons dans cette seconde partie à l’analyse avec 0 préparation mais 13 h avant consultation des résultats. Dans cette partie, Jérôme vous montrera comment récupérer des Tweets en relation avec le voyage à partir d’un Worker Role dans Windows Azure pour les sauvegarder dans une Blob Storage Azure (espace de stockage dans le Cloud).

Dans une troisième et dernière partie, nous aborderons l’analyse décisionnelle avec 6 mois de préparation, mais 1 seconde d’attente avant consultation des tableaux de bord. Je vous montrerai dans ce contexte les dernières nouveautés de Microsoft en terme de restitution.

image

Récupération des Tweets

Nous allons télécharger dans un Blob Storage Azure (ASV) des Tweets en relations avec le voyage :

image

Téléchargement du SDK Azure FOR .NET

Pour développer votre Worker rôle dans Visual Studio, vous allez avoir besoin des Templates de projets qui se trouvent dans le SDK de Windows Azure pour .NET : http://www.windowsazure.com/en-us/develop/net/

image

Création d’une application Twitter

Nous allons avoir besoin de créer une application Twitter sur le portail Développeur afin de disposer de jetons OAuth qui nous permettront de consommer les API Twitter.

Nous utiliserons pour cette démo la Streaming API qui permet de récupérer à partir d’un Topic (#montopic) ou d’un mot-clé (monmotcle) les nouveaux Tweets automatiquement.

Il faut dans un premier temps se rendre sur le portail Développeur de Twitter : https://dev.twitter.com/apps

image

On crée une application Twitter que l’on va appeler DemoBigDataWorkerRole.

image

image

Dans le champ WebSite, vous pouvez renseigner par exemple l’adresse de votre Worker Role.

Nous n’aurons pas besoin de callback.

Allez dans le détail de votre application fraichement créée et créez votre Access Token en cliquant sur Create my access token en bas de page :

image

Notez ces informations dans un coin; nous allons en avoir besoin pour authentifier nos appels à l’API dans le Worker role.

Création du Worker Role

Créez votre Worker Role dans Visual Studio à partir du Template de projet Windows Azure Cloud Service.

image

Puis choisissez Rôle de travail.

image

Nous allons donc commencer par nous connecter à la Streaming API afin de vérifier que notre application Twitter est bien créée et les jetons valides.

Pour cela, je vais utiliser la librairie Twitterizer (disponible ici https://github.com/Twitterizer/Twitterizer, ne l’installez pas via NuGet il manque des assemblages). Ce qui va particulièrement nous intéresser, c’est l’assemblage Twitterizer2.Streaming qui, comme sa dénomination le suppose, permet de consommer la Streaming API de Twitter.

Ajoutez-donc ces 2 DLLs :

image

Prenez les valeurs des jetons (tokens) de votre application Twitter et renseignez-les :

image

Renseignez les topics/keywords que vous souhaitez surveiller :

image

Démarrez votre TwitterStream en passant en paramètres vos jetons et topics :

image

Enfin ajoutez les callback dont vous avez besoin :

image

Ce qui nous intéresse particulièrement ici est le callback RawJson qui envoie en paramètre les Tweets sous format JSON que l’on va sauvegarder par chunks de 5 Mo dans le stockage.

image

Pour se connecter au compte de stockage, nous allons le renseigner comme chaîne de connexion dans les paramètres du Worker Role :

image

Installez le package WindowsAzure.Storage via NuGet pour accéder à la librairie client du stockage :

image

Récupérez votre chaîne de connexion dans votre Worker Role :

image

Créez un CloudBlobClient pour récupérer une référence à votre conteneur :

image

On va stocker les blocks dans le conteneur suivant :

image

Pour uploader les blobs, on va d’abord récupérer une référence à un block dont on passe le nom en paramètre. S’il n’existe pas dans le conteneur, il sera créé lors de l’upload.

image

Déploiement du Worker Role

Nous allons déployer notre solution sur l’environnement de staging (recette) :

Faites un clic droit sur le projet de déploiement et Publier :

image

Importez votre fichier .publishsettings et renseignez vos paramètres de déploiement :

image

image

Une fois le déploiement effectué avec succès, nous allons vérifier que les Tweets sont bien stockés dans le stockage. Pour cela, on va utiliser l’Azure Storage Explorer : http://azurestorageexplorer.codeplex.com/

image

Création et configuration d’un cluster Hadoop

Le service Hadoop sur Windows Azure (Windows Azure HDInsight) est disponible uniquement sur invitation CTP (Community Technical Preview). La CTP permet aux développeurs de tester le service et de remonter à Microsoft des éventuelles erreurs. Demandez une invitation à cette adresse : https://connect.microsoft.com/SQLServer/Survey/Survey.aspx?SurveyID=13697

Une fois l’invitation reçue, connectez-vous et créez un nouveau cluster :

image

image

image

Dans la tuile Manage Cluster, configurer l’ASV :

image

image

 

Copie des Tweets dans le cluster Hadoop

Dans le portail HDInsight cliquez sur la tuile Remote Desktop et connectez-vous au cluster, ouvrez une fenêtre de commande et exécutez les scripts suivants :

Suppression du dossier « socialtravel » :

hadoop fs -rmr /socialtravel

Création du dossier « socialtravel » :

hadoop fs -mkdir /socialtravel

Importation des Tweets : (une semaine de Tweets) :

start hadoop distcp asv://socialtravel/2012-10-30 /socialtravel/2012-10-30

start hadoop distcp asv://socialtravel/2012-10-31 /socialtravel/2012-10-31

start hadoop distcp asv://socialtravel/2012-11-1 /socialtravel/2012-11-01

start hadoop distcp asv://socialtravel/2012-11-2 /socialtravel/2012-11-02

start hadoop distcp asv://socialtravel/2012-11-3 /socialtravel/2012-11-03

start hadoop distcp asv://socialtravel/2012-11-4 /socialtravel/2012-11-04

start hadoop distcp asv://socialtravel/2012-11-5 /socialtravel/2012-11-05

Remarque : Le fait d’utiliser la commande « Start » permet d’exécuter en parallèle les taches DISTCP.

Création d’une table Hive externe

Nous allons créer des vues (tables externes) sur les dossiers contenant les Tweets. Dans le portail HDInsight, cliquez sur la tuile Interactive Console, puis dans l’onglet Hive, exécutez les scripts suivants :

set hive.exec.dynamic.partition=true; // Car les données de la colonnes ne sont connues que lors de l’exécution

set hive.exec.dynamic.partition.mode=nonstrict;        // Car c’est une partition dynamique

create external table IF NOT EXISTS raw_tweets ( json_response string ) partitioned by (dat string) stored as textfile;

alter table raw_tweets add if not exists partition(dat='2012-10-30') location '/socialtravel/2012-10-30';

alter table raw_tweets add if not exists partition(dat='2012-10-31') location '/socialtravel/2012-10-31';

alter table raw_tweets add if not exists partition(dat='2012-11-01') location '/socialtravel/2012-11-01';

alter table raw_tweets add if not exists partition(dat='2012-11-02') location '/socialtravel/2012-11-02';

alter table raw_tweets add if not exists partition(dat='2012-11-03') location '/socialtravel/2012-11-03';

alter table raw_tweets add if not exists partition(dat='2012-11-04') location '/socialtravel/2012-11-04';

alter table raw_tweets add if not exists partition(dat='2012-11-05') location '/socialtravel/2012-11-05';

Remarques :

  • Les tables externes référencent des données qui sont en dehors du répertoire d’entrepôt. Si la table externe est supprimée, les métadonnées de la table seront supprimées mais pas les données.
  • Les tables « normal » référencent les données qui sont contenues dans son entrepôt, les données y sont chargées. Si la table est supprimée, les métadonnées de la table et ses données seront effacées.

Hive ne vérifie pas si l'emplacement des données de la table externe existe ou non au moment de sa création.

Création d’une table Hive

Toujours dans la console Hive exécuter les scripts suivants :

Script 1

drop table tweets;

create table tweets

(

        id string,

        created_at string,

        created_at_date string,

        created_at_year string,

        created_at_month string,

        created_at_day string,

        created_at_time string,

        in_reply_to_user_id_str string,

        text string,

        contributors string,

        is_a_retweet boolean,

        truncated string,

        coordinates string,

        source string,

        retweet_count int,

        url string,

        first_hashtag string,

        first_user_mention string,

        id_user string,

        screen_name string,

        name string,

        followers_count int,

        listed_count int,

        friends_count int,

        lang string,

        user_location string,

        time_zone string,

        profile_image_url string,

        hashtags array<string>,

        user_mentions array<string>

)

partitioned by (partition_key string);

Script 2

set hive.exec.dynamic.partition=true;

set hive.exec.dynamic.partition.mode=nonstrict;

INSERT overwrite TABLE tweets

PARTITION (partition_key)

SELECT 

        get_json_object(json_response, '$.id_str') AS id,

        get_json_object(json_response, '$.created_at') AS created_at,

        CONCAT(SUBSTR (get_json_object(json_response, '$.created_at'),1,10),' ', SUBSTR  (get_json_object(json_response, '$.created_at'),27,4)) AS created_at_date,

        SUBSTR (get_json_object(json_response, '$.created_at'),27,4) AS created_at_year,

        CASE SUBSTR (get_json_object(json_response, '$.created_at'),5,3)

                WHEN 'Jan' THEN '01'

                WHEN 'Feb' THEN '02'

                WHEN 'Mar' THEN '03'

                WHEN 'Apr' THEN '04'

                WHEN 'May' THEN '05'

                WHEN 'Jun' THEN '06'

                WHEN 'Jul' THEN '07'

                WHEN 'Aug' THEN '08'

                WHEN 'Sep' THEN '09'

                WHEN 'Oct' THEN '10'

                WHEN 'Nov' THEN '11'

                WHEN 'Dec' THEN '12' END AS created_at_month,

        SUBSTR (get_json_object(json_response, '$.created_at'),9,2) AS created_at_day,

        SUBSTR (get_json_object(json_response, '$.created_at'),12,8) AS created_at_time,

        get_json_object(json_response, '$.in_reply_to_user_id_str') AS in_reply_to_user_id_str,

        get_json_object(json_response, '$.text') AS text,

        get_json_object(json_response, '$.contributors') AS contributors,

        (CAST (get_json_object(json_response, '$.retweet_count') AS int) != 0) AS is_a_retweet,

        get_json_object(json_response, '$.truncated') AS truncated,

        get_json_object(json_response, '$.coordinates') AS coordinates,

        get_json_object(json_response, '$.source') AS source,

        CAST (get_json_object(json_response, '$.retweet_count') AS int) AS retweet_count,

        get_json_object(json_response, '$.entities.display_url') AS url,

        trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))) AS first_hashtag,

        trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))) AS  first_user_mention,

        get_json_object(json_response, '$.user.id') AS id_user,

        get_json_object(json_response, '$.user.screen_name') AS screen_name,

        get_json_object(json_response, '$.user.name') AS name,

        CAST (get_json_object(json_response, '$.user.followers_count') AS int) AS followers_count,

        CAST (get_json_object(json_response, '$.user.listed_count') AS int) AS listed_count,

        CAST (get_json_object(json_response, '$.user.friENDs_count') AS int) AS friends_count,

        get_json_object(json_response, '$.user.lang') AS lang,

        get_json_object(json_response, '$.user.location') AS user_location,

        get_json_object(json_response, '$.user.time_zone') AS time_zone,

        get_json_object(json_response, '$.user.profile_image_url') AS profile_image_url,

        array( 

                trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),

                trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))),

                trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))),

                trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))),

                trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text'))),

                trim(lower(get_json_object(json_response, '$.entities.hashtags[5].text'))),

                trim(lower(get_json_object(json_response, '$.entities.hashtags[6].text'))),

                trim(lower(get_json_object(json_response, '$.entities.hashtags[7].text'))),

                trim(lower(get_json_object(json_response, '$.entities.hashtags[8].text'))),

                trim(lower(get_json_object(json_response, '$.entities.hashtags[9].text')))) AS hashtags,

        array(

                trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),

                trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),

                trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),

                trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),

                trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name'))),

                trim(lower(get_json_object(json_response, '$.entities.user_mentions[5].screen_name'))),

                trim(lower(get_json_object(json_response, '$.entities.user_mentions[6].screen_name'))),

                trim(lower(get_json_object(json_response, '$.entities.user_mentions[7].screen_name'))),

                trim(lower(get_json_object(json_response, '$.entities.user_mentions[8].screen_name'))),

                trim(lower(get_json_object(json_response, '$.entities.user_mentions[9].screen_name')))) AS user_mentions,

        dat AS partition_key

FROM raw_tweets

WHERE (length(json_response)> 500) AND coalesce(get_json_object(json_response, '$.id_str'), 'X') RLIKE '^[0-9]+$' AND SUBSTR(coalesce(get_json_object(json_response, '$.id_str'), 'X'),1,1)='2';

Dans la requête d’insertion, j’ai typé, découpé et formaté les données, vous trouverez toutes les spécificités du langage Hive à cette adresse : https://cwiki.apache.org/Hive/languagemanual.html

Remarque : Vous pouvez copier sur le cluster Hadoop un fichier de Script HiveQL contenant la liste des requêtes et l’exécuter à partir du dossier Bin de Hive avec la commande suivante :

hive -i C:\apps\myHive.sql

Installation et configuration de la source Hive ODBC

Dans le portail HDInsight, cliquez sur la tuile Downloads et téléchargez / installez le driver Hive ODBC en fonction de votre environnement (version 32 bits ou 64 bits).

Remarque : Pour la présentation, j’ai utilisé la version "1.0.0.0". Vous pouvez la télécharger aux adresses suivantes :

Dans la tuile Open Ports, ouvrez le port 10000 : Cela permettra à Excel de se connecter aux différentes tables Hive.

image

Toujours pour se connecter aux tables Hive, nous allons configurer une source de données de type Hive ODBC. Dans le menu Démarrer\Outils d’administration\Sources de données (ODBC), ajoutez puis configurez la source en spécifiant l’utilisateur que vous avez renseigné lors de la création du cluster Hadoop.

image image

Exécution d’une requête Hive dans Excel

Dans Excel, vous devriez avoir l’icône Hive Pane :

  • Choisissez la source Hive ODBC préalablement créée
  • Configurez avec l’assistant ou écrivez directement votre requite

image

Ainsi l’utilisateur, dans un environnement qui lui est familier, est capable d’interroger avec toutes les fonctionnalités disponibles dans Excel, les données provenant d’un Framework Big Data :)

Remarque : Il est aussi possible de récupérer nos données non pas dans une feuille Excel, mais dans un cube Tabular via PowerPivot (toujours dans Excel) afin de les croiser avec d’autres données issues d’autres systèmes d’information, mais nous verrons cela dans le prochain chapitre : Analyse ultra rapide, mais qui aura exigé 6 mois de préparation.

Ceci clôt cette seconde partie de l’article. Dans une troisième et dernière partie, je vous montrerai comment les intégrer dans un système d'information décisionnelle pour les analyser à travers un cube multidimensionnel tabulaire... À suivre donc :)