Google Calendarの予定に応じてRaspberryPiに処理をさせる(その2)
はじめに
この記事はオートロックマンション用不在時荷物受け取りシステムの開発記事です。
前回までに、RaspberryPiとGoogle Calendarを連携させて、定期的に予定を取得、テキストファイルに出力するところまで述べた。今回は出力したテキストファイルを使ってカレンダーに登録された時間に応じた処理をさせる。
前回までの仕様では、インターホンが鳴った場合にいつでも解錠してしまう状態だったため、カレンダーに登録された配達予定時間帯のみ、インターホン音検知を有効化する。
具体的には、
カレンダーに登録された配達予定開始時刻になったら音検知を有効化
カレンダーに登録された配達予定終了時刻になったら音検知を無効化
する。
RaspberryPi:Model 3B+
Python:3.7.3
python+scheduleによる関数定期実行
定期的にプログラムを実行させるものとして、cronがよく使われるが、今回はpythonスクリプト中で直接予定の登録等を行いたかったため、scheduleを用いた。
これを使うと関数の定期実行などが簡易にできる。使い方も簡単で、ユーザーフレンドリーなインターフェースをしている。
基本的な使い方はいろんな人が解説しているので、下記記事等を参照して欲しい。
今回の使い方としては、
配達予定の開始終了時間が記述されたテキストファイル(schedule.txt)を読み込む
開始時間になったらインターホン音検知を有効化する
終了時間になったらインターホン音検知を無効化する
といった感じである。なお、schedule.txtには例として以下のようなデータが含まれる。この場合1月5日の9~11時、13~14時、18~19時の間に時間指定の配達予定がある。(実際そんなに荷物を頼むことはないが・・・)
2020-01-05T09:00:00 2020-01-05T11:00:00 2020-01-05T13:00:00 2020-01-05T14:00:00 2020-01-05T18:00:00 2020-01-05T19:00:00
実際にscheduleを使ってみる
まずは、定期的に呼び出してschedule.txtの中身をチェック、新しい配達予定時間が追加されていたら、scheduleで検知有効化、無効化の関数の呼び出しを登録するような関数を定義する。
import datetime import schedule schedule_list = [] def updateSchedule(): global schedule_list f = open("schedule.txt", "r") lines = f.readlines() f.close() schedule_new = [] for line in lines: line = line.rstrip() # delete \n schedule_new.append(line) if not line in schedule_list: log.info('New schedule added:' + line) # This is new schedule times = line.split() st_date = datetime.datetime.strptime(times[0], '%Y-%m-%dT%H:%M:%S') en_date = datetime.datetime.strptime(times[1], '%Y-%m-%dT%H:%M:%S') # set schedule schedule.every().day.at(st_date.strftime("%H:%M")).do(activateDetection) schedule.every().day.at(en_date.strftime("%H:%M")).do(deactivateDetection) schedule_list = schedule_new
このupdateSchedule()はsignalのようなタイマー割り込みを使って定期的に呼び出すようにしておく。
if not line in schedule_list:
の部分でその予定がこれまでになかった新しい予定か否かを判別している。
lineには上のテキストファイルの例で示したように[配達予定開始時刻] [配達予定終了時刻]が記述されているため、split()でスペース区切りで開始時刻(例:2020-01-05T09:00:00)、終了時刻(例:2020-01-05T11:00:00)に分割する。(times = line.split()
)
さらに、それらの時刻の文字列をstrptimeで日付に変換する。(st_date = datetime.datetime.strptime(times[0], '%Y-%m-%dT%H:%M:%S')
)
最後に、strftimeでHH:MMの形式に再度文字列化し、scheduleに登録する。登録する関数は検知有効化関数(activateDetection)、検知無効化関数(deactivateDetection)である(詳細は後述)。なお、この方法でscheduleに登録すると、スクリプトを見てわかるように、毎日同じ時刻にactivateDetection()(およびdeactivateDetection())が呼ばれることになる。実際の使い方として、ある配達予定時刻に対しては一度関数を呼び出せばいいはずである。scheduleはこのような日付指定の一度だけ呼び出しに対応していないようである。一度だけ関数を実行する方法は次で記述する。
scheduleで呼び出される関数
次に、scheduleで呼び出される検知有効化:activateDetection(), 検知無効化:deactivateDetection()のそれぞれの関数の中身である。
def activateDetection(): global activated activated = True return schedule.CancelJob def deactivateDetection(): global activated activated = False return schedule.CancelJob
中身は非常にシンプルで、scheduleで登録した時間に関数が呼び出されたら、グローバル変数であるactivatedをTrueにしたりFalseにしたりというものである。あとはメインルーチンの方でインターホンの音が鳴ったときにactivatedを参照してTrueだったら配達予定時刻にインターホンが鳴ったものと判断して解錠してやれば良い。
各関数の最後のreturn schedule.CancelJob
が今回のキモの部分である。scheduleでは特定日付の特定時刻の呼び出しには対応していないため、今回はその日一日のみの予定をGoogle Calendarから取得し、その日の予定時刻を登録し、一度だけ実行したらscheduleを停止したい。scheduleで一度のみ実行したい場合は、上のようにreturn schedule.CancelJob
してやれば良い。公式サポートのFAQにもその記述がある(下記参照)。
これらにより、Google Calendarに登録した予定に応じてインターホン音の検知・非検知(解錠・非解錠)を切り替えられるようになった。
まとめ
python+scheduleを使って関数を定期実行する方法を述べた。
scheduleで関数を一度のみ実行する方法を述べた。
これらを使ってGoogle Calendarに登録された予定に応じてインターホン音の検知・非検知(解錠・非解錠)を切り替えられるようになった。
ここまでできるようになると、次はインターホンのモニタ画像を見たくなってくる。というわけで次回はインターホンが鳴ったときのモニタ画像をウェブカメラで撮影して、スマホに通知していく。
Google Calendarの予定に応じてRaspberryPiに処理をさせる(その1)
はじめに
この記事はオートロックマンション用不在時荷物受け取りシステムの開発記事です。
前回までに、RaspberryPi+ウェブカメラ+サーボモータを使って「インターホン音検知」→「インターホンの解錠ボタン押下」までできた。しかしながら、このままではインターホンを押した人は誰でも入れるガバガバセキュリティ状態になってしまう。
そこで、今回はRaspberryPiとGoogle Calendarを連携させ、ユーザがカレンダー上で指定した時間帯のみ、インターホンの音に応じて解錠するように改良する。例えば、あらかじめ時間指定した配達であれば、その時間をGoogle Calendar上に登録しておく。また時間指定はできないが日付が決まっている場合は、終日イベントとして登録しておく。RaspberryPiはそれに応じてインターホンに応答する、という具合。
RaspberryPi:Model 3B+
Python:3.7.3
RaspberryPiとGoogle Calendarの連携
PythonからGoogle Calendarに情報を取りに行く方法は、以下のGoogle Calendar APIのPython Quickstartにすべて手順が載っている。基本的にその手順に従っていけば良い。
RaspberryPiからカレンダー情報を取りに行く上で重要な点を踏まえつつ手順を述べる。開発環境はPCからSSHでRaspberryPiに接続していることを前提としているが、最初のSTEP1まではPC上で作業したほうが楽である。
STEP1:Google Calendar API有効化
まず、当たり前だがgoogleアカウントを用意し、情報を取りに行くカレンダーを作成する。もともと持っているならそれでOK。そのアカウントでログインしておく。
次に上記QuickstartのSTEP1を行う。STEP1にある「Enable the Google Calendar API」をクリックし、APIを有効化する。
有効化がうまく行けば、以下の画面が表示される。「Download Client Configuration」からコンフィギュレーションファイルをダウンロードする。credentials.jsonという名前だ。
もしPCでcredentials.jsonをダウンロードしたなら、そのファイルをRaspberryPiのワーキングディレクトリにWinSCP等使ってコピーしておく。
STEP2:必要なライブラリのインストール
ここから先はRaspberryPiでの作業だ。 credentials.jsonがRaspberryPiに用意できたら、Pythonの必要なライブラリをRaspberryPiにインストールする。これもquickstartのSTEP2の通りだが、
pip3 install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib
でインストールできる(Python3を使っているのでpip3)。
STEP3:RaspberryPiからGoogle Calendarへのアクセスを許可する
ライブラリがインストールできたら、情報を取得しに行くわけだが、最初に一度認証を行う必要がある。 認証に使うスクリプトはquickstartにあるので、それを使う。
先程credentials.jsonを保存したRaspberryPiのワーキングディレクトリにquickstart STEP3にあるサンプルファイルを保存する。例えばwgetを使えば以下のコマンドで取得できる。
wget https://raw.githubusercontent.com/gsuitedevs/python-samples/master/calendar/quickstart/quickstart.py
この先、認証のためにGUIが必要になるため、以下の記事などを参考にXwindowでGUI環境が使えるようにしておく。
Raspberry Pi 2(略してパイ2)のGUI環境をWindows10から使う - Qiita
サンプルが保存できたら、python3 quickstart.py
で早速サンプル実行する。
すると、認証URLが表示されるため、クリックしてブラウザでそのURLにアクセスする(ここでGUIが必要になる)。
googleアカウントの認証画面が出るので、Google Calendarに紐付いたアカウント名、パスワードを入力し、画面に従って許可をする。
問題なく終わればその旨表示され、ワーキングディレクトリにtoken.pickleというファイルができているはずである。それを確認したらブラウザは閉じて構わない。
STEP4:サンプル実行
さて、ここまで来たらいよいよGoogle Calendarからの情報取得である。
実は、先程認証に使ったquickstart.pyがそのまま直近10個の予定を取得するサンプルスクリプトにもなっている。python3 quickstart.py
で実行してみよう。直近の予定が表示されるはずだ。日本語の予定を取得する場合は文字コードに注意する。(自分の環境ではEUC-JPで正しく表示できた)
ここまでがうまくできたらあとは自分のスクリプトに組み込んでいく。
APIの実際の使い方
Pythonで使いやすいように、quickstart.pyを改造してクラス化した。まずはそれを掲載する。今回の不在時荷物受け取りシステムでは、深夜0時にその日の予定をRaspberryPiから取得しに行くため、get_today_schedule()という関数を作った。また、最初に一度必ず必要なログイン処理はlogin()という単独の関数とした。
from datetime import datetime, date, time, timedelta from pytz import timezone import pickle import os.path from googleapiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request # If modifying these scopes, delete the file token.pickle. SCOPES = ['https://www.googleapis.com/auth/calendar.readonly'] class googleCalendarApi: def __init__(self): self.jst = timezone('Asia/Tokyo') def login(self): creds = None # The file token.pickle stores the user's access and refresh tokens, and is # created automatically when the authorization flow completes for the first # time. if os.path.exists('token.pickle'): with open('token.pickle', 'rb') as token: creds = pickle.load(token) # If there are no (valid) credentials available, let the user log in. if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( 'credentials.json', SCOPES) creds = flow.run_local_server(port=0) # Save the credentials for the next run with open('token.pickle', 'wb') as token: pickle.dump(creds, token) self.creds = creds def logout(self): pass def get_today_schedule(self, remain=False): service = build('calendar', 'v3', credentials=self.creds) # Call the Calendar API now = datetime.now() if remain: begin_time = now else: begin_time = datetime.combine(now, time(0,0)) begin_time = self.jst.localize(begin_time) end_time = datetime.combine(now, time(23,59,59)) end_time = self.jst.localize(end_time) events_result = service.events().list(calendarId='primary', timeMin=begin_time.isoformat(), timeMax=end_time.isoformat(), singleEvents=True, orderBy='startTime').execute() events = events_result.get('items', []) event_list = [] if not events: print('No upcoming events found.') for event in events: st = event['start'].get('dateTime') if st == None: st = event['start'].get('date') en = event['end'].get('date') start_date = datetime.strptime(st, '%Y-%m-%d') end_date = datetime.strptime(en, '%Y-%m-%d') start_date = start_date + timedelta(hours = 9) # all-day event starts 9AM end_date = end_date + timedelta(hours = -3) # all-day event ends 21PM ev = {"summary":event['summary'], "all-day":True, "start":start_date, "end":end_date} else: en = event['end'].get('dateTime') start_date = datetime.strptime(st, '%Y-%m-%dT%H:%M:%S+09:00') end_date = datetime.strptime(en, '%Y-%m-%dT%H:%M:%S+09:00') ev = {"summary":event['summary'], "all-day":False, "start":start_date, "end":end_date} event_list.append(ev) return event_list
Google Calendar APIの使い方については、以下のReferenceを参照。
例えば、具体的にカレンダーの予定を取得しているのは以下の部分である。
events_result = service.events().list(calendarId='primary', timeMin=begin_time.isoformat(), timeMax=end_time.isoformat(), singleEvents=True, orderBy='startTime').execute() events = events_result.get('items', [])
取得開始時刻と終了時刻をISOフォーマットで指定して、開始時間順で取得する。
その次にそれらのイベントの必要な情報のみをリスト化する。ここで注意なのは、時間指定のイベントと週日イベントではイベント開始時刻や終了時刻の扱いに差がある点である。
具体的には、終日イベントには開始時刻event['start'].get('dateTime')
や終了時刻event['end'].get('dateTime')
がなく、Noneが入っている。そのため、終日イベントの場合は、実際に配達が行われる可能性のある、AM9時~PM9時までとした。以下がその部分。
if st == None: st = event['start'].get('date') en = event['end'].get('date') start_date = datetime.strptime(st, '%Y-%m-%d') end_date = datetime.strptime(en, '%Y-%m-%d') start_date = start_date + timedelta(hours = 9) # all-day event starts 9AM end_date = end_date + timedelta(hours = -3) # all-day event ends 21PM ev = {"summary":event['summary'], "all-day":True, "start":start_date, "end":end_date}
そして、このGoogleCalendarApiクラスをインポートして使う。 例えば、その日の予定の名から「配達」という名前のイベントだけ抽出し、その開始時刻と終了時刻をファイル(schedule.txt)に書き出すスクリプトは以下のようになる。
import googleCalendarApi fpath = 'ワーキングディレクトリ' fname = 'schedule.txt' cl = googleCalendarApi.googleCalendarApi() cl.login() events = cl.get_today_schedule(remain = True) f = open(fpath+fname, 'w') for event in events: if event["summary"] == "配達": start = event["start"].strftime('%Y-%m-%dT%H:%M:%S') end = event["end"].strftime('%Y-%m-%dT%H:%M:%S') f.write(start + " " + end + "\n") f.close()
これをCRONに登録して毎日深夜0時に呼び出すことで、その日の予定をテキストに吐き出している。CRONで呼ぶ場合はワーキングディレクトリの部分を絶対パスで記述すること。
まとめ
Google Calendar APIを使い、RaspberryPiからGoogle Calendarの予定を取得する方法を述べた。基本はquickstartに従っていけば良い。
次回は、これを使って作成したスケジュールのテキストファイルを実際に使ってインターホンの応答/非応答を行う。
RaspberryPi+サーボモータでインターホンに応答する
はじめに
この記事はオートロックマンション用不在時荷物受け取りシステムの関連記事です。
さて、これまでの間にRaspberryPiでインターホンの音を検知するところまでいけたので、 今回はその音に応じてインターホンの解錠ボタンを自動で押してみようと思う。(解錠ボタンの配置は以下の写真参照)
これができれば、「インターホン音を検知→解錠」が自動でできることになる。
どうやって解錠ボタンを押すか
インターホンの解錠ボタンは軽く押せるので、そこまで力は必要ない。
今どきであればSwitchbotのようなIoTボタンみたいなものを買えばいいのかもしれないが、せっかくなのでサーボモータを使って押してみようと思う。
サーボモータ選び
今回、サーボモータ選びの条件は、
- 秋月電子で売っていること
- そこそこ小型であること
- 安価であること
- 通常のPWMで駆動できること
といった感じである。ボタン自体は軽いのでパワーは普通にあれば考慮しない。
最終的に選んだのはマイクロサーボ SG92Rである。
一般的なPWM駆動のサーボモータで、比較小型・安価である。インターホンのボタンを押すには十分。
pigpioによるサーボモータ駆動
ちょっと昔RaspberryPiの1や2を使っているときはWiringPiを使っていたが、どうやら最近はpigpioというライブラリがRaspberryPiのGPIO制御をPythonでやる場合のデファクトスタンダードらしい。
pigpioの詳細については、色んな人が書いているのでそちらを参照のこと。
サーボモータを駆動する場合はPWMが必要になる。pigpioによるPWM出力については以下の記事が詳しい。
1点、RaspberryPiからサーボモータを駆動するときの注意点として、RaspberryPiにつないでいるACアダプタによっては電力が足りず、低電圧警告が出る場合がある。RaspberryPiのDCジャック付近の赤LEDが消灯したらそれが低電圧警告である。
私が使っているACアダプタではこの症状になったため、サーボモータの電源には別途5VのACアダプタを用意し、そこから電源を供給している。上の写真にはRaspberryPiの電源(USB)と別にもう一つACアダプタが見えるが、それがサーボモータ用の電源である。DCジャックをホットボンドでケースに貼り付けて、配線をそこからサーボモータに飛ばしている。GNDはRaspberryPiと共通だ。
pigpioの使い方などはいろんな記事で紹介されているが、この記事でも簡単に、(インターホンのボタンを押すのに)最低限必要な使い方を記しておく。
pigpioのインストール
sudo apt-get update sudo apt-get install pigpio
デーモンの立ち上げ
sudo pigpiod
これはRaspberryPiを起動するたびに実行しなければいけないため、/etc/rc.local
に記述して起動の度に自動実行されるようにしておいたほうが良い。
Pythonスクリプトの記述
まずは単にサーボモータを1秒ごとに動かすだけのスクリプトを書いてみる。
import time import pigpio def unlockDoor(): servoPush() time.sleep(SERVO_TIME) servoRelease() def calcServoDuty(ratio): # ratio:0-1でサーボ角度を指定 # SG92Rの制御パルスが0.5ms-2.4msなのでdutyに25000(1000000/20*0.5)-120000(100000/20*2.4)をマッピングする duty = 25000 + (120000 - 25000) * ratio return duty def moveServo(ratio): duty = calcServoDuty(ratio) pi.hardware_PWM(PORT_PWM, SERVO_PERIOD, int(duty)) def servoRelease(): moveServo(SERVO_RELEASE) def servoPush(): moveServo(SERVO_PUSH) if __name__=='__main__': # 実験的に決定したサーボのPWMパラメータ SERVO_RELEASE = 0.5 # ボタン解放状態(サーボモータ角度中間) SERVO_PUSH = 0.35 # ボタン押下状態 SERVO_TIME = 0.2 # サーボモータを押す時間[秒] pi = pigpio.pi() # servo setting PORT_PWM = 18 SERVO_PERIOD = 50 # [Hz] :PWMサイクル20ms pi.set_mode(PORT_PWM, pigpio.OUTPUT) moveServo(0.5) while 1: try: unlockDoor() time.sleep(1) except KeyboardInterrupt: break
「インターホンを押す」という目的の上で重要なのは上記スクリプトでいうSERVO_PUSHである。 この値を細かく調整することで、インターホンの解錠ボタンを押すのに丁度いい角度を探っていく。あまりに角度が浅いとちょっとヘタってきたときに押せなくなるし、角度が深すぎるとサーボモータが過負荷になって焼ける可能性がある。
なお、サーボモータはインターホンの筐体に強力両面テープで貼り付けている。ソフトタイプのほうが少しクッション性があって良い。
調整がうまく行くとこんな感じで解錠ボタンを押せる。
サーボモータがインターホンを押す練習中。 pic.twitter.com/VtkD3wGJaI
— 水田かなめ (@kmizta) January 3, 2020
ちょうど良いパラメータが決まったら、あとはインターホン音検知と組み合わせるだけだ。 インターホン音検知については以前の記事を参照。
音検知とボタンプッシュを組み合わせると、こんな感じに使える。
ラズパイ+ウェブカメラでインターホン検知→サーボモータで解錠。あまりに反応が早いと怪しいので少し間を空けて。 pic.twitter.com/BGr4zJplWk
— 水田かなめ (@kmizta) December 31, 2019
これでインターホン音検知+スマホ通知(過去記事参照)+ボタン解錠ができるようになった。
あとは玄関に置き配バッグを配置しておけば、一応不在時にも荷物を受け取ることは可能である。ただ、このままではインターホンを押した人は誰でも入れてしまうセキュリティガバガバシステムが出来上がってしまうので、次はそういった方面の実装を加えていく。
SSH切断時に起動していたプログラムに、SSH再接続後につなげる
ほぼほぼ個人的なメモ。
やりたいことは、SSHで接続しているRaspberryPiで、家出る前にpythonのプログラムを起動して、一旦SSHを切断して家帰ってきてからそのプログラムの起動画面に再度繋げる方法。
ここでは上のようなことをやりたいときに必要最低限の手順を示す。
(切断前)
1. ”tmux”で新しいターミナル画面を開く
tmuxはターミナルマルチプレクサなるものらしい。所望のプログラム起動前に、単純に以下のコマンドを打てばよい。
tmux
2. 新しいターミナル画面で普通にプログラムを起動する
tmuxと打つとまっさらな画面が立ち上がるので、そこでいつもどおりプログラムを起動する。起動を見届けたら切断する。
python3 hogehoge.py
(切断後)
3. 家に帰ってきたら再度SSH接続し、tmuxのセッションにつなげる
以下のコマンドで切断時につなげていたセッションに戻ることが可能。
tmux a
無事セッションに戻ったら、あとは煮るなり焼くなり。
プログラムを終了してセッションを閉じる場合は、Ctrl+Cでプログラムを閉じた後に、
exit
でtmuxのセッションを抜けられる。
python3にもkbhit()とgetch()が欲しい
はじめに
何言語でプログラミングするにしろ、ユーザからのキーボード入力はよく使うだろう。 特によく使う方法は「通常はメインの処理をしていて、キーボード入力が何かあったときだけ、そのキーに応じて別の処理をする」というものである。 私はよくPICなどを使って電子工作しているが、そういったデバイスではあまり文字列処理はしない。1文字で十分である。
すなわち、kbhit()+getch()である。これさえあればOK。
最近はラズパイでpythonを使っているが、ラズパイでもそういった処理をする方法を紹介する。
kbhit.pyというものがある
やっぱりこういうことは誰でもやりたいと思うからか、ライブラリを作って公開してくれている人がいるので、それをありがたく使わせてもらう。
ただ、これをpython3でそのまま実行しようとすると以下のようなエラーが出る。
File "recipe-572182-1.py", line 33 return dr <> [] ^ SyntaxError: invalid syntax
この「<>」はpythonのnot equalの記号だが、python3では文法エラーになるようなので、それを修正したコードをここに書いておく。(といっても<>を!=に修正しただけ)
import sys, termios, atexit from select import select # save the terminal settings fd = sys.stdin.fileno() new_term = termios.tcgetattr(fd) old_term = termios.tcgetattr(fd) # new terminal setting unbuffered new_term[3] = (new_term[3] & ~termios.ICANON & ~termios.ECHO) # switch to normal terminal def set_normal_term(): termios.tcsetattr(fd, termios.TCSAFLUSH, old_term) # switch to unbuffered terminal def set_curses_term(): termios.tcsetattr(fd, termios.TCSAFLUSH, new_term) def putch(ch): sys.stdout.write(ch) def getch(): return sys.stdin.read(1) def getche(): ch = getch() putch(ch) return ch def kbhit(): dr,dw,de = select([sys.stdin], [], [], 0) return dr != [] # ここだけ修正 if __name__ == '__main__': atexit.register(set_normal_term) set_curses_term() while 1: if kbhit(): ch = getch() break sys.stdout.write('.') print('done:'+ch)
実際にpython3のプログラムにインポートして使うときは、
from kbhit import *
でインポートして、初期化のときに以下のコードを実行
atexit.register(set_normal_term) set_curses_term()
そしたらあとは、
if kbhit():
ch = getch()
で好きなだけkbhit()+getch()できる。やったね。
Pushbulletを使ってRaspberryPiからスマホ・PCに通知を送る
これまでのあらすじ
宅配便の再配達や、荷物の受け取りのために自宅待機するのが大変苦手な私は、RaspberryPiを使って荷物の受け取りを完全無人化するシステムを開発することを考えた。
問題は、家が宅配ボックスのないオートロックマンションということである。玄関前に置き配してもらうにもインターホンが鳴ったときに共同玄関の解錠ボタンを押す必要がある・・・
前回までにインターホンの音を検知するところまで行った。
今回はインターホン音を検知したことをスマホ・PCに通知することをやってみる。
スマホ・PCへのプッシュにはPushbulletが便利
今回、ラズパイからスマホ・PCへのプッシュにはPushbulletを用いた。
Pushubulletは様々な端末同士をつないでチャットしたりメッセージや写真を送ったり通知を共有するサービスである。Googleアカウントでサインインすればすぐに使うことができる。普段使いとしても、スマホからちょっとPCにリンクを送ったり、ファイルを送ったりするのに手軽で便利である。
今回の目的はラズパイからスマホ・PCへのプッシュであり、ラズパイにはPythonを使っている。PushbulletをPythonから使うには、pushbullet.pyが便利である。
pushbullet.pyはその名の通りPushbulletのサービスをPythonから呼び出せるようにしたライブラリである。Pushbulletのサイトにgoogleアカウントなどでサインインしたら、通知を受け取りたいスマホやPCをあらかじめ設定しておこう。ラズパイへのインストール方法は上記公式ウェブサイトに書いてあるが、非常に簡単で、
pip3 install pushbullet.py
だけでOK。
ラズパイから通知などをプッシュするには、あらかじめAPI KEYを取得しておく必要がある。取得も簡単で、pushbulletのサイトにサインインしたら、SettingからAccount、Access Tokensの下にある「Create Access Token」をクリックするとAPI KEYが表示されるので、それをコピーすれば良い。
API KEYが取得できたら、あとはPythonで以下のようなスクリプトを書けば設定したスマホやPCに通知がすぐに来る。
from pushbullet import Pushbullet apikey = "さっき取得したAPIKEY" pb = Pushbullet(apikey) # テキストをプッシュする push = pb.push_note("RaspberryPi", "push message")
これを実行すると、下のような通知が来る。簡単である。
接続環境によるだろうが、ちょっとしたテキストを送るだけならタイムラグはほとんど感じない(1秒弱)。画像を送る場合でも1000x1000くらいの画像なら3秒程度でアップロードできる。ほぼリアルタイムって感じだ。さすがPushbullet。弾丸のように速いということか。
テキストを送る以外に、下のような感じで画像をプッシュすることも可能だ。これは次あたりの記事で使う。
with open("my_cool_picture.jpg", "rb") as pic: file_data = pb.upload_file(pic, "picture.jpg") push = pb.push_file(**file_data)
インターホンを検知したことを通知する
前回の記事に記載した、ウェブカメラを使ったインターホン音検知のコードに、Pushbulletで通知をプッシュするコードを追記する。 (追記したのはPushbullet関連の数行のみ)
import pyaudio import numpy as np import time from scipy.signal import argrelmax from pushbullet import Pushbullet # interphone setting CHUNK = 1024 RATE = 8000 # sampling rate dt = 1/RATE freq = np.linspace(0,1.0/dt,CHUNK) fn = 1/dt/2; # nyquist freq FREQ_HIGH_BASE = 886.0 # high tone frequency FREQ_LOW_BASE = 726.6 # low tone frequency FREQ_ERR = 0.02 # allowable freq error #variable detect_high = False detect_low = False # Pushbullet setting apikey = "Pushbulletで取得したAPIKEY" pb = Pushbullet(apikey) # FFTで振幅最大の周波数を取得する関数 def getMaxFreqFFT(sound, chunk, freq): # FFT f = np.fft.fft(sound)/(chunk/2) f_abs = np.abs(f) # ピーク検出 peak_args = argrelmax(f_abs[:(int)(chunk/2)]) f_peak = f_abs[peak_args] f_peak_argsort = f_peak.argsort()[::-1] peak_args_sort = peak_args[0][f_peak_argsort] # 最大ピークをreturn return freq[peak_args_sort[0]] # 検知した周波数がインターホンの音の音か判定する関数 def detectDualToneInOctave(freq_in, freq_high_base, freq_low_base, freq_err): det_h = det_l = False # 検知した周波数が高音・低音のX倍音なのか調べる octave_h = freq_in / freq_high_base octave_l = freq_in / freq_low_base near_oct_h = round(octave_h) near_oct_l = round(octave_l) if near_oct_h == 0 or near_oct_l == 0: return False, False # X倍音のXが整数からどれだけ離れているか err_h = np.abs((octave_h-near_oct_h) / near_oct_h) err_l = np.abs((octave_l-near_oct_l) / near_oct_l) # 基音、2倍音、3倍音の付近であればインターホンの音とする if err_h < freq_err: det_h = True elif err_l < freq_err: det_l = True return det_h, det_l if __name__=='__main__': P = pyaudio.PyAudio() stream = P.open(format=pyaudio.paInt16, channels=1, rate=RATE, frames_per_buffer=CHUNK, input=True, output=False) while stream.is_active(): try: input = stream.read(CHUNK, exception_on_overflow=False) ndarray = np.frombuffer(input, dtype='int16') abs_array = np.abs(ndarray)/32768 if abs_array.max() > 0.5: # FFTで最大振幅の周波数を取得 freq_max = getMaxFreqFFT(ndarray, CHUNK, freq) print("振幅最大の周波数:", freq_max, "Hz") h,l = detectDualToneInOctave(freq_max, FREQ_HIGH_BASE, FREQ_LOW_BASE, FREQ_ERR) if h: detect_high = True print("高音検知!") if l: detect_low = True print("低音検知!") if detect_high and detect_low: # インターホン音を検知したのでプッシュする push = pb.push_note("RaspberryPi", "インターホンが鳴ったよ") time.sleep(30) detect_high = detect_low = False except KeyboardInterrupt: break stream.stop_stream() stream.close() P.terminate()
こいつを実行してインターホンの音を聞かせると、
こんな感じで通知が即座に飛んでくる。便利な世界だ。
ラズベリーパイでインターホンの音を検知する
前回の記事で、オートロックマンション不在時荷物受け取りシステム(仮)の材料を揃えた。 westgate-lab.hatenablog.com
ここから実際にソフトウェアを開発していく。
やりたいこと
配達員の来訪を検知するには何はともあれインターホンが鳴ったことを検知する必要がある。原理的にはインターホンを分解してどこかからインターホンの信号を拾ってもいいが、賃貸マンションなのであまり原状復帰が難しくなるようなことはしないこととする。
なので、まずはウェブカメラについたマイクでインターホンの音を拾い、インターホンの音を識別、検知することを目標とする。使うウェブカメラは、logicoolのC270だ。
ロジクール ウェブカメラ C270 ブラック HD 720P ウェブカム ストリーミング 小型 シンプル設計 国内正規品 2年間メーカー保証
- 発売日: 2010/08/20
- メディア: Personal Computers
また、ラズベリーパイはModel 3B+である。言語はPythonで組んでいく。
リアルタイムで音声を取得する
ラズパイ+ウェブカメラ+Pythonで手軽に音声データを取得するにはPyAudioが便利である。 PyAudioの使い方は以下のNoteを参考にしたので、そちらを見てほしい。
リアルタイムで音声波形の取得【PyAudio】|もくいち|note
ここのコードをコピペすればとりあえずリアルタイムに音声の時系列データ(一定個数の配列)が得られるはずである。
如何にしてインターホンの音を識別するか
機械学習など使うほうがイマドキ感があるが、まずは目立つ周波数を抜き出して比較することとする。
家のインターホンの音を周波数解析してみる
家のインターホンの音がどういった周波数で構成されているか、スマホで録音したインターホンの音をAudacityのスペクトル解析機能を使って解析した。
うちのインターホンは鳴ると「ピロピロピロピロ・・・」という感じで、「ピ(↑)ロ(↓)」という高低の音の繰り返しである。
音声波形を表示するとこんな感じ。
最初が大きく、徐々に小さくなり、それが繰り返されることがわかる。
では、次にピ(↑)、すなわち高音部分を抜き出してスペクトル解析する。
いくつも波が立っているが、インターホンの音に対応するピークは基音から886Hz, 1778Hz, 2665Hz, 3550Hz・・・となっており、キレイに倍音が出ていることがわかる。
次にロ(↓)、すなわち低音部分を抜き出してスペクトル解析する。
同様にピークが立っており、基音から727Hz, 1450Hz, 2176Hz, 2905Hz・・・となっている。こちらもきれいな倍音である。うちのインターホンは、低音も高音も3倍音が最も大きい周波数のようである。
高速フーリエ変換(FFT)でインターホンの音を識別する
インターホンの音の素性がわかればあとはラズベリーパイでFFTしてその周波数を検知するのみである。
FFTはnumpyを使った。
手順としては、
- 常にPyAudioで音声を取得
- FFTで最大振幅の周波数を取得
- その周波数が低音の基音、2倍音、3倍音・・・の周辺であればインターホンの低音と識別
- その周波数が高音の基音、2倍音、3倍音・・・の周辺であればインターホンの高音と識別
- 高音・低音を検知したらインターホンが鳴ったものとする。
手順3,4でスペクトル解析で最も大きい音が出ていた3倍音以外も取ることにしたのは、録音環境やウェブカメラのマイクのスペックにより微妙に振幅が変化するためである。
コードはこんな感じ。
import pyaudio import numpy as np import time from scipy.signal import argrelmax # interphone setting CHUNK = 1024 RATE = 8000 # sampling rate dt = 1/RATE freq = np.linspace(0,1.0/dt,CHUNK) fn = 1/dt/2; # nyquist freq FREQ_HIGH_BASE = 886.0 # high tone frequency FREQ_LOW_BASE = 726.6 # low tone frequency FREQ_ERR = 0.02 # allowable freq error #variable detect_high = False detect_low = False # FFTで振幅最大の周波数を取得する関数 def getMaxFreqFFT(sound, chunk, freq): # FFT f = np.fft.fft(sound)/(chunk/2) f_abs = np.abs(f) # ピーク検出 peak_args = argrelmax(f_abs[:(int)(chunk/2)]) f_peak = f_abs[peak_args] f_peak_argsort = f_peak.argsort()[::-1] peak_args_sort = peak_args[0][f_peak_argsort] # 最大ピークをreturn return freq[peak_args_sort[0]] # 検知した周波数がインターホンの音の音か判定する関数 def detectDualToneInOctave(freq_in, freq_high_base, freq_low_base, freq_err): det_h = det_l = False # 検知した周波数が高音・低音のX倍音なのか調べる octave_h = freq_in / freq_high_base octave_l = freq_in / freq_low_base near_oct_h = round(octave_h) near_oct_l = round(octave_l) if near_oct_h == 0 or near_oct_l == 0: return False, False # X倍音のXが整数からどれだけ離れているか err_h = np.abs((octave_h-near_oct_h) / near_oct_h) err_l = np.abs((octave_l-near_oct_l) / near_oct_l) # 基音、2倍音、3倍音の付近であればインターホンの音とする if err_h < freq_err: det_h = True elif err_l < freq_err: det_l = True return det_h, det_l if __name__=='__main__': P = pyaudio.PyAudio() stream = P.open(format=pyaudio.paInt16, channels=1, rate=RATE, frames_per_buffer=CHUNK, input=True, output=False) while stream.is_active(): try: input = stream.read(CHUNK, exception_on_overflow=False) ndarray = np.frombuffer(input, dtype='int16') abs_array = np.abs(ndarray)/32768 if abs_array.max() > 0.5: # FFTで最大振幅の周波数を取得 freq_max = getMaxFreqFFT(ndarray, CHUNK, freq) print("振幅最大の周波数:", freq_max, "Hz") h,l = detectDualToneInOctave(freq_max, FREQ_HIGH_BASE, FREQ_LOW_BASE, FREQ_ERR) if h: detect_high = True print("高音検知!") if l: detect_low = True print("低音検知!") # dual tone detected if detect_high and detect_low: print("インターホンの音を検知!") time.sleep(30) print("フラグリセット") detect_high = detect_low = False except KeyboardInterrupt: break stream.stop_stream() stream.close() P.terminate()
実際にpython3で起動させて、インターホンの音をウェブカメラに聞かせると、
振幅最大の周波数: 2674.4868035190616 Hz 高音検知! 振幅最大の周波数: 2189.6383186705766 Hz 低音検知! インターホンの音を検知!
と出力され、正しくインターホンの音が検知できたことを確認できる。
ただし、最大振幅の周波数を見ているだけなので、日常生活のノイズをたまーにインターホンの音と誤検知する場合がある(週に2,3回)。これは将来的に機械学習で対応したいと思う。