Cumulocity : une plateforme IoT taillée pour l'industrie

Désormais c'est un fait, c'est bien l'industrie qui va guider les prochaines évolutions majeures de l'IoT, pousser l'adoption de certaines normes et standardiser les principaux cas d'usages.

Il est donc temps de commencer à prendre en main les plateformes spécialisées, délivrant des services professionnels : device management, intégration de protocoles, sécurité, mise à l'échelle, analytics, multitenants...

Pour cela, nous allons utiliser la plateforme Cumulocity.

Son nom ne vous dit peut-être rien, mais elle fait partie des leaders du marché, avec déjà de belles références. Pour en avoir comparé bon nombre, je la place grande première de mon top 3 des plateformes, juste devant IBM Watson IoT et theThings.io (je n'oublie bien sûr pas Thingworx de PTC, mais elle est à mon sens beaucoup trop complexe à mettre en oeuvre, et reste difficile d'accès aux néophytes du PLM...).

Prise en main en 5 minutes

Pour la tester, rien de plus simple, il suffit de créer un compte d'essai gratuit, qui donne accès à la majeure partie des fonctionnalités pendant 1 mois. Ensuite, il est possible d'ajouter par exemple un smartphone en tant que Device, via l'appli Android spécifique. Vous aurez alors accès à un dashboard affichant en temps réel les données relevées par votre téléphone (gyroscope, accéléromètre, luxmètre...).

Explorer toute l'API REST

Puisqu'une plateforme IoT n'est rien sans une interface REST, il est assez simple d'explorer toute l'étendue des possibilités offertes par celle de Cumulocity. Pour cela, le plus pratique est d'utiliser l'extension Chrome Postman et récupérer la librairie publiée par Cumulocity. Vous aurez ainsi sous la main toutes les requêtes prêtes à être utilisées pour envoyer des données, créer des Devices, déclencher des alarmes, pousser des Opérations...

Premier cas concret

Maintenant que nous avons fait les premiers tests de la plateforme, il est temps de créer un vrai cas d'usage du monde réel. Voici le scénario envisagé :

Des Devices émettent des données vers la plateforme (quel que soit le protocole) qui sont reçues en temps réel sur des serveurs ou des applications d'analyse. Très commun! Cependant, selon les services offerts par les API REST, il n'est pas toujours trivial de mettre en place un mécanisme de réception en temps réel de la dernière donnée émise ou alarme levée par un Device spécifique.

Cumulocity offre de ce point de vue 2 outils intéressants :

  • des connexions par websockets, qui permettent de créer des flux de données en temps réel, plus robustes pour ce besoin que du simple HTTP (même en long-polling)
  • un ensemble de flux d'évènements internes au framework, sur lesquels il est possible de s'abonner, pour recevoir des notifications sur TOUT ce qui se passe sur la plateforme (réception de données, création de Device, déclenchement d'évènements/alarmes...)

Pour établir une connexion, il suffit de respecter le protocole de Bayeux, basé les concepts de publish/subsribe et d'échanges de données en JSON. La documentation de Cumulocity à ce sujet permet de rapidement mettre en place ces flux temps réel.

Afin de réaliser une implémentation la plus simple possible et la plus polyvalente, utilisable tant par un serveur web qu'embarqué dans un Device, nous allons utiliser le langage Python.

Gestion des websockets

Notre premier module s'occupera de la gestion bas niveau des websockets, grâce au package websocket-client. Vous aurez également besoin de simplejson pour créer du contenu JSON.


import websocket
import simplejson

class WebsocketWrapper:
    def __init__(self, cnxUrl):
        self.ws = websocket.WebSocketApp(cnxUrl,
                              on_message = self.on_message,
                              on_error = self.on_error,
                              on_close = self.on_close)
        self.ws.on_open = self.on_open
        self.servermsgjson = []
        
    def on_error(self, ws, error):
        print(error)
    
    def on_close(self, ws):
        print('closed')
        
    def on_open(self, ws):
        print('connected')
        self.afterOpen()
        
    def on_message(self, ws, message):
        print("server msg: "+message)
        self.servermsgjson = simplejson.loads(message)
        self.parseMsg(self.servermsgjson)
        
    def send(self,msg):
        self.ws.send(msg)
        
    def openWebsocket(self):
        print('opening websocket...')
        # websocket.enableTrace(True)
        self.ws.run_forever(http_proxy_host='...', http_proxy_port=80)
        
    def afterOpen(self):
        pass
    
    def parseMsg(self, jsonMsg):
        pass
    
    def handshake(self):
        self.ws.send(simplejson.dumps([{'channel':'/meta/handshake','ext':{'com.cumulocity.authn':{'token':'...'}},'version':'1.0','mininumVersion':'1.0beta','supportedConnectionTypes':['websocket','long-polling','callback-polling'],'advice':{'timeout':120000,'interval':30000}}]))
        
    def connect(self, clientId):
        print('realtime connection...')
        self.ws.send(simplejson.dumps([{"channel":"/meta/connect","connectionType":"websocket","clientId":clientId}]))
        

Il vous faudra ici modifier l'adresse du proxy (ou supprimer les paramètres s'il n'y a pas de proxy) et ajouter le token d'authentification (usr:pwd encodés en Base64).

Clients websockets

Le code ci-dessous va permettre de créer 2 clients en websockets écoutant des flux différents : gestion des évènements et réception de mesures.


import simplejson

from websocket.websocket_wrapper import WebsocketWrapper

class C8YClient:
    def __init__(self):
        self.clientId = ''
        self.findClientId = False
        self.readyToConnect = False
        
        self.wrapper = WebsocketWrapper('wss://.cumulocity.com/cep/realtime')
        self.wrapper.parseMsg = self.parseMsg
        self.wrapper.afterOpen = self.afterOpen
        
    def init(self):
        self.wrapper.openWebsocket()
    
    def afterOpen(self):
        self.initClient()
        
    def initClient(self):
        self.findClientId = True
        self.wrapper.handshake()

    def parseMsg(self, jsonMsg):
        if self.findClientId:
            self.findClientId = False
            self.doFindClientId(jsonMsg)
            self.doSubscription()
        elif self.readyToConnect:
            self.readyToConnect = False
            self.connectClient()
        else:
            self.useMsg(jsonMsg)
            
    def useMsg(self, jsonMsg):
        pass
            
    def doSubscription(self):
        pass
        
    def doFindClientId(self, jsonMsg):
        self.clientId = jsonMsg[0]['clientId']
        print('got clientId : ' + self.clientId)
        
    def connectClient(self):
        self.wrapper.connect(self.clientId)
        
    def subscribe(self, channel):
        print('subscribing to '+channel+'...')
        self.readyToConnect = True
        self.wrapper.send(simplejson.dumps([{"channel":"/meta/subscribe","subscription":"/"+channel+"/*","clientId":self.clientId}]))
        
        
class MeasurementsClient(C8YClient):
    def __init__(self):
        super().__init__()
        
    def initClient(self):
        print('initMeasurementsClient...')
        super().initClient()
        
    def doSubscription(self):
        super().subscribe('measurements')
            
    def useMsg(self, jsonMsg):
        print('new measurement : ' + simplejson.dumps(jsonMsg))


class EventsClient(C8YClient):
    def __init__(self):
        super().__init__()
        
    def initClient(self):
        print('initEventsClient...')
        super().initClient()
        
    def doSubscription(self):
        super().subscribe('events')
            
    def useMsg(self, jsonMsg):
        print('new event : ' + simplejson.dumps(jsonMsg))

Ne reste plus qu'un petit main pour exécuter tout cela :


from threading import Thread
from websocket.c8y_client import MeasurementsClient, EventsClient  

if __name__ == '__main__':
    
    measurementsClient = MeasurementsClient()
    threadM = Thread(target = measurementsClient.init, args = ())
    threadM.start()
    
    eventsClient = EventsClient()
    threadE = Thread(target = eventsClient.init, args = ())
    threadE.start()

Nous disposons maintenant d'une simple application Python qui reçoit en temps réel des notifications d'évènements de Cumulocity! Vous pouvez le tester simplement via les requêtes REST déjà existantes dans la librairie Postman. Ainsi, POSTer un nouveau Measurement (relevé de données issu d'un capteur) déclenchera immédiatement sa réception côté Python.

A vous l'IIoT !


Fichier(s) joint(s) :

0 commentaires: