#!/usr/bin/python3
#
# This Program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either program_versionsion 3, or (at your option)
# any later program_versionsion.
#
# This Program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
# Usage:
# rotenc_py
# rotenc_py 100 2 3 23 24 1 (print_debug is optional)
#
import RPi.GPIO as GPIO
import threading
import subprocess
import sys
from time import sleep
import sqlite3
import musicpd
program_version = "1.0"
current_pos = 0
last_pos = 0
last_a_state = 1
last_b_state = 1
pin_a = 23
pin_b = 24
poll_interval = 100 # milliseconds
accel_factor = 2
volume_step = 3
print_debug = 0
thread_lock = threading.Lock()
def main():
global poll_interval, accel_factor, volume_step, pin_a, pin_b, print_debug, db, db_cursor, mpd_cli
# Parse input args (if any)
if len(sys.argv) > 1:
if sys.argv[1] == "--version" or sys.argv[1] == "-v":
print("rotenc.py version " + program_version)
sys.exit(0)
if len(sys.argv) >= 6:
poll_interval = int(sys.argv[1])
accel_factor = int(sys.argv[2])
volume_step = int(sys.argv[3])
pin_a = int(sys.argv[4])
pin_b = int(sys.argv[5])
if len(sys.argv) == 7:
print_debug = int(sys.argv[6])
if print_debug:
print(sys.argv, len(sys.argv))
# Setup GPIO
GPIO.setmode(GPIO.BCM) # SoC pin numbering
GPIO.setwarnings(True)
GPIO.setup(pin_a, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(pin_b, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.add_event_detect(pin_a, GPIO.BOTH, callback=encoder_isr) # NOTE: bouncetime= is not specified
GPIO.add_event_detect(pin_b, GPIO.BOTH, callback=encoder_isr)
# Setup sqlite
db = sqlite3.connect('/var/local/www/db/moode-sqlite3.db')
db.row_factory = sqlite3.Row
db.text_factory = str
db_cursor = db.cursor()
# Setup MPD client
mpd_cli = musicpd.MPDClient()
#mpd_cli.connect()
# Detect encoder changes
poll_interval = poll_interval / 1000
poll_encoder()
# Interrupt service routine (ISR)
def encoder_isr(pin):
global current_pos, last_a_state, last_b_state, thread_lock
# Read pin states
pin_a_state = GPIO.input(pin_a)
pin_b_state = GPIO.input(pin_b)
# Ignore interrupt if no state change (debounce)
if last_a_state == pin_a_state and last_b_state == pin_b_state:
return
# Store current as last state
last_a_state = pin_a_state
last_b_state = pin_b_state
# Ignore all states except final state where both are 1
# Use pin returned from the ISR to determine which pin came first before reaching 1-1
if pin_a_state and pin_b_state:
thread_lock.acquire()
if pin == pin_a:
current_pos -= 1 # CCW
else:
current_pos += 1 # CW
thread_lock.release()
return
# Polling loop for updating volume
def poll_encoder():
global current_pos, last_pos, thread_lock
direction = ""
while True:
thread_lock.acquire()
if current_pos > last_pos:
direction = "+"
if (current_pos - last_pos) < accel_factor:
update_volume(direction, 1)
else:
update_volume(direction, volume_step)
elif current_pos < last_pos:
direction = "-"
if (last_pos - current_pos) < accel_factor:
update_volume(direction, 1)
else:
update_volume(direction, volume_step)
thread_lock.release()
if (current_pos != last_pos) and print_debug:
print(abs(current_pos - last_pos), direction)
last_pos = current_pos
sleep(poll_interval)
# Update MPD and UI volume
def update_volume(direction, step):
db_cursor.execute("SELECT value FROM cfg_system WHERE param='volknob' OR param='volume_mpd_max'")
row = db_cursor.fetchone()
current_vol = int(row['value'])
row = db_cursor.fetchone()
volume_mpd_max = int(row['value'])
if direction == "+":
new_volume = current_vol + step
else:
new_volume = current_vol - step
if new_volume > volume_mpd_max:
new_volume = volume_mpd_max
if new_volume > 100:
new_volume = 100
elif new_volume < 0:
new_volume = 0
db_cursor.execute("UPDATE cfg_system SET value='" + str(new_volume) + "' WHERE param='volknob'")
db.commit()
mpd_cli.connect()
mpd_cli.setvol(new_volume)
mpd_cli.disconnect()
#
# Script starts here
#
if __name__ == '__main__':
main()