PixieDust Support of Streaming Data

With the rise of Internet of Things (IoT) devices, the ability to analyze and visualize live streams of data is becoming increasingly important. For example, sensors such as thermometers in machines, or portable medical devices such as pacemakers, may continuously stream data to a streaming service such as Kafka. PixieDust makes it easier to work with live data inside Jupyter Notebooks by providing simple integration APIs for both the PixieApp and display() frameworks.
 
On the visualization level, PixieDust uses Bokeh's support for efficient data source updates to plot streaming data into live charts (at the moment, only line charts and scatter plots are supported, but more will be added in the future). The display() framework also supports geospatial visualization of streaming data using the Mapbox rendering engine.
 
To activate streaming visualizations, you need to use a class that inherits from StreamingDataAdapter, an abstract class that is part of the PixieDust API. This class acts as a generic bridge between the streaming data source and the visualization framework.
Note: I recommend spending time looking at the code for StreamingDataAdapter here.

 
The following diagram shows how the StreamingDataAdapter data structure fits into the display() framework:

StreamingDataAdapter architecture

When implementing a subclass of StreamingDataAdapter, you must override the doGetNextData() method provided by the base class, which will be called repeatedly to fetch new data to update the visualization. You can also optionally override the getMetadata() method to pass context to the rendering engine (we’ll use this method later to configure the Mapbox rendering).
 
The abstract implementation [i] of doGetNextData() looks like this:
 
@abstractmethod
def doGetNextData(self):
    """Return the next batch of data from the underlying stream.
    Accepted return values are:
    1. (x,y): tuple of list/numpy arrays representing the x and y axis
    2. pandas dataframe
    3. y: list/numpy array representing the y axis. In this case, the x axis is automatically created
    4. pandas serie: similar to #3
    5. json
    6. geojson
    7. url with supported payload (json/geojson)
    """
    pass
 
The preceding docstring lists the types of data that doGetNextData() is allowed to return.
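To make the contract concrete, here is a minimal sketch of an adapter that returns the first accepted form, an (x, y) tuple of lists. The SineWaveAdapter class and its batch_size and step parameters are hypothetical, and the StreamingDataAdapter defined here is only a stand-in that models the doGetNextData() contract so the sketch runs without PixieDust installed.

```python
from abc import ABC, abstractmethod
import math


class StreamingDataAdapter(ABC):
    """Stand-in for the PixieDust base class (assumption: only the
    doGetNextData() contract is modeled here)."""

    @abstractmethod
    def doGetNextData(self):
        pass


class SineWaveAdapter(StreamingDataAdapter):
    """Hypothetical adapter that emits successive batches of a sine wave."""

    def __init__(self, batch_size=5, step=0.1):
        super().__init__()
        self.batch_size = batch_size
        self.step = step
        self.position = 0  # index of the next sample to emit

    def doGetNextData(self):
        # Build the next batch and advance the position so that each call
        # returns new points rather than repeating the previous ones.
        xs = [(self.position + i) * self.step for i in range(self.batch_size)]
        ys = [math.sin(x) for x in xs]
        self.position += self.batch_size
        return (xs, ys)


adapter = SineWaveAdapter()
first_x, first_y = adapter.doGetNextData()
second_x, second_y = adapter.doGetNextData()
```

In a notebook, you would presumably drop the stand-in, import the real base class from pixiedust.display.streaming, and pass the adapter to display().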
 
As an example, suppose we want to visualize, on a map and in real time, the location of a fictitious drone wandering around the earth. Its current location is provided by a REST service at https://wanderdrone.appspot.com/.
 
The payload uses GeoJSON [ii], for example:
 
{
    "geometry": {
        "type": "Point",
        "coordinates": [
            -93.824908715741202, 10.875051131034805
        ]
    },
    "type": "Feature",
    "properties": {}
}
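As a quick illustration of how such a payload is structured, the following sketch parses the sample above with the standard json module and extracts the position. It does not call the live service; the payload string is simply copied from the example, and the variable names are ours.

```python
import json

# Sample payload copied from the example above (not fetched from the service).
payload = """
{
    "geometry": {
        "type": "Point",
        "coordinates": [-93.824908715741202, 10.875051131034805]
    },
    "type": "Feature",
    "properties": {}
}
"""

feature = json.loads(payload)

# GeoJSON positions are ordered [longitude, latitude].
longitude, latitude = feature["geometry"]["coordinates"]
print(f"Drone is at latitude {latitude:.4f}, longitude {longitude:.4f}")
```

Note the coordinate order: GeoJSON puts longitude first, the reverse of the "lat, lon" convention used by many mapping tools.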
 
To render our drone location in real time, we create a DroneStreamingAdapter class that inherits from StreamingDataAdapter and simply returns the drone location service URL from the doGetNextData() method [iii], as shown in the following code:
 
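from pixiedust.display.streaming import *

class DroneStreamingAdapter(StreamingDataAdapter):
    def getMetadata(self):
        # Mapbox style properties used to render the drone
        iconImage = "rocket-15"
        return {
            "layout": {"icon-image": iconImage, "icon-size": 1.5},
            "type": "symbol"
        }

    def doGetNextData(self):
        # Return the URL of the service; its payload is GeoJSON
        return "https://wanderdrone.appspot.com/"

# display() is available once `import pixiedust` has been run in the Notebook
adapter = DroneStreamingAdapter()
display(adapter)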
 
In the getMetadata() method, we return the Mapbox-specific style properties (as documented here), which use a rocket Maki icon as the symbol for the drone.
 
With a few lines of code, we were able to create a real-time geospatial visualization of a drone location, with the following results:

Real-time geospatial mapping of a drone

Note: You can find the complete Notebook for this example in the PixieDust repo at this location.

Adding streaming capabilities to your PixieApp
In the next example, we show how to visualize streaming data coming from an Apache Kafka data source, using the MessageHubStreamingApp PixieApp provided out of the box by PixieDust.
Note: MessageHubStreamingApp works with the IBM Cloud Kafka service called Message Hub, but it can easily be adapted to any other Kafka service.
This PixieApp lets the user choose a Kafka topic associated with a service instance and displays the events in real time. Assuming that the event payloads from the selected topic use a JSON format, it presents a schema inferred from sampling the event data. The user can then choose a particular field (which must be numerical), and a real-time chart showing the average of the values for this field over time is displayed.

Real-time visualization of streaming data
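The schema inference step described above can be sketched in a few lines. This is not the MessageHubStreamingApp implementation, only an illustration of the idea under simple assumptions: events are flat JSON objects, and the infer_schema() and numeric_fields() helpers, along with the sample events, are made up for this example.

```python
import json
from collections import Counter, defaultdict


def infer_schema(raw_events, sample_size=10):
    """Map each field to the most common Python type name seen in a sample."""
    seen = defaultdict(Counter)
    for raw in raw_events[:sample_size]:
        event = json.loads(raw)
        for field, value in event.items():
            seen[field][type(value).__name__] += 1
    return {field: counts.most_common(1)[0][0] for field, counts in seen.items()}


def numeric_fields(schema):
    """Return the fields a user could chart, i.e. those holding numbers."""
    return sorted(f for f, t in schema.items() if t in ("int", "float"))


# Made-up events standing in for messages read from a Kafka topic.
sample_events = [
    '{"device": "thermo-1", "temperature": 21.5, "battery": 87}',
    '{"device": "thermo-2", "temperature": 19.0, "battery": 91}',
    '{"device": "thermo-1", "temperature": 22.1, "battery": 86}',
]

schema = infer_schema(sample_events)
chartable = numeric_fields(schema)
```

Sampling only the first few events keeps the inference cheap, at the cost of missing fields that appear later in the stream.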

The key PixieApp attribute needed to provide streaming capabilities is pd_refresh_rate, which executes a particular kernel request at specified intervals (pull model). In the preceding application, we use it to update the real-time chart, as shown in the following HTML fragment returned by the showChart route [iv]:
 
    @route(topic="*", streampreview="*", schemaX="*")
    def showChart(self, schemaX):
        self.schemaX = schemaX
        self.avgChannelData = self.streamingData.getStreamingChannel(self.computeAverages)
        return """
<div class="well" style="text-align:center">
    <div style="font-size:x-large">Real-time chart for {{this.schemaX}}(average).</div>
</div>

<div pd_refresh_rate="1000" pd_entity="avgChannelData"></div>
        """
The preceding div is bound to the avgChannelData entity via the pd_entity attribute and is responsible for creating the real-time chart, which is updated every second (pd_refresh_rate=1000 ms). In turn, the avgChannelData entity is created via a call to getStreamingChannel(), which is passed the self.computeAverages function. This function is responsible for updating the average value for all the data being streamed. It is important to note that avgChannelData is an instance of a class that inherits from StreamingDataAdapter and can therefore be passed to the display() framework for building real-time charts.
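To illustrate the kind of work an averaging function does between refreshes, here is a minimal sketch of an incremental mean that is updated one batch at a time. It is not the computeAverages implementation from PixieDust; the RunningAverage class and the sample batches are hypothetical, and each loop iteration merely stands in for one refresh tick.

```python
class RunningAverage:
    """Incrementally tracks the mean of every value seen so far."""

    def __init__(self):
        self.count = 0
        self.mean = 0.0

    def update(self, batch):
        """Fold a batch into the mean; return the mean after each value."""
        history = []
        for value in batch:
            self.count += 1
            # Incremental mean: no need to keep past values in memory.
            self.mean += (value - self.mean) / self.count
            history.append(self.mean)
        return history


tracker = RunningAverage()
batches = [[10.0, 20.0], [30.0], [0.0, 40.0]]  # made-up stream batches

points = []
for batch in batches:  # each iteration stands in for one refresh tick
    points.extend(tracker.update(batch))
```

Because only the count and the current mean are stored, memory use stays constant however long the stream runs.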
 
The last piece of the puzzle is for the PixieApp to return a displayHandler needed by the display() framework. This is done by overriding the newDisplayHandler() method [v] as follows:
 
def newDisplayHandler(self, options, entity):
    if self.streamingDisplay is None:
        self.streamingDisplay = LineChartStreamingDisplay(options, entity)
    else:
        self.streamingDisplay.options = options
    return self.streamingDisplay
 
In the preceding code, we use it to create an instance of LineChartStreamingDisplay, provided by PixieDust in the pixiedust.display.streaming.bokeh package (https://github.com/ibm-watson-data-lab/pixiedust/blob/master/pixiedust/display/streaming/bokeh/lineChartStreamingDisplay.py), passing it the avgChannelData entity.
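The newDisplayHandler() code above creates the display object once and then reuses it, updating only its options on later calls. The following sketch reproduces that pattern with stand-in classes so it can run outside a Notebook; FakeStreamingDisplay, FakeApp, and the option values are hypothetical and say nothing about how LineChartStreamingDisplay works internally.

```python
class FakeStreamingDisplay:
    """Hypothetical stand-in for a streaming display class."""

    def __init__(self, options, entity):
        self.options = options
        self.entity = entity


class FakeApp:
    """Hypothetical stand-in for the PixieApp owning the handler."""

    def __init__(self):
        self.streamingDisplay = None

    def newDisplayHandler(self, options, entity):
        # Same logic as the method above: create once, then reuse.
        if self.streamingDisplay is None:
            self.streamingDisplay = FakeStreamingDisplay(options, entity)
        else:
            self.streamingDisplay.options = options
        return self.streamingDisplay


app = FakeApp()
first = app.newDisplayHandler({"refresh": 1}, entity="avgChannelData")
second = app.newDisplayHandler({"refresh": 2}, entity="avgChannelData")
```

Both calls return the same object, so any state held by the display survives across calls while the options are refreshed.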
 
If you want to see this application in action, you need to create a Message Hub service instance on IBM Cloud and, using its credentials, invoke this PixieApp in a Notebook with the following code:
 
from pixiedust.apps.messageHub import *
MessageHubStreamingApp().run(
    credentials={
        "username": "XXXX",
        "password": "XXXX",
        "api_key": "XXXX",
        "prod": True
    }
)
 
Summary
We’ve covered the details on how to create your own custom visualization using the PixieDust display() API.
 

You have just read an excerpt from Packt Publishing’s latest book, “Thoughtful Data Science”, written by David Taieb. If you want to bridge the gap between developer and data scientist by creating a modern, open-source, Python-based toolset that works with Jupyter Notebook and PixieDust, this is the book for you.
