Arduino et infrarouge

Aujourd'hui, un article rapide sous forme de mémo, pour mettre en place une communication série entre deux Arduinos en infrarouge. Ceci pour permettre de trouver rapidement le matériel et code nécessaires.

Avant tout, il faut savoir que dès lors qu'une communication série filaire (USB par ex) a été mise en place, il suffit de "remplacer" le fil par une liaison optique infrarouge, puisque l'encodage reste le même (UART pour de la communication série).

Code et montage

Puisqu'il est inutile de réinventer la roue, voici un très bon tutoriel de Zola lab expliquant quel montage et quel code utiliser pour mettre en place la liaison optique :

Matériel

Et pour se fournir le bon matériel pour réaliser ce montage, rendez-vous chez Lextronic :

Have fun!


Fichier(s) joint(s) :



Tester une API REST avec Swagger, Postman et Jenkins

Avec l'essor de l'IoT vient l'âge d'or des APIs (et de REST), pour ouvrir facilement et de manière universelle des services.

La gestion des APIs devient donc une affaire de pros et doit être de plus en plus rigoureuse. Nous allons voir comment maîtriser les évolutions d'une API à partir d'outils spécialisés.

Les basiques

Je pense qu'il n'est plus nécessaire de présenter Postman (extension Chrome pour tester une API) ni Jenkins (serveur d'intégration continue). Mais il est intéressant de voir comment les deux peuvent travailler ensemble pour devenir une suite de tests automatisée d'API, via l'utilitaire en ligne de commande Newman et cette très bonne documentation officielle.

Swagger, la documentation simple et efficace!

Le meilleur moyen de s'assurer de la cohérence d'une API reste une bonne documentation. Pour ce faire, il existe Swagger (et sa version en ligne SwaggerHub) qui est l'outil de référence en la matière. En effet, il est très simple d'écrire rapidement une documentation pertinente, avec qui plus est une présentation agréable :

Putting it all together

Maintenant, voyons comment combiner tout cela.

Avec Swagger, vous avez la possibilité de définir, en plus de vos URLs, le formalisme des objets métier que vous manipulez. Pour cela il faut utiliser la section "definitions". Par exemple :


paths:
...
definitions:
  Asset :
    type: object
    additionalProperties: false
    required: ['name','owner','type']
    properties :
      name: 
        type: string
        description: Nom de l'Asset.
      owner:
        type: string
        description: Propriétaire du Device (User / domaine métier)
      type:
        type: string
        description: Type de l'objet
  Zone :
    type: object
    additionalProperties: false
    required: ['name','geojson']
    properties :
      name: 
        type: string
        description: Nom de la Zone.
      geojson :
        type: object
        description: Géométrie au format GeoJSON
        properties:
          type:
            type: string
            description: Feature
          geometry:
            type: object
            properties:
              type:
                type: string
                description: Point
              coordinates:
                type: array
                description: tableau de points GPS
                items:
                  type: number
      country:
        type: string
        description: Pays de la Zone, issu du référentiel Country.

Parallèlement, il est possible dans les tests Postman de vérifier que le contenu d'une réponse correspond à un schéma particulier (via TinyValidator). Et surprise, le formalisme des "schémas" utilisés par Postman correspond parfaitement à celui de nos "definitions" Swagger! Cela vous donne des idées?? Alors allons-y!

Tout d'abord, nous allons récupérer notre définition Swagger en l'exportant au format JSON pour pouvoir l'utiliser avec Postman. Cependant, à l'heure actuelle, il n'est pas possible dans Postman de charger un fichier externe, autre qu'un fichier de données de tests. Donc nous allons devoir faire un copié/collé de notre définition dans Postman. Mais pour essayer de faire cela proprement, nous allons coller notre Swagger dans une variable d'environnement de Postman, afin qu'elle soit accessible dans tous les tests :

Ensuite, nous allons écrire un test de vérification de schéma qui va se baser sur la définition d'un de nos objets métier Swagger :

var swaggerJson = JSON.parse(environment.Swagger);// test schema var jsonData = JSON.parse(responseBody);var schema = {type:array,items: swaggerJson.definitions.Asset};tests[Valid Data] = tv4.validate(jsonData, schema);if(tv4.error) {   console.log(tv4.error);}

Comme vous le voyez, nous chargeons en JSON notre définition Swagger, puis définissons un schéma de type tableau d'Assets, en récupérant la définition des Assets avec swaggerJson.definitions.Asset. Et en plus c'est facile!

Maintenant, lors de l'exécution de votre requête, les tests seront lancés et la réponse sera validée en fonction du schéma indiqué! Il est important à ce stade de disposer d'une définition d'objet stricte, notamment via l'utilisation des propriétés Swagger additionalProperties:false et required: ['name','owner','type'], sans quoi n'importe quelle réponse passera la validation.

Vous pouvez même aller jusqu'à utiliser votre définition Swagger pour récupérer les URLs à tester. Pour cela, il faut utiliser la partie "Pre-request Script" de Postman et parcourir de la même façon notre Swagger :

Ici, nous définissons une variable d'environnement 'url' à l'aide des attributs 'host' et 'basePath' de Swagger. Cette variable est ensuite utilisée par Postman via la syntaxe {{url}} dans sa barre d'adresse.

Remarque :

Il est également possible d'importer directement la définition Swagger dans Postman (via le bouton Import) pour récupérer automatiquement toutes les URLs à tester. Cependant, si vous avez besoin de réimporter votre définition suite à des modifications, cela va écraser tout ce qui existe déjà, y compris les tests!.

Pour terminer, il ne vous reste plus qu'à mettre tout cela dans Jenkins, via l'export de votre collection et de l'environnement Postman que vous passerez en paramètres de Newman! Enjoy!


Fichier(s) joint(s) :



Cumulocity : une plateforme IoT taillée pour l'industrie

Désormais c'est un fait, c'est bien l'industrie qui va guider les prochaines évolutions majeures de l'IoT, pousser l'adoption de certaines normes et standardiser les principaux cas d'usages.

Il est donc temps de commencer à prendre en main les plateformes spécialisées, délivrant des services professionnels : device management, intégration de protocoles, sécurité, mise à l'échelle, analytics, multitenants...

Pour cela, nous allons utiliser la plateforme Cumulocity.

Son nom ne vous dit peut-être rien, mais elle fait partie des leaders du marché, avec déjà de belles références. Pour en avoir comparé bon nombre, je la place grande première de mon top 3 des plateformes, juste devant IBM Watson IoT et theThings.io (je n'oublie bien sûr pas Thingworx de PTC, mais elle est à mon sens beaucoup trop complexe à mettre en oeuvre, et reste difficile d'accès aux néophytes du PLM...).

Prise en main en 5 minutes

Pour la tester, rien de plus simple, il suffit de créer un compte d'essai gratuit, qui donne accès à la majeure partie des fonctionnalités pendant 1 mois. Ensuite, il est possible d'ajouter par exemple un smartphone en tant que Device, via l'appli Android spécifique. Vous aurez alors accès à un dashboard affichant en temps réel les données relevées par votre téléphone (gyroscope, accéléromètre, luxmètre...).

Explorer toute l'API REST

Puisqu'une plateforme IoT n'est rien sans une interface REST, il est assez simple d'explorer toute l'étendue des possibilités offertes par celle de Cumulocity. Pour cela, le plus pratique est d'utiliser l'extension Chrome Postman et récupérer la librairie publiée par Cumulocity. Vous aurez ainsi sous la main toutes les requêtes prêtes à être utilisées pour envoyer des données, créer des Devices, déclencher des alarmes, pousser des Opérations...

Premier cas concret

Maintenant que nous avons fait les premiers tests de la plateforme, il est temps de créer un vrai cas d'usage du monde réel. Voici le scénario envisagé :

Des Devices émettent des données vers la plateforme (quel que soit le protocole) qui sont reçues en temps réel sur des serveurs ou des applications d'analyse. Très commun! Cependant, selon les services offerts par les API REST, il n'est pas toujours trivial de mettre en place un mécanisme de réception en temps réel de la dernière donnée émise ou alarme levée par un Device spécifique.

Cumulocity offre de ce point de vue 2 outils intéressants :

  • des connexions par websockets, qui permettent de créer des flux de données en temps réel, plus robustes pour ce besoin que du simple HTTP (même en long-polling)
  • un ensemble de flux d'évènements internes au framework, sur lesquels il est possible de s'abonner, pour recevoir des notifications sur TOUT ce qui se passe sur la plateforme (réception de données, création de Device, déclenchement d'évènements/alarmes...)

Pour établir une connexion, il suffit de respecter le protocole de Bayeux, basé les concepts de publish/subsribe et d'échanges de données en JSON. La documentation de Cumulocity à ce sujet permet de rapidement mettre en place ces flux temps réel.

Afin de réaliser une implémentation la plus simple possible et la plus polyvalente, utilisable tant par un serveur web qu'embarqué dans un Device, nous allons utiliser le langage Python.

Gestion des websockets

Notre premier module s'occupera de la gestion bas niveau des websockets, grâce au package websocket-client. Vous aurez également besoin de simplejson pour créer du contenu JSON.


import websocket
import simplejson

class WebsocketWrapper:
    def __init__(self, cnxUrl):
        self.ws = websocket.WebSocketApp(cnxUrl,
                              on_message = self.on_message,
                              on_error = self.on_error,
                              on_close = self.on_close)
        self.ws.on_open = self.on_open
        self.servermsgjson = []
        
    def on_error(self, ws, error):
        print(error)
    
    def on_close(self, ws):
        print('closed')
        
    def on_open(self, ws):
        print('connected')
        self.afterOpen()
        
    def on_message(self, ws, message):
        print("server msg: "+message)
        self.servermsgjson = simplejson.loads(message)
        self.parseMsg(self.servermsgjson)
        
    def send(self,msg):
        self.ws.send(msg)
        
    def openWebsocket(self):
        print('opening websocket...')
        # websocket.enableTrace(True)
        self.ws.run_forever(http_proxy_host='...', http_proxy_port=80)
        
    def afterOpen(self):
        pass
    
    def parseMsg(self, jsonMsg):
        pass
    
    def handshake(self):
        self.ws.send(simplejson.dumps([{'channel':'/meta/handshake','ext':{'com.cumulocity.authn':{'token':'...'}},'version':'1.0','mininumVersion':'1.0beta','supportedConnectionTypes':['websocket','long-polling','callback-polling'],'advice':{'timeout':120000,'interval':30000}}]))
        
    def connect(self, clientId):
        print('realtime connection...')
        self.ws.send(simplejson.dumps([{"channel":"/meta/connect","connectionType":"websocket","clientId":clientId}]))
        

Il vous faudra ici modifier l'adresse du proxy (ou supprimer les paramètres s'il n'y a pas de proxy) et ajouter le token d'authentification (usr:pwd encodés en Base64).

Clients websockets

Le code ci-dessous va permettre de créer 2 clients en websockets écoutant des flux différents : gestion des évènements et réception de mesures.


import simplejson

from websocket.websocket_wrapper import WebsocketWrapper

class C8YClient:
    def __init__(self):
        self.clientId = ''
        self.findClientId = False
        self.readyToConnect = False
        
        self.wrapper = WebsocketWrapper('wss://.cumulocity.com/cep/realtime')
        self.wrapper.parseMsg = self.parseMsg
        self.wrapper.afterOpen = self.afterOpen
        
    def init(self):
        self.wrapper.openWebsocket()
    
    def afterOpen(self):
        self.initClient()
        
    def initClient(self):
        self.findClientId = True
        self.wrapper.handshake()

    def parseMsg(self, jsonMsg):
        if self.findClientId:
            self.findClientId = False
            self.doFindClientId(jsonMsg)
            self.doSubscription()
        elif self.readyToConnect:
            self.readyToConnect = False
            self.connectClient()
        else:
            self.useMsg(jsonMsg)
            
    def useMsg(self, jsonMsg):
        pass
            
    def doSubscription(self):
        pass
        
    def doFindClientId(self, jsonMsg):
        self.clientId = jsonMsg[0]['clientId']
        print('got clientId : ' + self.clientId)
        
    def connectClient(self):
        self.wrapper.connect(self.clientId)
        
    def subscribe(self, channel):
        print('subscribing to '+channel+'...')
        self.readyToConnect = True
        self.wrapper.send(simplejson.dumps([{"channel":"/meta/subscribe","subscription":"/"+channel+"/*","clientId":self.clientId}]))
        
        
class MeasurementsClient(C8YClient):
    def __init__(self):
        super().__init__()
        
    def initClient(self):
        print('initMeasurementsClient...')
        super().initClient()
        
    def doSubscription(self):
        super().subscribe('measurements')
            
    def useMsg(self, jsonMsg):
        print('new measurement : ' + simplejson.dumps(jsonMsg))


class EventsClient(C8YClient):
    def __init__(self):
        super().__init__()
        
    def initClient(self):
        print('initEventsClient...')
        super().initClient()
        
    def doSubscription(self):
        super().subscribe('events')
            
    def useMsg(self, jsonMsg):
        print('new event : ' + simplejson.dumps(jsonMsg))

Ne reste plus qu'un petit main pour exécuter tout cela :


from threading import Thread
from websocket.c8y_client import MeasurementsClient, EventsClient  

if __name__ == '__main__':
    
    measurementsClient = MeasurementsClient()
    threadM = Thread(target = measurementsClient.init, args = ())
    threadM.start()
    
    eventsClient = EventsClient()
    threadE = Thread(target = eventsClient.init, args = ())
    threadE.start()

Nous disposons maintenant d'une simple application Python qui reçoit en temps réel des notifications d'évènements de Cumulocity! Vous pouvez le tester simplement via les requêtes REST déjà existantes dans la librairie Postman. Ainsi, POSTer un nouveau Measurement (relevé de données issu d'un capteur) déclenchera immédiatement sa réception côté Python.

A vous l'IIoT !


Fichier(s) joint(s) :