Share via


Consuming Azure Stream Analytics output in Azure Logic Apps

Stream Analytics

Azure Stream Analytics lets users perform real-time analytics for their Internet of Things (IoT) solutions. In this article, we will use Event Hubs as a source and Service Bus Queue as a destination for Stream Analytics. Azure Event Hubs is a hyper-scale telemetry ingestion service that can store millions of events. And, Azure Service Bus is a highly-reliable cloud messaging service with Queues offering a First In, First Out (FIFO) message delivery to competing consumers.

Event Hubs (source)

Create an Event Hubs namespace in Azure Portal.

create_eventhubs

Then add a new Event Hub into the namespace.

create_eventhub

Service Bus Queue (destination)

Create a Service Bus namespace in Azure Portal.

create_sbnamespace

Then add a new queue into the namespace.

create_sbqueue

Stream Analytics job

Create a new Azure Stream Analytics job that processes messages coming through Event Hub and stores the output into Service Bus queue. For this post, we are using a pass-through Stream Analytics.

create_streamanalyticsjob

Service Bus Queue Reader Logic App

The Service Bus Queue Reader Logic App reads a message from a Service Bus queue, uses Azure Functions to remove special characters that get added by Azure Stream Analytics and then sends it to a child Logic App for processing the message. Click here to download complete Logic App definition.

Service Bus Queue Trigger

Add a new Service Bus queue trigger to read messages from a Service Bus queue and trigger the Logic App.

servicebus_receive_trigger

Special characters handling using Azure Functions

Azure Stream Analytics adds certain special (Unicode) characters in output messages which cause failures in Logic Apps when trying to decode them (e.g. to JSON format). As a result, we need to pre-process the message using Azure Functions and remove those special characters before going ahead with regular message processing in Logic Apps.

Create a new C# Webhook Function App (let’s call it ParseStreamAnalyticsJSONContent) in Azure Portal and insert below code snippet into it. Create the Function App in same region as Logic Apps for being callable from Logic Apps. It is recommended to create it in same Resource Group as Logic Apps. Click here to download the source code.

azurefunctions

Then add a Function App action in Logic Apps and choose ParseStreamAnalyticsJSONContent function app that was created in previous step.

parsejson_functionapp_action

Call Message Processor child Logic App

Call the Message Processor child Logic App that has logic for processing these messages, details of which are covered in next section.

call_childworkflow_action

Message Processor Logic App

The Message Processor Logic App takes an event data in JSON format and send an e-mail using Office 365 Connector. Click here to download complete Logic App definition.

Request Trigger

Create a request trigger and provide it JSON schema for incoming message. JSON schema helps tokenize message properties which come handy in subsequent steps.

request_trigger

Office365 Send Email

Send an email using Office 365 Connector. You can use message properties coming from the request trigger to craft the email template (subject, body etc.).

send_office365_email

Response action

Add a response action to complete this Logic App. Having a Response action is a MUST to be able to call this Logic App from another Logic App.

response_action