Pythonとバーコードで在庫管理をする

前回、Excelシートを使って(主にPythonで)バーコードによる在庫管理システムを構築しました。詳しくはこちら。が、ぶっちゃけ(いつもですが)Excelの存在意義がほぼないので、純粋にPythonだけを使って在庫管理システムを構築します。

デモ動画

さっそくですが、まずうごきを見てもらうのが早いので、デモ動画をどうぞ。使用機材はこちらのデキるVBAer必携マシーンSurfaceGo3。内蔵メインカメラをバーコードリーダーとして使用しています。

(注)動画のバーコード読み取り時ビープ音が鳴ります。

メインのロジックは以前の記事のネタで使ったものをほぼほぼ踏襲しています。詳細は別記事にて。

システム概要

システムの概要図は次の通りです。

カメラ制御はopencv-python、バーコードデコードはpyzbarで、UIはHTMLで作成しJSで更新、http.serverで配信、バックエンドとUIの通信はWebSocketでやっています。

Pythonスクリプトのソースコードは次の通りです。

import os.path
import sqlite3
import webbrowser
import json
import cv2
import requests
import winsound
import threading
import openpyxl as excel
from time import sleep
from pyzbar.pyzbar import decode
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from websocket_server import WebsocketServer

cd = os.path.dirname(__file__)
DATABASE_PATH = os.path.join(cd, 'stock.db')
TEMPLATE_PATH = os.path.join(cd, 'template-food-stock.html')
EXCEL_BOOK_PATH = os.path.join(cd, 'stock.xlsx')
LINE_API_TOKEN = '*******************************************'


class OrenoServer:

    def __init__(self):
        self.HOST = 'localhost'
        self.HTTP_PORT = 8080
        self.WS_PORT = 8081
        self.WINDOW_NAME = 'JAN CODE READER'
        self.LINE_API_URL = 'https://notify-api.line.me/api/notify'
        self.client = None
        self.wss = WebsocketServer(host=self.HOST, port=self.WS_PORT)
        self.wss.set_fn_new_client(self.new_client)
        self.wss.set_fn_message_received(self.message_received)
        self.https = ThreadingHTTPServer((self.HOST, self.HTTP_PORT), HttpHandler)
        self.db = None
        self.can_scan = True

    def start(self):
        threading.Thread(target=self.wss.run_forever).start()
        threading.Thread(target=self.https.serve_forever).start()
        webbrowser.open(f'http://{self.HOST}:{str(self.HTTP_PORT)}')

    def shutdown(self):
        self.wss.shutdown()
        self.https.shutdown()

    def new_client(self, client, server):
        if self.client is None:
            self.client = client
            threading.Thread(target=self.cam_capture).start()

    def message_received(self, client, server, message):
        m = message.split()
        mode = m[0]
        code = m[1]

        if mode == 'in':
            rc = self.db.set(code, 1)
        else:
            rc = self.db.set(code, -1)
            alert = self.db.check_alert(code)
            if alert is not None:
                self.line_notify(alert['product_name'] + 'がなくなるぞ!')

        if rc == 0:
            winsound.Beep(500, 500)
        else:
            winsound.Beep(2500, 200)

    def cam_capture(self):
        cap = cv2.VideoCapture(0)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        cv2.namedWindow(self.WINDOW_NAME, cv2.WINDOW_AUTOSIZE)
        self.db = OrenoDataBase()

        while cap.isOpened():
            ret, frame = cap.read()

            if ret:
                d = decode(frame)

                if d and self.can_scan:
                    for cnt, barcode in enumerate(d):
                        code = barcode.data.decode('utf-8')

                        if self.is_jan(code):
                            self.can_scan = False

                            if cnt == len(d) - 1:
                                threading.Thread(target=self.scan_wait).start()

                            self.wss.send_message(self.client, code)

            cv2.imshow(self.WINDOW_NAME, frame)
            cv2.waitKey(1)

            if not cv2.getWindowProperty(self.WINDOW_NAME, cv2.WND_PROP_VISIBLE):
                break

        self.db.export_excel()
        self.db.close()
        cap.release()
        self.shutdown()

    def is_jan(self, code):
        return (len(code) == 13 or len(code) == 8) and (code[:2] == '49' or code[:2] == '45')

    def scan_wait(self):
        sleep(2)
        self.can_scan = True

    def line_notify(self, msg):
        param = {'message': msg}
        header = {'Authorization': 'Bearer ' + LINE_API_TOKEN}
        requests.post(self.LINE_API_URL, params=param, headers=header)


class OrenoDataBase:

    def __init__(self):
        self.conn = sqlite3.connect(DATABASE_PATH, check_same_thread=False)
        self.conn.row_factory = sqlite3.Row
        self.cur = self.conn.cursor()

    def get(self):
        self.cur.execute('SELECT * FROM food')
        rows = []

        for r in self.cur.fetchall():
            rows.append({'jan': r['jan'], 'product_name': r['product_name'],
             'stock': r['stock'], 'lower': r['lower']})

        return rows

    def set(self, code, quantity):
        result = self.cur.execute(
        'UPDATE food SET stock = stock + ? WHERE jan = ?', (quantity, code)
        )
        self.conn.commit()
        return result.rowcount

    def check_alert(self, code):
        return self.cur.execute(
            'SELECT product_name FROM food WHERE jan = ? AND stock = lower', (code,)
        ).fetchone()

    def export_excel(self):
        wb = excel.Workbook()
        ws = wb.active
        result = self.cur.execute('SELECT * FROM food')
        header = [d[0] for d in self.cur.description]
        ws.append(header)

        for r in result.fetchall():
            ws.append((r[0], r[1], r[2], r[3]))
            
        wb.save(EXCEL_BOOK_PATH)

    def close(self):
        self.cur.close()
        self.conn.close()


class HttpHandler(BaseHTTPRequestHandler):

    def do_GET(self):
        with open(TEMPLATE_PATH, mode='r', encoding='utf-8') as html:
            response_body = html.read()
            self.send_response(200)
            self.send_header('Content-type', 'text/html; charset=utf-8')
            self.end_headers()
            self.wfile.write(response_body.encode('utf-8'))

    def do_POST(self):
        db = OrenoDataBase()
        rows = db.get()
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        response_body = json.dumps(rows)
        self.wfile.write(response_body.encode('utf-8'))
        db.close()


server = OrenoServer()
server.start()

UI用HTMLは次の通りです。

<!DOCTYPE html>
<html lang="ja">
<head>
    <meta charset="UTF-8">
    JQuery & Bootstrap CDN読み込み(省略)
    <title>食料庫管理システム</title>
</head>
<body>
<div class="container">
    <div class="mt-5">
        <input type="radio" class="btn-check" name="io-mode" id="out-radio" 
               value="out" checked>
        <label class="btn btn-outline-danger" for="out-radio">出庫(OUT)</label>
        <input type="radio" class="btn-check" name="io-mode" id="in-radio" 
               value="in">
        <label class="btn btn-outline-success" for="in-radio">入庫(IN)</label>
    </div>
    <table class="table mt-5" id="tb">
    </table>
    <div id="log-area" class="border mt-3">
        <p>ログ</p>
    </div>
</div>
<script>
    const HTTP_PORT = 8080
    const WS_PORT = 8081
    let wss = new WebSocket('ws://localhost:' + WS_PORT)

    wss.onmessage = function (e) {
        let target = $('#'+ e.data);
        target.css('background-color','yellow')
        let mode = $('input:radio[name="io-mode"]:checked').val();
        let logMsg = new Date().toLocaleString()

        switch (mode){
            case 'in':
                target.text(Number(target.text()) + 1)
                logMsg += ' ' + target.prev().text() + ' を入庫した。'
                break
            case 'out':
                target.text(Number(target.text()) - 1)
                logMsg += ' ' + target.prev().text() + ' を出庫した。'
        }

        wss.send(mode + ' ' + e.data)
        $('#log-area').append('<p>'+ logMsg + '</p>')

        setTimeout(function (){
            target.css('background-color','transparent')
        },500)
    }

    $.ajax({
        url: 'http://localhost:' + HTTP_PORT,
        type: 'POST',
        dataType: 'json',
    }).then(
        function (data) {
            let elem = '<thead><tr><th>JAN</th><th>商品名</th>' +
                '<th>在庫</th><th>下限</th></tr></thead>'
            elem += '<tbody>'

            $.each(data, function (key, item) {
                elem += '<tr>'
                elem += '<td>' + item.jan + '</td>'
                elem += '<td>' + item.product_name + '</td>'
                elem += '<td id="' + item.jan + '">' + item.stock + '</td>'
                elem += '<td class="text-black-50">' + item.lower + '</td>'
                elem += '<tr>'
            })

            elem += '</tbody>'
            $('#tb').html(elem)
        })
</script>
</body>
</html>

データベースはSQLiteで次のような構成です。

解説

各機能をクラスで分担して制御しています。

class OrenoServer はカメラ、WEBサーバー、WSサーバーを
class OrenoDataBase はデータベースを
class HttpHandler はHTTP通信を
それぞれ担います。

システムを起動するとOrenoServerのイニシャライザでサーバーを初期化します。

class OrenoServer:

    def __init__(self):
        self.HOST = 'localhost'
        self.HTTP_PORT = 8080
        self.WS_PORT = 8081
        self.WINDOW_NAME = 'JAN CODE READER'
        self.LINE_API_URL = 'https://notify-api.line.me/api/notify'
        self.client = None
        self.wss = WebsocketServer(host=self.HOST, port=self.WS_PORT)
        self.wss.set_fn_new_client(self.new_client)
        self.wss.set_fn_message_received(self.message_received)
        self.https = ThreadingHTTPServer((self.HOST, self.HTTP_PORT), HttpHandler)
        self.db = None
        self.can_scan = True

①wss.set_fn_new_clientと②wss.set_fn_message_receivedはWebsocketServerのイベントリスナーを設定するメソッドで、それぞれ①新しいクライアントが接続してきたとき②クライアントからメッセージを受け取ったとき、に実行したい関数を引数で設定します。ここでは①new_clientと②message_receivedメソッドを実行するようにしています。

定数やフラグ系も初期化しておきます。

startメソッドでサーバーをスレッドで立ち上げます。

def start(self):
    threading.Thread(target=self.wss.run_forever).start()
    threading.Thread(target=self.https.serve_forever).start()
    webbrowser.open(f'http://{self.HOST}:{str(self.HTTP_PORT)}')

UI用WEBページをlocalhost:8080で配信開始します。WebSocket通信を8081ポートで待機します。webbrowser.openで通常使用するWEBブラウザを起動しlocalhost:8080へアクセスします。

WEBサーバーの8080ポートにGETでアクセスがあると、class HttpHandlerdo_GETメソッドによりUI用HTMLファイルをレスポンスします。UI用HTMLファイルのJavaScriptによりPOSTでアクセスがあるとdo_POSTメソッドによりclass OrenoDataBaseを使ってデータベースのテーブルをSELECTした結果をJSONでレスポンスします。これをHTMLのテーブルに組み立てて表示します。

同時にイベントリスナーのnew_clientメソッドが呼ばれcam_captureメソッドでカメラ画像をウィンドウに表示します。

HTMLのJSにより8081ポートでWebSocket通信を確立し、サーバーからWebSocketメッセージを受信したときはイベントリスナーwss.onmessageの関数が実行されます。

<script>
    const HTTP_PORT = 8080
    const WS_PORT = 8081
    let wss = new WebSocket('ws://localhost:' + WS_PORT)

    wss.onmessage = function (e) {
        let target = $('#'+ e.data);
        let mode = $('input:radio[name="io-mode"]:checked').val();
        wss.send(mode + ' ' + e.data)

        if(target[0]){
            target.css('background-color','yellow')
            let logMsg = new Date().toLocaleString()

            switch (mode){
                case 'in':
                    target.text(Number(target.text()) + 1)
                    logMsg += ' ' + target.prev().text() + ' を入庫した。'
                    break
                case 'out':
                    target.text(Number(target.text()) - 1)
                    logMsg += ' ' + target.prev().text() + ' を出庫した。'
            }

            $('#log-area').append('<p>'+ logMsg + '</p>')

            setTimeout(function (){
                target.css('background-color','transparent')
            },500)
        }
    }
</script>

この関数ではサーバーからきたメッセージ(JANコード)をたよりに該当する在庫数の要素を特定して選択されている入出庫モードから数量の増減とログ出力をしています。

setTimeoutは在庫数書き換え時に一瞬黄色くみせるためのディレイに使っています。

Python側ではカメラ映像を次のループ処理でリアルタイムでバーコードスキャン&デコードしています。

while cap.isOpened():
    ret, frame = cap.read()

    if ret:
        d = decode(frame)

        if d and self.can_scan:
            for cnt, barcode in enumerate(d):
                code = barcode.data.decode('utf-8')

                if self.is_jan(code):
                    self.can_scan = False

                    if cnt == len(d) - 1:
                        threading.Thread(target=self.scan_wait).start()

                    self.wss.send_message(self.client, code)

    cv2.imshow(self.WINDOW_NAME, frame)
    cv2.waitKey(1)

    if not cv2.getWindowProperty(self.WINDOW_NAME, cv2.WND_PROP_VISIBLE):
        break

今回はJANコード以外は興味ないので、is_janメソッドで超簡単にJANであるか否かの判定をしています。

バーコードが読み取れてかつJANであるならば、scan_waitメソッドで2秒間バーコード読み取りをストップし読み取ったJANをWebSocketでクライアントへ送信します。

クライアントでは受け取ったJANで先に解説した処理を実行したあと、選択されている入出庫モードとJANをサーバーへ返信します。

クライアントから返信されたメッセージによりサーバーのmessage_receivedイベントリスナーが呼ばれ、メッセージの内容からJANと入出庫モードを判定します。

def message_received(self, client, server, message):
    m = message.split()
    mode = m[0]
    code = m[1]

    if mode == 'in':
        rc = self.db.set(code, 1)
    else:
        rc = self.db.set(code, -1)
        alert = self.db.check_alert(code)
        if alert is not None:
            self.line_notify(alert['product_name'] + 'がなくなるぞ!')

    if rc == 0:
        winsound.Beep(500, 500)
    else:
        winsound.Beep(2500, 200)

クライアントからのメッセージをもとにクエリを作成して実行しデータベースの該当するJANの商品の在庫数を増減させます。

もし出庫時に在庫数が下限に等しくなったならばline_notifyメソッドでLINE APIを使ってアラートを出します。

データベースに登録されていない商品のJANコードの場合はOrenoDataBase.setメソッドのreturnが0になります。これにより在庫増減処理が正常にできたかどうかがわかるので、ビープ音の周波数を変えてユーザーへ通知します。

カメラウィンドウを閉じるとcv2.getWindowPropertyの条件がfalseになりループを抜けて次のコードが実行されます。

self.db.export_excel()
self.db.close()
cap.release()
self.shutdown()

OrenoDataBase.export_excelメソッドはデータベースをまるごとExcelファイルにして出力します。

def export_excel(self):
    wb = excel.Workbook()
    ws = wb.active
    result = self.cur.execute('SELECT * FROM food')
    header = [d[0] for d in self.cur.description]
    ws.append(header)

    for r in result.fetchall():
        ws.append((r[0], r[1], r[2], r[3]))

    wb.save(EXCEL_BOOK_PATH)

OpenPyXLでExcelファイルを作成してデータベースからセルへデータ転記しています。これでExcel原理主義勢力下でも安心です。

他でのわかりやすさを優先してcurをrow_factory = sqlite3.Rowで作って列名アクセスを可能にしている関係で、ws.appendにわざわざインデックスでバラしてタプルで渡すという面倒なことをしていますが、row_factoryを設定していない場合はws.append(r)だけでいけます。

DB→Excelの方法を調べていてOpenPyXLのws.appendを今回はじめて知りましたが、めちゃ便利ですねこれ。あとPythonの公式リファレンスは相変わらずのわかりにくさでほとんど参考になりませんね。少しはJavaを見習ってくれと。

Excelで出力できたら、データベースを閉じて、カメラキャプチャをやめて、サーバーをシャットダウンして終了です。

デモで使用したスクリプト一式を配布しますのでお試しください。

使用方法

当然ながらPythonがインストールされていない環境では動作しません。

1.ページ下部のダウンロードボタンからスクリプト他一式をダウンロードします。

2.適当なフォルダへ展開、配置します。

3.必要な外部モジュールをインストールします。

pip install opencv-python
pip install requests
pip install openpyxl
pip install pyzbar
pip install git+https://github.com/Pithikos/python-websocket-server

websocket-serverは↑ではGitがインストールされていないと入りません。先にGitを入れるか面倒ならバージョンが古いのでトラブる可能性を承知で

pip install websocket-server

でインストールしてください。

pyzbarはVisualC++ランタイムが必要な場合があります。実行してエラーが出たらメッセージでググってインストールしてください。

4.データベース stock.dbを在庫管理したい商品の情報に書き換えます。
SQLiteのインストールは必須ではありません。DB Browser for SQLite等で編集できます。

5.jan_code_reader_websocket.pyを実行します。

カメラが複数ある場合はjan_code_reader_websocket.pyの71行目
cap = cv2.VideoCapture(0) の数値を
cap = cv2.VideoCapture(1)などにすると、使用するカメラを変更できます。

カメラの解像度は72、73行目で変更できます。カメラにより設定できる解像度は変わります。高解像度の方が処理は重くなりますが認識精度は上がる気がします。

LINE APIによるアラートは無効にしてあります。有効にするには、ご自身で取得したトークンを19行目のLINE_API_TOKENへ入力して61~63行目のコメントアウトを外してください。

エラーハンドラーは一切ありませんので、必要に応じて追加してください。

アプリを入手する

利用上のご注意

  • ダウンロードしたファイルを利用したことにより生じた結果については、利用者ご自身に責任を負っていただきます。
  • ご利用前に使用方法をご確認ください。
  • 当方は成果物の正確性について最善を尽くしますが保証はいたしません。
  • Windows11 Microsoft365 環境でのみ動作確認済み。

Downloadボタンを押下した時点で注意事項に同意したものとみなします。

bc_stock_manager.zip

おわり。