West Gate Laboratory

人生を少しでも,面白く便利にするモノづくり

Pushbulletを使ってRaspberryPiからスマホ・PCに通知を送る

これまでのあらすじ

宅配便の再配達や、荷物の受け取りのために自宅待機するのが大変苦手な私は、RaspberryPiを使って荷物の受け取りを完全無人化するシステムを開発することを考えた。

問題は、家が宅配ボックスのないオートロックマンションということである。玄関前に置き配してもらうにもインターホンが鳴ったときに共同玄関の解錠ボタンを押す必要がある・・・

前回までにインターホンの音を検知するところまで行った。

westgate-lab.hatenablog.com

今回はインターホン音を検知したことをスマホ・PCに通知することをやってみる。

スマホ・PCへのプッシュにはPushbulletが便利

今回、ラズパイからスマホ・PCへのプッシュにはPushbulletを用いた。

www.pushbullet.com

Pushubulletは様々な端末同士をつないでチャットしたりメッセージや写真を送ったり通知を共有するサービスである。Googleアカウントでサインインすればすぐに使うことができる。普段使いとしても、スマホからちょっとPCにリンクを送ったり、ファイルを送ったりするのに手軽で便利である。

今回の目的はラズパイからスマホ・PCへのプッシュであり、ラズパイにはPythonを使っている。PushbulletをPythonから使うには、pushbullet.pyが便利である。

pypi.org

pushbullet.pyはその名の通りPushbulletのサービスをPythonから呼び出せるようにしたライブラリである。Pushbulletのサイトにgoogleアカウントなどでサインインしたら、通知を受け取りたいスマホやPCをあらかじめ設定しておこう。ラズパイへのインストール方法は上記公式ウェブサイトに書いてあるが、非常に簡単で、

pip3 install pushbullet.py

だけでOK。

ラズパイから通知などをプッシュするには、あらかじめAPI KEYを取得しておく必要がある。取得も簡単で、pushbulletのサイトにサインインしたら、SettingからAccount、Access Tokensの下にある「Create Access Token」をクリックするとAPI KEYが表示されるので、それをコピーすれば良い。

f:id:kaname_m:20191226232015p:plain
PushbulletのAPI KEYを取得する(Create Access Tokenをクリック)

API KEYが取得できたら、あとはPythonで以下のようなスクリプトを書けば設定したスマホやPCに通知がすぐに来る。

from pushbullet import Pushbullet

apikey = "さっき取得したAPIKEY"
pb = Pushbullet(apikey)

# テキストをプッシュする
push = pb.push_note("RaspberryPi", "push message")

これを実行すると、下のような通知が来る。簡単である。

f:id:kaname_m:20191226232923j:plain
Pushbulletでテキストをプッシュできた

接続環境によるだろうが、ちょっとしたテキストを送るだけならタイムラグはほとんど感じない(1秒弱)。画像を送る場合でも1000x1000くらいの画像なら3秒程度でアップロードできる。ほぼリアルタイムって感じだ。さすがPushbullet。弾丸のように速いということか。

テキストを送る以外に、下のような感じで画像をプッシュすることも可能だ。これは次あたりの記事で使う。

with open("my_cool_picture.jpg", "rb") as pic:
    file_data = pb.upload_file(pic, "picture.jpg")

push = pb.push_file(**file_data)

インターホンを検知したことを通知する

前回の記事に記載した、ウェブカメラを使ったインターホン音検知のコードに、Pushbulletで通知をプッシュするコードを追記する。 (追記したのはPushbullet関連の数行のみ)

import pyaudio
import numpy as np
import time
from scipy.signal import argrelmax
from pushbullet import Pushbullet

# interphone setting
CHUNK = 1024
RATE = 8000    # sampling rate
dt = 1/RATE
freq = np.linspace(0,1.0/dt,CHUNK)
fn = 1/dt/2;    # nyquist freq
FREQ_HIGH_BASE = 886.0  # high tone frequency
FREQ_LOW_BASE = 726.6   # low tone frequency
FREQ_ERR = 0.02         # allowable freq error
#variable
detect_high = False
detect_low = False

# Pushbullet setting
apikey = "Pushbulletで取得したAPIKEY"
pb = Pushbullet(apikey)

# FFTで振幅最大の周波数を取得する関数
def getMaxFreqFFT(sound, chunk, freq):
    # FFT
    f = np.fft.fft(sound)/(chunk/2)
    f_abs = np.abs(f)
    # ピーク検出
    peak_args = argrelmax(f_abs[:(int)(chunk/2)])
    f_peak = f_abs[peak_args]
    f_peak_argsort = f_peak.argsort()[::-1]
    peak_args_sort = peak_args[0][f_peak_argsort]
    # 最大ピークをreturn
    return freq[peak_args_sort[0]]

# 検知した周波数がインターホンの音の音か判定する関数
def detectDualToneInOctave(freq_in, freq_high_base, freq_low_base, freq_err):
    det_h = det_l = False
    # 検知した周波数が高音・低音のX倍音なのか調べる
    octave_h = freq_in / freq_high_base
    octave_l = freq_in / freq_low_base
    near_oct_h = round(octave_h)
    near_oct_l = round(octave_l)
    if near_oct_h == 0 or near_oct_l == 0:
        return False, False
    # X倍音のXが整数からどれだけ離れているか
    err_h = np.abs((octave_h-near_oct_h) / near_oct_h)
    err_l = np.abs((octave_l-near_oct_l) / near_oct_l)

    # 基音、2倍音、3倍音の付近であればインターホンの音とする
    if err_h < freq_err:
        det_h = True
    elif err_l < freq_err:
        det_l = True

    return det_h, det_l

if __name__=='__main__':
    P = pyaudio.PyAudio()
    stream = P.open(format=pyaudio.paInt16, channels=1, rate=RATE, frames_per_buffer=CHUNK, input=True, output=False)
    
    while stream.is_active():
        try:
            input = stream.read(CHUNK, exception_on_overflow=False)
            ndarray = np.frombuffer(input, dtype='int16')
            abs_array = np.abs(ndarray)/32768
        
            if abs_array.max() > 0.5:
                # FFTで最大振幅の周波数を取得
                freq_max = getMaxFreqFFT(ndarray, CHUNK, freq)
                print("振幅最大の周波数:", freq_max, "Hz")
                h,l = detectDualToneInOctave(freq_max, FREQ_HIGH_BASE, FREQ_LOW_BASE, FREQ_ERR)
                if h:
                    detect_high = True
                    print("高音検知!")
                if l:
                    detect_low = True
                    print("低音検知!")
    
                if detect_high and detect_low:
                    # インターホン音を検知したのでプッシュする
                    push = pb.push_note("RaspberryPi", "インターホンが鳴ったよ")                    
                    time.sleep(30)
                    detect_high = detect_low = False
    
        except KeyboardInterrupt:
            break
        
    stream.stop_stream()
    stream.close()
    P.terminate()

こいつを実行してインターホンの音を聞かせると、

f:id:kaname_m:20191226234127j:plain
インターホンを検知したラズパイから通知が来た

こんな感じで通知が即座に飛んでくる。便利な世界だ。