ENGINEER BLOG

ENGINEER BLOG

Cloud Robotics APIを使ってみた

こんにちは!コンサルティングサービス本部の野村です。

今回は、Cloud Robotics APIを試してみましたので、そのご紹介です。

概要

Cloud Robotics APIとは、マイクロソフトが提供するMicrosoft AzureのAI機能(Cognitive Services)を
Pepperと容易に連携させることのできるSDK/APIになります。

PepperとMicrosoft Azureを連携させることで、
 1. 様々なIoTデバイスとの接続
 2. デバイス上の通信データのロギング
 3. Cloud AI(翻訳、会話、画像認識など)との連携
などができるようになります。

今回、30社限定で無償利用できるプログラムに当選したので、
FaceVisionのハンズオン中心にその内容を簡単にご紹介したいと思います。

準備するもの

  • Choregraphe
  • Pepper本体
  • APIの接続情報
     1. Azure IoT Hubホスト名
     2. デバイスID
     3. デバイスキー

全体像

利用時の全体像はざっくりこんな感じになります。

zentai

Pepper側のIoT Hub Client(API Client)モジュールを使ってCloud Robotics APIをコールし、
Microsoft Azureの各種サービスを利用します。

ハンズオン ~FaceVision~

FaceVisionハンズオンでは、Pepperが顔認識と写真分析を行います。

まず、API Clientを生成し、Azure IoT Hubに接続します。

import time
import os.path

class MyClass(GeneratedClass):
    def __init__(self):
        GeneratedClass.__init__(self, False)
        self.behaviorPath = os.path.normpath(ALFrameManager.getBehaviorPath(self.behaviorId))
        # 写真画像のパス
        self.filepath = os.path.join(os.path.normpath(self.behaviorPath), 'image.jpg')

    def onLoad(self):
        self.client = None

        # TODO: 【Step.1】 1.API Clientの生成
        self.azure_iot_hub_hostname = 'Azure IoT Hubホスト名'
        self.azure_iot_hub_device_id = 'デバイスID'
        self.azure_iot_hub_device_key = 'デバイスキー'

        self.storage = {}

    def onInput_onStart(self):
        import cloudrobotics.client as crfx
        import cloudrobotics.message as message

        # TODO: 【Step.1】 1.API Clientの生成
        self.client = crfx.CRFXClient(self.azure_iot_hub_hostname, self.azure_iot_hub_device_id, self.azure_iot_hub_device_key)
        self.client.on_connect_successful = self.on_connect_successful
        self.client.on_connect_failed = self.on_connect_failed
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish

        # 処理の開始
        self.client.start()

"init"メッセージを送信し、ストレージ情報を取得します。

        # TODO: 【Step.1】 2.ストレージ情報の取得
        init_message = message.CRFXMessage()
        init_message.header['RoutingType'] = 'CALL'
        init_message.header['MessageId'] = 'init'
        init_message.header['AppProcessingId'] = 'RbAppFaceApi'

        # メッセージの送信(APIコール)
        self.client.send_message(init_message)

取得したストレージ情報を元に、ストレージに写真をアップロードします。
(写真の撮影はPepperの左手タッチをトリガーにします)

    def onInput_onCallRegister(self):
        # TODO: 【Step.2】 1.ストレージへの写真アップロード
        import cloudrobotics.message as message
        from cloudrobotics.storage import upload_to_storage

        if not os.path.isfile(self.filepath):
            self.logger.warn('There is no picture file. :' + self.filepath)
            return

        try:
            upload_to_storage(self.storage['account'], self.storage['key'], self.storage['container'], self.filepath) 
        except Exception as e:
            self.logger.error('Failed to upload this file to the Azure Blob storage, because ' + str(e))
            return

"registerFace"メッセージを送信し、撮影した顔写真を登録します。

        # TODO: 【Step.2】 2.顔登録
        register_message = message.CRFXMessage()
        register_message.header['RoutingType'] = 'CALL'
        register_message.header['AppProcessingId'] = 'RbAppFaceApi'
        register_message.header['MessageId'] = 'registerFace'

        register_message.body['visitor'] = 'xxxxxx'
        register_message.body['groupId'] = 'test'
        register_message.body['locationId'] = 'all' 
        register_message.body['visitor_name'] = '野村'
        register_message.body['visitor_name_kana'] = 'xxx' 
        register_message.body['blobFileName'] = 'image.jpg'
        register_message.body['deleteFile'] = 'true'

        self.client.send_message(register_message)

"getFaceInfo"メッセージを送信し、登録した顔写真を使って顔認識を行います。
(顔認識はPepperの右手タッチをトリガーにします)

    def onInput_onCallRecognize(self):
        # TODO: 【Step.3】 1.顔認識
        import cloudrobotics.message as message
        from cloudrobotics.storage import upload_to_storage

        if not os.path.isfile(self.filepath):
            self.logger.warn('There is no picture file. :' + self.filepath)
            return

        try:
            upload_to_storage(self.storage['account'], self.storage['key'], self.storage['container'], self.filepath) 
        except Exception as e:
            self.logger.error(str(e))
            return

        recognize_message = message.CRFXMessage()
        recognize_message.header['RoutingType'] = 'CALL' 
        recognize_message.header['AppProcessingId'] = 'RbAppFaceApi' 
        recognize_message.header['MessageId'] = 'getFaceInfo' 

        recognize_message.body['visitor'] = 'xxxxxx' 
        recognize_message.body['groupId'] = 'test' 
        recognize_message.body['locationId'] = 'all' 
        recognize_message.body['blobFileName'] = 'image.jpg' 
        recognize_message.body['deleteFile'] = 'true' 

        self.client.send_message(recognize_message)

コールバックでメッセージを受信したときのPepperの挙動をそれぞれ制御します。

    # メッセージを受信した時
    #
    def on_message(self, received_message):
        self.logger.info('received.')
        self.logger.info(str(received_message.header) + ', ' + str(received_message.body))

        # メッセージヘッダーのMessageIdに応じて処理を実装
        if received_message.header['MessageId'] == 'init':
            # TODO: 【Step.1】 2.ストレージ情報の取得
            self.storage['account'] = received_message.body['storageAccount']
            self.storage['key'] = received_message.body['storageKey']
            self.storage['container'] = received_message.body['storageContainer']

            self.logger.info("Account : " + self.storage['account']) 
            self.logger.info("Key : " + self.storage['key']) 
            self.logger.info("Container : " + self.storage['container']) 

        elif received_message.header['MessageId'] == 'registerFace':
            # TODO: 【Step.2】 2.顔登録
            if received_message.body['success'] == 'true':
                self.onRegistered('登録に成功しました。')
            else:
                self.onFailRegistration('登録に失敗しました')

        elif received_message.header['MessageId'] == 'getFaceInfo':
            # TODO: 【Step.3】 1.挨拶
            if received_message.body['success'] == 'true' and received_message.body['visitor_name']:
                self.onRecognized(received_message.body['visitor_name'] + 'さん、こんにちは')

                # TODO: 【Step.3】 2.その他の接客
                pass
            else:
                self.onFailRecognition('認識に失敗しました。')

Choregrapheのフローはこんな感じになります。

choregraphe


やってみます。


face

うまくいきました!
Pepperの左手をタッチするとPepperが写真を撮ってその画像を登録し、
右手をタッチすると画像認識して「野村さん、こんにちは」と反応してくれました。



次に、写真分析をやってみます。
顔認識のときと同様、API Clientを生成し、Azure IoT Hubに接続します。
(APIの接続情報はchoregrapheのボックス変数に設定します)

import time
import os.path

class MyClass(GeneratedClass):
    def __init__(self):
        GeneratedClass.__init__(self, False)
        self.behaviorPath = ALFrameManager.getBehaviorPath(self.behaviorId)

    def onLoad(self):
        self.client = None
        self.groupId = self.getParameter('GroupId')

    def onUnload(self):
        if self.client is not None:
            self.client.stop()

    def onInput_onStart(self):
        import cloudrobotics.picturerecognition.client as picturereco

        self.client = picturereco.CRFXPictureRecognitionClient(self.getParameter('AzureIoTHubHostName'), self.getParameter('AzureIoTHubDeviceId'), self.getParameter('AzureIoTHubDeviceKey'))
        self.client.on_connect_successful = self.on_connect_successful
        self.client.on_connect_failed = self.on_connect_failed
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish

        self.client.start()

        self.client.init()

プロジェクトファイル内にある写真をランダムに選択した結果のパスを受け取り、
写真分析APIに送信します。

    def onInput_onRecognize(self, p):
        import os.path
        self.logger.info('start recognition.')

        filepath = os.path.normpath(os.path.join(self.behaviorPath, '../html', p))

        if os.path.isfile(filepath):
            self.client.on_analyze = self.on_analyze
            self.client.analyze('xxxxxxx', filepath)
        else:
            self.logger.warn('There is no picture file. : ' + filepath)

コールバックで分析結果をPepperに説明させます。

    def on_analyze(self, is_success, seq, visitor, result):
        self.logger.info(str(result))

        if is_success == 'true':
            if result['Description_jp']:
                self.onRecognized('この写真は、「' + result['Description_jp']+'」ですね。')
            else:
                self.onNotRecognized('何か分かりません。')
        else:
            self.onNotRecognized('何か分かりません。')

choregrapheのフローはこんな感じです。
(Select Pictureボックスでランダムに写真を選択しています)

choregraphe2


まずはこちらの写真でやってみた結果です。


picture4

an_laptop

ごちゃごちゃした写真にも関わらず、いい感じに分析してくれました。



次はこの写真の結果です。

picture2

an_cat

ボカされた背景からなんとなく鳥を見てると判断したのでしょうか。
カシコイです。

まとめ

今回は、顔認識と写真分析APIのご紹介でしたが、ハンズオンでは他にも翻訳、会話、
Device to DeviceのAPIが含まれていました。

Microsoft Azureを詳しく知らなくても簡単にAPIの利用ができ、
基本的に同じ手順でいろんなAPIが使えるのでとても便利です。

Azureのバックエンドサービスを使うことで、Pepperの可能性もかなり広がりそうです!