#!/usr/bin/python3 import dbus import dbus.service import dbus.exceptions import getpass import gi import grp import jack import json import os import re import resource import shlex import shutil import signal import socket import subprocess import sys import time from os.path import expanduser gi.require_version('Gtk', '3.0') from gi.repository import Gtk, GLib from dbus.mainloop.glib import DBusGMainLoop # the auto_jack module might be in local in some cases global install_path install_path = os.path.abspath(f"{sys.path[0]}/..") sys.path.insert(1, f"{install_path}/lib/python3/dist-packages") import auto_jack DBusGMainLoop(set_as_default=True) class sendbus(dbus.service.Object): def __init__(self): dbus.service.Object.__init__(self, dbus.SessionBus(), "/") @dbus.service.signal(dbus_interface="org.studio.control.command", signature="s") def signal(self, ping): print(f"sent it {ping}") pass class SysInfo: """Get information about the system""" # get info about if rtaccess is setup right def user_audio(self): """Checks if current user is in the audio group, or not""" audio_users = [] audio_users = grp.getgrnam("audio")[3] user = getpass.getuser() if user in audio_users: return True return False def check_pam_files(self): '''Checks for the existence of files''' if os.path.isfile("/etc/security/limits.d/audio.conf"): return True return False def check_rlimits(self): '''returns hard rlimit values for RTPRIO and MEMLOCK''' return {resource.getrlimit(resource.RLIMIT_RTPRIO)[1], resource.getrlimit(resource.RLIMIT_MEMLOCK)[1]} class StudioControls: global lock_file config_path = auto_jack.config_path def __init__(self): '''Activate the SysInfo class''' # this is a long chunk of code that initializes every thing # it should probably be split into tabs at least global lock_file global install_path global version auto_jack.check_user() # don't run as system or root print(f"install path: {install_path}") self.sysinfo = SysInfo() c_dir = expanduser(auto_jack.config_path) if not os.path.isdir(c_dir): os.makedirs(c_dir) lock_file = expanduser(f"{self.config_path}/studio-controls.lock") new_pid = str(os.getpid()) if os.path.isfile(lock_file): old_pid = new_pid with open(lock_file, "r") as lk_file: for line in lk_file: # only need one line old_pid = line.rstrip() if new_pid != old_pid: try: os.kill(int(old_pid), 9) except Exception: pass time.sleep(1) with open(lock_file, "w") as lk_file: lk_file.write(new_pid) version = auto_jack.version() signal.signal(signal.SIGHUP, self.sig_handler) signal.signal(signal.SIGINT, self.sig_handler) signal.signal(signal.SIGQUIT, self.sig_handler) signal.signal(signal.SIGILL, self.sig_handler) signal.signal(signal.SIGTRAP, self.sig_handler) signal.signal(signal.SIGABRT, self.sig_handler) signal.signal(signal.SIGBUS, self.sig_handler) signal.signal(signal.SIGFPE, self.sig_handler) # signal.signal(signal.SIGKILL, self.sig_handler) signal.signal(signal.SIGUSR1, self.sig_handler) signal.signal(signal.SIGSEGV, self.sig_handler) signal.signal(signal.SIGUSR2, self.sig_handler) signal.signal(signal.SIGPIPE, self.sig_handler) signal.signal(signal.SIGALRM, self.sig_handler) signal.signal(signal.SIGTERM, self.sig_handler) # Create the GUI builder = Gtk.Builder() builder.add_from_file( f"{install_path}/share/studio-controls/studio-controls.glade") # Get windows self.window_main = builder.get_object('window_main') self.window_help = builder.get_object('window_help') # Which of these can we get rid of '''self.message_dialog_changes_info = builder.get_object( 'message_dialog_changes_info') self.message_dialog_rt_info = builder.get_object( 'message_dialog_rt_info') self.message_dialog_changes_info.set_transient_for(self.window_main) self.message_dialog_rt_info.set_transient_for(self.window_main)''' self.title_label = builder.get_object('label_main_top') self.button_msg_ok = builder.get_object('button_msg_ok') self.title_label.set_text( f"Studio Set Up Utility (version: {version})") # Get buttons for system tab self.rt_button = builder.get_object('rt_button') self.rt_warning = builder.get_object('rt_warning') self.combo_governor = builder.get_object('combo_governor') self.combo_boost = builder.get_object('combo_boost') self.logging_comb = builder.get_object('logging_comb') self.combo_fw = builder.get_object('combo_fw') self.sub_system_cmb = builder.get_object('sub_system_cmb') # right side menu self.mixer_start = builder.get_object('mixer_start') # master tab self.jack_device_combo = builder.get_object('jack_device_combo') self.jack_usb_dev_combo = builder.get_object('jack_usb_dev_combo') self.chan_in_spin = builder.get_object('chan_in_spin') self.chan_out_spin = builder.get_object('chan_out_spin') self.jack_rate_combo = builder.get_object('jack_rate_combo') self.combobox_late = builder.get_object('combobox_late') self.combo_periods = builder.get_object('combo_periods') self.combo_backend = builder.get_object('combo_backend') self.monitor_combo = builder.get_object('monitor_combo') self.cap_lat_spin = builder.get_object('cap_lat_spin') self.play_lat_spin = builder.get_object('play_lat_spin') self.jack_midi_check = builder.get_object('jack_midi_check') self.jack_midi_u = builder.get_object('jack_midi_u') self.jack_ind = builder.get_object('jack_ind') self.jack_state = builder.get_object('jack_state') self.dsp_label = builder.get_object('dsp_label') self.xrun_lab = builder.get_object('xrun_lab') # Pipewire Tab self.pw_master_dr = builder.get_object('pw_master_dr') self.pw_secondary_dr = builder.get_object('pw_secondary_dr') self.pw_jkdev_dr = builder.get_object('pw_jkdev_dr') self.pw_rate_dr = builder.get_object('pw_rate_dr') self.pw_buffer_dr = builder.get_object('pw_buffer_dr') self.pw_latency_dr = builder.get_object('pw_latency_dr') # Device tab self.usb_plug_check = builder.get_object('usb_plug_check') self.xdev_select = builder.get_object('xdev_select') self.cap_chan_spin = builder.get_object('cap_chan_spin') self.play_chan_spin = builder.get_object('play_chan_spin') self.hide_check = builder.get_object('hide_check') self.xdev_name = builder.get_object('xdev_name') self.xdev_description = builder.get_object('xdev_description') self.xdev_profile = builder.get_object('xdev_profile') self.xdev_rate_drop = builder.get_object('xdev_rate_drop') self.xdev_buff_drop = builder.get_object('xdev_buff_drop') self.xdev_nperiods_drop = builder.get_object('xdev_nperiods_drop') self.xdev_cap_lat = builder.get_object('xdev_cap_lat') self.xdev_play_lat = builder.get_object('xdev_play_lat') # phones self.hp_action = builder.get_object('hp_action') self.hp_device = builder.get_object('hp_device') self.hp_left_pt = builder.get_object('hp_left_pt') # self.hp_switch = builder.get_object('hp_switch') # pulse tab self.pj_combo = builder.get_object('pj_combo') self.pj_direction = builder.get_object('pj_direction') self.pj_name = builder.get_object('pj_name') self.pj_count = builder.get_object('pj_count') self.pj_con = builder.get_object('pj_con') # Session Manager self.jk_connect_mode = builder.get_object('jk_connect_mode') # network tab # audio self.znet_bridge = builder.get_object('znet_bridge') self.znet_direction = builder.get_object('znet_direction') self.znet_count = builder.get_object('znet_count') self.znet_ip = builder.get_object('znet_ip') self.znet_port = builder.get_object('znet_port') self.znet_name = builder.get_object('znet_name') self.znet_bits = builder.get_object('znet_bits') self.znet_late = builder.get_object('znet_late') self.znet_warn = builder.get_object('znet_warn') # midi self.mnet_warn = builder.get_object('mnet_warn') self.mnet_count = builder.get_object('mnet_count') self.mnet_type = builder.get_object('mnet_type') # Dbus monitoring user_bus = dbus.SessionBus() user_bus.add_signal_receiver( self.db_state_cb, dbus_interface='org.studio.control.state', signal_name='state') user_bus.add_signal_receiver( self.new_jack_port, dbus_interface='org.jackaudio.JackPatchbay', signal_name='PortAppeared') user_bus.add_signal_receiver( self.new_jack_port, dbus_interface='org.jackaudio.JackPatchbay', signal_name='PortDisappeared') self.sendbs = sendbus() # Set default window icon for window managers self.window_main.set_default_icon_name( 'com.github.ovenwerks.studio-controls') # Check if audio.conf and/or audio.conf.disabled # exists, returns are true or false # self.rt_file = False self.jack_file_exists = self.sysinfo.check_pam_files() if self.jack_file_exists and self.sysinfo.user_audio(): rtprio, memlock = self.sysinfo.check_rlimits() if rtprio == 0: self.rt_button.set_label("Reboot required") self.rt_button.set_sensitive(False) self.warnshow("It seems you have made settings previously" " and not done a fresh login, or you already have" " realtime settings in effect, but you are not using" " the file supplied with the package jackd. If you" " want to use this application for administering" " realtime settings, please remove any custom" " settings and reboot your computer.") self.rt_warning.set_text( "Session restart required for Real Time Permissions") else: # turn off warning text, check on, deactivate self.rt_warning.set_text("") self.rt_button.set_label("Real Time Permissions Enabled") self.rt_button.set_sensitive(False) # read in autojack config file self.conf_db = auto_jack.convert() self.jackdb = self.conf_db['jack'] self.extra = self.conf_db['extra'] # show current CPU Governor self.combo_governor.append_text("Performance") if os.path.exists("/sys/devices/system/cpu/intel_pstate/"): self.combo_governor.append_text("Powersave") else: self.combo_governor.append_text("Ondemand") if self.conf_db['cpu-governor']: self.combo_governor.set_active(0) else: self.combo_governor.set_active(1) # show boost state if os.path.exists("/sys/devices/system/cpu/intel_pstate/no_turbo"): if self.conf_db['boost']: self.combo_boost.set_active(0) else: self.combo_boost.set_active(1) else: self.combo_boost.set_sensitive(False) if os.path.exists("/etc/modprobe.d/blacklist-studio.conf"): self.combo_fw.set_active_id("ffado") self.combo_backend.append("firewire", "firewire") # Audio stuff global autojack global newusb global jack_alive global jack_ports_changed global jackstate global jackstring jackstring = "unknown" jackstate = False autojack = True newusb = False jack_alive = False jack_ports_changed = True self.jack_error_mesg = "" self.jack_info_mesg = "" self.dirty = False self.not_applied = False jack.set_error_function(callback=self.jack_error) jack.set_info_function(callback=self.jack_info) self.logging_comb.set_active_id(str(self.conf_db['log-level'])) # fill in Jack master widgets self.combo_periods.set_sensitive(False) self.combo_backend.set_sensitive(False) self.chan_in_spin.set_sensitive(False) self.chan_out_spin.set_sensitive(False) self.cap_lat_spin.set_sensitive(False) self.play_lat_spin.set_sensitive(False) self.jack_midi_check.set_sensitive(False) self.jack_midi_u.set_sensitive(False) self.usb_plug_check.set_sensitive(False) self.jk_connect_mode.set_sensitive(False) self.combo_periods.set_active_id(str(self.jackdb['period'])) self.combo_backend.set_active_id(self.jackdb['driver']) self.chan_in_spin.set_range(1, 128) self.chan_out_spin.set_range(1, 128) self.chan_in_spin.set_value(self.jackdb['chan-in']) self.chan_out_spin.set_value(self.jackdb['chan-out']) self.cap_lat_spin.set_range(0, 1000) self.play_lat_spin.set_range(0, 1000) self.cap_lat_spin.set_value(self.jackdb['cap-latency']) self.play_lat_spin.set_value(self.jackdb['play-latency']) self.jack_midi_check.set_active(self.extra['a2j']) if "a2j_u" not in self.extra: self.extra['a2j_u'] = False self.jack_midi_u.set_active(self.extra['a2j_u']) # Fill Extra devices widgets self.usb_plug_check.set_active(self.extra['usbauto']) if 'phone-left' not in self.extra: self.extra['phone-left'] = '1' self.hp_left_pt.set_value(int(self.extra['phone-left'])) self.pw_setup() # pulse bridge defaults self.pj_direction.set_sensitive(False) self.pj_count.set_range(1, 99) # Session Manger settings self.jk_connect_mode.set_active_id(self.jackdb['connect-mode']) # net settings self.znet_direction.set_sensitive(False) self.combo_periods.set_sensitive(True) self.combo_backend.set_sensitive(True) self.chan_in_spin.set_sensitive(True) self.chan_out_spin.set_sensitive(True) self.cap_lat_spin.set_sensitive(True) self.play_lat_spin.set_sensitive(True) self.jack_midi_check.set_sensitive(True) self.jack_midi_u.set_sensitive(True) self.usb_plug_check.set_sensitive(True) self.jk_connect_mode.set_sensitive(True) self.refresh_dropdowns() self.pj_bridge = "" self.refresh_pulse_tab(self.pj_bridge) self.znetbridge = "" self.refresh_net(self.znetbridge) handlers = { "on_window_main_delete_event": self.on_window_main_delete_event, "on_window_help_delete_event": self.on_window_help_delete_event, "on_main_button_cancel_clicked": self.on_main_button_cancel_clicked, "on_main_button_help_clicked": self.on_main_button_help_clicked, "combo_governor_changed_cb": self.combo_governor_changed_cb, "combo_boost_changed_cb": self.combo_boost_changed_cb, "logging_change": self.logging_changed, "firewire_cb": self.firewire_cb, "sub_system_cb": self.sub_system_cb, "rt_button_hit": self.rt_button_hit, "on_button_msg_ok_clicked": self.on_button_msg_ok_clicked, "on_button_rt_info_ok_clicked": self.on_button_rt_info_ok_clicked, "on_button_help_ok_clicked": self.on_button_help_ok_clicked, "jack_device_changed": self.jack_device_changed, "usb_master_changed": self.usb_master_changed, "jack_driver_changed": self.jack_driver_changed, "xrun_reset": self.xrun_reset, "cb_jack_start": self.cb_jack_start, "cb_jack_stop": self.cb_jack_stop, "cb_audio_apply": self.cb_audio_apply, "mixer_cb": self.mixer_cb, "pavucontrol_cb": self.pavucontrol_cb, "carla_cb": self.carla_cb, "generic_cb": self.generic_cb, "pw_master_cb": self.pw_master_cb, "pw_secondary_cb": self.pw_secondary_cb, "pw_jkdev_cb": self.pw_jkdev_cb, "pw_rate_cb": self.pw_rate_cb, "pw_buffer_cb": self.pw_buffer_cb, "pw_latency_cb": self.pw_latency_cb, "xdev_select_cb": self.xdev_select_cb, "xdev_changed": self.xdev_changed, "xdev_cap_all_cb": self.xdev_cap_all_cb, "xdev_play_all_cb": self.xdev_play_all_cb, "usb_plug_cb": self.usb_plug_cb, "hp_action_cb": self.hp_action_cb, "hp_switch_cb": self.hp_switch_cb, "switchtomon_cb": self.switchtomon_cb, "pj_combo_cb": self.pj_combo_cb, "pj_add_cb": self.pj_add_cb, "pj_rem_cb": self.pj_rem_cb, "pj_name_cb": self.pj_name_cb, "znet_bridge_cb": self.znet_bridge_cb, "znet_changed_cb": self.znet_changed_cb, "znet_add_cb": self.znet_add_cb, "znet_rem_cb": self.znet_rem_cb, "mnet_count_cb": self.mnet_count_cb, "mnet_type_cb": self.mnet_type_cb, "ray_cb": self.ray_cb, "nsm_cb": self.nsm_cb, "agordejo_cb": self.agordejo_cb, } builder.connect_signals(handlers) self.timeout_id = GLib.timeout_add(500, self.check_jack_status, None) self.signal_autojack("ping") autojack = False print("initialization complete") def jack_error(self, mesg): if self.jackdb['on']: if "not running" in mesg: print(f"jack message received: {mesg}") def generic_cb(self, widget): ''' changes have been made but not yet applied ''' if not widget.get_sensitive(): return self.not_applied = True def jack_info(self, mesg): print(f"jack_info received: {mesg}") def db_state_cb(*args, **kwargs): ''' received signal from autojack ''' global version global autojack global jackstate global jackstring global newusb if version == args[1]: if not autojack: autojack = True print("autojack is running") else: print( f"Autojack version is {args[1]} will be" "restarted on any command") if args[2] == 'usb': newusb = True print("autojack sees usb change") else: jackstring = args[2] jackstate = True print(f"version: {args[1]} status:{args[2]}") def check_jack_status(self, user_data): '''Check if jack has died and the client needs to be closed. Check if jack is running then set jack status indicator. Check to see if the device lists have changed and update the gui if so. Updating GUI prevents race with secondary updates caused by updating''' # these variables need to be global as they are used by callbacks global newusb global jack_client global jack_alive global jack_ports_changed global jackstate global jackstring bus = dbus.SessionBus() controller = bus.get_object( "org.jackaudio.service", "/org/jackaudio/Controller") control_iface = dbus.Interface( controller, "org.jackaudio.JackControl") jack_check = False if jackstring[0:4] == "Stop": jack_check = False else: jack_check = True if jackstate: # This signal comes from autojack self.jack_state.set_text(f" {jackstring}") jackstate = False if int(control_iface.IsStarted()) and jack_check: xrun_count = control_iface.GetXruns() self.xrun_lab.set_text(f" {str(xrun_count)}") load = control_iface.GetLoad() self.dsp_label.set_text(f"DSP: {str(load)[0:4]}%") if not jack_alive: try: jack_client = jack.Client( 'controls', use_exact_name=False, no_start_server=True) except jack.JackError: self.xrun_lab.set_text(" 0") return True jack_client.activate() jack_alive = True jack_ports_changed = True self.refresh_pulse_tab(self.pj_bridge) else: if jack_alive: # Jack has just stopped jack_client.deactivate() jack_client.close() jack_check = True jack_alive = False self.dirty = True self.refresh_pulse_io() self.dsp_label.set_text("DSP: 0%") self.xrun_lab.set_text(" 0") # device changed, update GUI if self.dirty or newusb: self.refresh_dropdowns() self.dirty = False newusb = False if jack_ports_changed: jack_ports_changed = False self.refresh_pulse_io() return True def new_jack_port(*args, **kwargs): ''' jack has added a port tell someone ''' global jack_ports_changed jack_ports_changed = True def xrun_reset(self, button): ''' user asked for an xrun reset so do it ''' bus = dbus.SessionBus() controller = bus.get_object( "org.jackaudio.service", "/org/jackaudio/Controller") control_iface = dbus.Interface(controller, "org.jackaudio.JackControl") control_iface.ResetXruns() def refresh_dropdowns(self): '''this call refreshes the device lists for all drop downs that use devices. If backend is not "alsa" then the jack master and USB master are set but not changable However, all alsa devices will still be available for bridging and as output device. ''' temp_db = auto_jack.check_devices(self.conf_db) # refresh devices for temp_dev in temp_db['devices']: # transfer current device info into db without changing # settings that may not yet be saved dev_db = self.conf_db['devices'][temp_dev] check_db = temp_db['devices'][temp_dev] if temp_dev in self.conf_db['devices']: dev_db['number'] = check_db['number'] dev_db['id'] = check_db['id'] dev_db['bus'] = check_db['bus'] else: dev_db = temp_db['devices'][temp_dev] xdev_current = self.xdev_select.get_active_id() self.current_ex_dev = xdev_current fw_mixer = False # driver is probably ok to leave above self.combo_backend.set_active_id(self.jackdb['driver']) self.cap_lat_spin.set_value(self.jackdb['cap-latency']) self.play_lat_spin.set_value(self.jackdb['play-latency']) self.jack_device_combo.set_sensitive(False) self.jack_usb_dev_combo.set_sensitive(False) self.chan_in_spin.set_sensitive(False) self.chan_out_spin.set_sensitive(False) self.combobox_late.set_sensitive(False) self.combo_periods.set_sensitive(False) self.jack_rate_combo.set_sensitive(False) self.xdev_select.set_sensitive(False) self.mixer_start.set_sensitive(False) # popdown any combo boxes before changing self.jack_device_combo.popdown() self.xdev_select.popdown() self.jack_usb_dev_combo.popdown() self.jack_rate_combo.popdown() self.combobox_late.popdown() self.mixer_start.popdown() self.jack_device_combo.get_model().clear() self.xdev_select.get_model().clear() self.jack_usb_dev_combo.get_model().clear() self.jack_rate_combo.get_model().clear() self.combobox_late.get_model().clear() self.mixer_start.get_model().clear() self.mixer_start.append("none", "Open Device Mixer") self.mixer_start.set_active_id("none") if self.combo_fw.get_active_id() == "ffado": self.mixer_start.append("ffado", "FFADO Device Mixer") fw_mixer = True rates = [] def_rates = ['32000', '44100', '48000', '88200', '96000', '192000'] frames = [16, 32, 64, 128, 256, 512, 1024, 2048, 4096] self.jack_usb_dev_combo.append("none", "No USB Master") if self.jackdb['usbdev'] == "" or self.jackdb['usbdev'] == "none": self.jack_usb_dev_combo.set_active_id("none") self.hp_device.popdown() self.hp_device.get_model().clear() self.hp_device.append( self.extra['phone-device'], self.extra['phone-device']) self.hp_device.set_active_id(self.extra['phone-device']) if self.extra['phone-device'] != 'system': self.hp_device.append( 'system', 'JACK Master (system:*)') for this_dev in self.conf_db['devices']: dev_db = self.conf_db['devices'][this_dev] if 'firewire' in dev_db and dev_db['firewire'] and not fw_mixer: self.mixer_start.append("ffado", "FFADO Device Mixer") fw_mixer = True if dev_db['number'] != -1: # we only want plugged devices self.mixer_start.append(dev_db['raw'], dev_db['raw']) for this_sub in dev_db['sub']: sub_db = dev_db['sub'][this_sub] d_type = "" if sub_db['capture']: d_type = "capture" if sub_db['playback']: if d_type == "": d_type = "playback" else: d_type = f"{d_type} and playback" next_id = "" dname = "" next_d = "" next_id = f"{this_dev},{this_sub},0" if dev_db['usb']: dname = f"({dev_db['raw']})" if dev_db['number'] == -1: dname = "(unplugged)" next_d = f"{next_id} {dname} {d_type} {sub_db['description']}" if "Loopback" in [this_dev]: self.xdev_select.append(next_id, next_d) else: self.xdev_select.insert(0, next_id, next_d) if not sub_db['hide']: # not hidden so we can add it if "Loopback" not in [this_dev]: self.hp_device.append(next_id, next_d) if dev_db['usb']: self.jack_usb_dev_combo.append(next_id, next_d) else: self.jack_device_combo.append(next_id, next_d) if next_id == self.extra['phone-device']: self.hp_device.set_active_id(next_id) if next_id == self.jackdb['dev']: self.jack_device_combo.set_active_id(next_id) if self.jackdb['usbdev'] == "none": # this is jack master get rates rates = dev_db['rates'] if next_id == self.jackdb['usbdev']: self.jack_usb_dev_combo.set_active_id(next_id) # this is jack master get rates rates = dev_db['rates'] if self.jackdb['driver'] != "alsa" or rates == []: rates = def_rates for rate in rates: self.jack_rate_combo.append(rate, rate) if str(self.jackdb['rate']) in rates: self.jack_rate_combo.set_active_id(str(self.jackdb['rate'])) else: self.jack_rate_combo.set_active(0) minlat = 16 fw = False if self.jackdb['driver'] == "alsa": if self.jackdb['usbdev'] != 'none': minlat = self.conf_db[ 'devices'][ self.jackdb['usbdev'].split(',')[0]][ 'min_latency'] else: minlat = self.conf_db['devices'][self.jackdb['dev'].split(',')[ 0]]['min_latency'] fw = self.conf_db['devices'][self.jackdb['dev'].split(',')[ 0]]['firewire'] for frame in frames: tplt = "" if frame == minlat and fw: tplt = " For lower latency Use the FFADO kernal modules." if frame >= minlat: self.combobox_late.append(str(frame), f"{str(frame)}{tplt}") if self.jackdb['frame'] in frames and self.jackdb['frame'] >= minlat: self.combobox_late.set_active_id(str(self.jackdb['frame'])) else: self.combobox_late.set_active_id(str(minlat)) self.mixer_start.set_sensitive(True) self.jack_rate_combo.set_sensitive(True) self.combobox_late.set_sensitive(True) if self.jackdb['driver'] == "alsa": self.jack_device_combo.set_sensitive(True) self.jack_usb_dev_combo.set_sensitive(True) self.combo_periods.set_sensitive(True) elif self.jackdb['driver'] == "firewire": self.combo_periods.set_sensitive(True) elif self.jackdb['driver'] == "dummy": self.chan_in_spin.set_sensitive(True) self.chan_out_spin.set_sensitive(True) self.redraw_extra(xdev_current) '''Functions for all the gui controls''' def on_window_help_delete_event(self, window, event): self.window_help.hide_on_delete() return True def on_main_button_help_clicked(self, button): # TODO move this to: self.infoshow(title, text) # TODO make this notebook tab specific self.window_help.show() def rt_button_hit(self, button): pke = shutil.which("pkexec") if pke is None: print("no pkexec, Fail") return subprocess.run( [pke, f"{install_path}/sbin/studio-system", "fix"], shell=False) self.rt_button.set_label("Logout required") self.rt_button.set_sensitive(False) self.warnshow("For changes to take effect, you need to do a reboot.") self.rt_warning.set_text( "Session restart required for Real Time Permissions") # system tweaks def combo_governor_changed_cb(self, button): newval = False if button.get_active_text() == "Performance": newval = True if self.conf_db['cpu-governor'] != newval: self.conf_db['cpu-governor'] = newval self.cb_audio_apply(button) def combo_boost_changed_cb(self, button): newval = False if button.get_active_text() == "off": newval = True if self.conf_db['boost'] != newval: self.conf_db['boost'] = newval self.cb_audio_apply(button) def logging_changed(self, widget): newval = widget.get_active_id() if self.conf_db['log-level'] != newval: self.conf_db['log-level'] = newval self.cb_audio_apply(widget) def firewire_cb(self, widget): newval = widget.get_active_id() if newval == "alsa" or newval == "ffado": pke = shutil.which("pkexec") if pke is None: print("no pkexec, Fail") return subprocess.run( [pke, f"{install_path}/sbin/studio-system", newval], shell=False) self.warnshow(f"Reboot to complete change to {newval}.") def sub_system_cb(self, widget): ''' sends messages to change the audio subsystem. there are three choices: - jack - pa for desktop with jackd(bus) for proaudio - pipewire - pipewire does desktop and acts like jack Hopefully, this mode will allow using jack as a device - pw-jack - pw does desktop, jackd(bus) runs as well Bridging is still a question. ''' newval = widget.get_active_id() oldval = self.conf_db['sub-system'] if newval != oldval: pls = shutil.which("pulseaudio") in [None] jck = shutil.which("jackdbus") in [None] pw = shutil.which("pipewire") in [None] if newval == 'jack': if pls or jck: self.warnshow(f"pulseaudio or jackd2 not installed" f"staying in {oldval} mode.") return if newval == 'pipewire': if pw: self.warnshow(f"pipewire not installed" f"staying in {oldval} mode.") return if newval == 'pw-jack': if pw or jck: self.warnshow(f"pipewire or jackd2 not installed" f"staying in {oldval} mode.") return pke = shutil.which("pkexec") if pke is None: print("no pkexec, Fail cannot change sub-system") return self.conf_db['sub-system'] = newval # send signal to autojack so it can shutdown things if needed # self.cb_audio_apply(widget) cmd1 = "" cmd2 = "" cmd3 = "" if newval == "jack": # needboot = True # Jack mode needs pulse cmd1 = "systemctl --user stop wireplumber.service" cmd1 = f"{cmd1} pipewire-pulse.service pipewire.service" cmd1 = f"{cmd1} pipewire.socket" cmd2 = "systemctl --user unmask pulseaudio" cmd3 = "systemctl --user --now enable pulseaudio.socket" cmd3 = f"{cmd3} pulseaudio.service" elif oldval == "jack": # no longer jack mode so no pulse cmd1 = "systemctl --user --now disable pulseaudio.service" cmd1 = f"{cmd1} pulseaudio.socket" cmd2 = "systemctl --user mask pulseaudio" cmd3 = "systemctl --user restart pipewire.socket" cmd3 = f"{cmd3} wireplumber.service pipewire-pulse.service" cmd3 = f"{cmd3} pipewire.service" if cmd1 != "": subprocess.run(shlex.split(cmd1), shell=False) time.sleep(1) subprocess.run(shlex.split(cmd2), shell=False) time.sleep(1) subprocess.run(shlex.split(cmd3), shell=False) if newval == "pipewire": # point to PW's libjack subprocess.run( [pke, f"{install_path}/sbin/studio-system", "pwlib"], shell=False) if oldval == "pipewire": # point to real libjack subprocess.run( [pke, f"{install_path}/sbin/studio-system", "jacklib"], shell=False) self.cb_audio_apply(widget) # Audio setup call backs # PipeWire setup def pw_setup(self): # Make sure no changes happen till we are done self.pw_master_dr.set_sensitive(False) self.pw_secondary_dr.set_sensitive(False) self.pw_jkdev_dr.set_sensitive(False) self.pw_rate_dr.set_sensitive(False) self.pw_buffer_dr.set_sensitive(False) self.pw_latency_dr.set_sensitive(False) # popdown any combo boxes before changing self.pw_master_dr.popdown() self.pw_secondary_dr.popdown() self.pw_jkdev_dr.popdown() self.pw_rate_dr.popdown() self.pw_buffer_dr.popdown() self.pw_latency_dr.popdown() # clear what needs clearing self.pw_master_dr.get_model().clear() self.pw_secondary_dr.get_model().clear() self.pw_rate_dr.get_model().clear() self.pw_buffer_dr.get_model().clear() self.pw_latency_dr.get_model().clear() self.pw_jkdev_dr.set_active_id('false') self.pw_secondary_dr.append("none", "none") self.pw_secondary_dr.set_active_id("none") for device in self.conf_db['devices']: this_dev = self.conf_db['devices'][device] for subd in this_dev['sub']: sub_dev = this_dev['sub'][subd] if not sub_dev['hide']: bustxt = "" plugged = "" if this_dev['usb']: bustxt = f" ({this_dev['bus']})" if this_dev['number'] < 0: plugged = " (unplugged)" this_entry = f"{sub_dev['name']}{bustxt}" this_entry = f"{this_entry} {sub_dev['description']}" this_entry = f"{this_entry}{plugged}" self.pw_master_dr.append(sub_dev['name'], this_entry) self.pw_secondary_dr.append(sub_dev['name'], this_entry) if self.conf_db['pipewire']['primary'] == sub_dev['name']: self.pw_master_dr.set_active_id(sub_dev['name']) for rate in this_dev['rates']: self.pw_rate_dr.append(rate, rate) if int(rate) == self.conf_db['pipewire']['rate']: self.pw_rate_dr.set_active_id(rate) frames = [16, 32, 64, 128, 256, 512, 1024, 2048, 4096] for buff in frames: if buff >= this_dev['min_latency']: tbuff = str(buff) self.pw_buffer_dr.append(tbuff, tbuff) if buff == self.conf_db['pipewire']['buffer']: self.pw_buffer_dr.set_active_id(tbuff) periods = [2, 3] tmppd = self.conf_db['pipewire']['latency'] for pds in periods: p_id = pds * self.conf_db['pipewire']['buffer'] p_text = str(1000 * p_id / self.conf_db['pipewire']['rate']) self.pw_latency_dr.append(str(pds), f"{p_text:.3} ms") if pds == tmppd: self.conf_db['pipewire']['latency'] = tmppd self.pw_latency_dr.set_active_id(str(pds)) if self.pw_latency_dr.get_active == -1: self.pw_latency_dr.set_active_id('2') if self.conf_db['pipewire']['secondary'] == sub_dev['name']: self.pw_secondary_dr.set_active_id(sub_dev['name']) self.pw_master_dr.set_sensitive(True) self.pw_secondary_dr.set_sensitive(True) # self.pw_jkdev_dr.set_sensitive(True) TODO not implemented yet self.pw_rate_dr.set_sensitive(True) self.pw_buffer_dr.set_sensitive(True) self.pw_latency_dr.set_sensitive(True) # PipeWire call backs def pw_master_cb(self, widget): device = str(widget.get_active_id()) if device == 'None': return if device != self.conf_db['pipewire']['primary']: self.conf_db['pipewire']['primary'] = device self.pw_setup() self.not_applied = True def pw_secondary_cb(self, widget): device = str(widget.get_active_id()) if device == 'None': return if device != self.conf_db['pipewire']['secondary']: self.conf_db['pipewire']['secondary'] = device self.not_applied = True def pw_jkdev_cb(self, widget): rawjkdev = str(widget.get_active_id()) if rawjkdev != "None": kcdev = rawjkdev in ['True', 'true'] if jkdev != self.conf_db['pipewire']['usejack']: self.conf_db['pipewire']['usejack'] = jkdev self.not_applied = True def pw_rate_cb(self, widget): rate = str(widget.get_active_id()) if rate == 'None': return if int(rate) != self.conf_db['pipewire']['rate']: self.conf_db['pipewire']['rate'] = int(rate) self.pw_setup() self.not_applied = True def pw_buffer_cb(self, widget): buff = str(widget.get_active_id()) if buff == 'None': return if int(buff) != self.conf_db['pipewire']['buffer']: self.conf_db['pipewire']['buffer'] = int(buff) self.pw_setup() self.not_applied = True def pw_latency_cb(self, widget): late = str(widget.get_active_id()) if late == 'None': return if int(late) != self.conf_db['pipewire']['latency']: self.conf_db['pipewire']['latency'] = int(late) self.pw_setup() self.not_applied = True # Device setup names, profiles, etc. Call backs def xdev_select_cb(self, widget): a_id = str(widget.get_active_id()) if a_id != "None" and a_id != self.current_ex_dev: self.redraw_extra(a_id) def xdev_changed(self, widget): # self.xdev_select.set_sensitive(False) if not self.xdev_select.get_sensitive(): return self.cap_chan_spin.set_sensitive(False) self.play_chan_spin.set_sensitive(False) self.hide_check.set_sensitive(False) self.xdev_rate_drop.set_sensitive(False) self.xdev_buff_drop.set_sensitive(False) self.xdev_nperiods_drop.set_sensitive(False) self.xdev_cap_lat.set_sensitive(False) self.xdev_play_lat.set_sensitive(False) self.xdev_name.set_sensitive(False) # TODO add name and desc to rest of xdev_changed self.xdev_description.set_sensitive(False) self.xdev_profile.set_sensitive(False) this_dev = str(self.xdev_select.get_active_id()) dev_db = self.conf_db['devices'][this_dev.split(',')[0]] sub_db = dev_db['sub'][str(this_dev.split(',')[1])] sub_db['cap-chan'] = self.cap_chan_spin.get_value_as_int() sub_db['play-chan'] = self.play_chan_spin.get_value_as_int() sub_db['hide'] = self.hide_check.get_active() sub_db['rate'] = int(self.xdev_rate_drop.get_active_id()) sub_db['frame'] = int(self.xdev_buff_drop.get_active_id()) sub_db['nperiods'] = int(self.xdev_nperiods_drop.get_active_id()) sub_db['name'] = self.xdev_name.get_text().split(' ')[0] # get_value_as_int self.hide_check.set_sensitive(True) if not sub_db['hide']: self.cap_chan_spin.set_sensitive(True) self.play_chan_spin.set_sensitive(True) self.xdev_rate_drop.set_sensitive(True) self.xdev_buff_drop.set_sensitive(True) self.xdev_nperiods_drop.set_sensitive(True) self.xdev_cap_lat.set_sensitive(True) self.xdev_play_lat.set_sensitive(True) self.xdev_name.set_sensitive(True) def xdev_cap_all_cb(self, widget): if not widget.get_sensitive(): return self.not_applied = True widget.set_sensitive(False) # self.cap_chan_spin.set_sensitive(False) # widget.set_active(False) if widget.get_label() == ' All ': self.cap_chan_spin.set_value(100) widget.set_label(' Off') else: widget.set_label(' All ') self.cap_chan_spin.set_value(0) widget.set_sensitive(True) self.cap_chan_spin.set_sensitive(True) def xdev_play_all_cb(self, widget): if not widget.get_sensitive(): return self.not_applied = True widget.set_sensitive(False) # self.play_chan_spin.set_sensitive(False) # widget.set_active(False) if widget.get_label() == ' All ': self.play_chan_spin.set_value(100) widget.set_label(' Off') else: widget.set_label(' All ') self.play_chan_spin.set_value(0) widget.set_sensitive(True) self.play_chan_spin.set_sensitive(True) def redraw_extra(self, next_device): ''' change all widgets to reflect current values of the selected device ''' frames = [16, 32, 64, 128, 256, 512, 1024, 2048, 4096] self.xdev_select.set_sensitive(False) self.cap_chan_spin.set_sensitive(False) self.play_chan_spin.set_sensitive(False) self.hide_check.set_sensitive(False) self.xdev_rate_drop.set_sensitive(False) self.xdev_buff_drop.set_sensitive(False) self.xdev_nperiods_drop.set_sensitive(False) self.xdev_cap_lat.set_sensitive(False) self.xdev_play_lat.set_sensitive(False) self.xdev_name.set_sensitive(False) # TODO add name and desc to rest of redraw_extra self.xdev_description.set_sensitive(False) self.xdev_profile.set_sensitive(False) self.xdev_rate_drop.popdown() self.xdev_buff_drop.popdown() self.xdev_rate_drop.get_model().clear() self.xdev_buff_drop.get_model().clear() if next_device == 'none': for def_dev in self.conf_db['devices']: next_device = f"{def_dev},0,0" break self.xdev_select.set_active_id(next_device) dev_db = self.conf_db['devices'][next_device.split(',')[0]] sub_db = dev_db['sub'][str(next_device.split(',')[1])] if dev_db['number'] == -1: # device is unplugged, some info may be missing if 'rates' not in dev_db: # the device may not handle all these rates # but missing some would be bad dev_db['rates'] = ["32000", "44100", "48000", "88200", "96000", "192000"] self.cap_chan_spin.set_value(sub_db['cap-chan']) self.play_chan_spin.set_value(sub_db['play-chan']) self.hide_check.set_active(sub_db['hide']) for rate in dev_db['rates']: self.xdev_rate_drop.append(str(rate), str(rate)) self.xdev_rate_drop.set_active_id(str(sub_db['rate'])) for frame in frames: if dev_db['min_latency'] <= frame: self.xdev_buff_drop.append(str(frame), str(frame)) self.xdev_buff_drop.set_active_id(str(sub_db['frame'])) self.xdev_nperiods_drop.set_active_id(str(sub_db['nperiods'])) if sub_db['name'] == 'none': sub_db['name'] = next_device if not self.dirty: self.dirty = True self.xdev_name.set_text(sub_db['name']) self.xdev_cap_lat.set_value(sub_db['cap-latency']) self.xdev_play_lat.set_value(sub_db['cap-latency']) self.xdev_select.set_sensitive(True) self.hide_check.set_sensitive(True) if not sub_db['hide']: self.cap_chan_spin.set_sensitive(True) self.play_chan_spin.set_sensitive(True) self.xdev_rate_drop.set_sensitive(True) self.xdev_buff_drop.set_sensitive(True) self.xdev_nperiods_drop.set_sensitive(True) self.xdev_cap_lat.set_sensitive(True) self.xdev_play_lat.set_sensitive(True) self.xdev_name.set_sensitive(True) def hp_action_cb(self, widget): if not widget.get_sensitive(): return self.not_applied = True a_id = str(widget.get_active_id()) if a_id == "script": print("Script is called ~/.config/autojack/phones.sh") # this needs to be shown as a dialog def hp_switch_cb(self, button): self.signal_autojack("phones") def switchtomon_cb(self, button): self.signal_autojack("monitor") def jack_device_changed(self, button): if not button.get_sensitive(): return self.not_applied = True a_id = str(button.get_active_id()) a_desc = str(button.get_active_text()) if a_id != "None": self.conf_db['jack']['dev'] = a_id self.dev_desc = a_desc if not self.dirty: self.dirty = True def jack_driver_changed(self, button): if not button.get_sensitive(): return self.not_applied = True a_driver = str(button.get_active_text()) self.conf_db['jack']['driver'] = a_driver if not self.dirty: self.dirty = True def usb_master_changed(self, button): if not button.get_sensitive(): return self.not_applied = True a_id = str(button.get_active_id()) if a_id != "None": self.conf_db['jack']['usbdev'] = a_id if not self.dirty: self.dirty = True def usb_plug_cb(self, widget): if not widget.get_sensitive(): return self.not_applied = True a_id = widget.get_active() # Pulse bridge calls def refresh_pulse_io(self): ''' the ports that can be connected to pulse ports varies with what jack offers. This refreshes the two drop downs ''' global jack_client global jack_alive # need to know what the current direction is self.pj_con.set_sensitive(False) self.monitor_combo.set_sensitive(False) direction = self.pj_direction.get_active_id() our_db = self.conf_db['pulse']['outputs'] if direction == 'in': our_db = self.conf_db['pulse']['inputs'] br_exists = False if our_db != {}: br_exists = True if self.pj_bridge == "": for br in in_db: self.pj_bridge = br break self.pj_con.popdown() self.pj_con.get_model().clear() self.pj_con.append("none", "no connection") if direction == 'out': self.pj_con.append("monitor", "Main Output Ports") self.pj_con.set_active_id('none') if br_exists: our_con = our_db[self.pj_bridge]['connection'] if our_con != 'none': self.pj_con.append(our_con, our_con) self.pj_con.set_active_id(our_con) self.monitor_combo.popdown() self.monitor_combo.get_model().clear() monitor = self.conf_db['extra']["monitor"] self.monitor_combo.append(monitor, monitor) self.monitor_combo.set_active_id(monitor) if jack_alive: if direction == 'in': # get capture ports with audio and hardware jack_cap = jack_client.get_ports( "", is_audio=True, is_output=True, is_physical=True) extra = "" last_dev = "" for jport in jack_cap: port = jport.name dev = port.split(':', 1)[0] paliases = jport.aliases for palias in paliases: adev = palias.split(':', 1)[0] if adev == "system": extra = f" <{dev}>" dev = adev port = palias self.pj_con.append(port, f"{port} {extra}") if br_exists and port == our_db[ self.pj_bridge]['connection']: self.pj_con.remove(self.pj_con.get_active()) self.pj_con.set_active_id(port) jack_play = jack_client.get_ports( "", is_audio=True, is_input=True, is_physical=True) extra = "" last_dev = "" for jport in jack_play: port = jport.name dev = port.split(':', 1)[0] paliases = jport.aliases for palias in paliases: adev = palias.split(':', 1)[0] if adev == "system": extra = f" <{dev}>" dev = adev port = palias if direction == 'out': self.pj_con.append(port, f"{port} {extra}") if br_exists and port == our_db[ self.pj_bridge]['connection']: self.pj_con.remove(self.pj_con.get_active()) self.pj_con.set_active_id(port) self.monitor_combo.append(port, f"{port} {extra}") if port == monitor: self.monitor_combo.remove(self.monitor_combo.get_active()) self.monitor_combo.set_active_id(monitor) else: self.pj_con.append( "none", "Ports cannot be displayed unless JACK is running.") self.monitor_combo.append( "none", "Ports cannot be displayed unless JACK is running.") self.pj_con.set_sensitive(True) self.monitor_combo.set_sensitive(True) def refresh_pulse_tab(self, new_bridge): ''' Fill in all pulse related widgets ''' global jack_ports_changed out_db = self.conf_db['pulse']['outputs'] in_db = self.conf_db['pulse']['inputs'] self.pj_combo.set_sensitive(False) self.pj_direction.set_sensitive(False) self.pj_name.set_sensitive(False) self.pj_count.set_sensitive(False) self.pj_con.set_sensitive(False) self.pj_combo.popdown() self.pj_combo.get_model().clear() if out_db == {} and in_db == {}: # no bridges self.pj_combo.set_active(0) self.pj_name.set_text("") self.pj_count.set_value(1) self.pj_con.set_active_id('none') self.pj_direction.set_active_id('out') self.pj_bridge = '' else: for bridge in out_db: self.pj_combo.append(bridge, bridge) if not new_bridge: new_bridge = bridge for bridge in in_db: self.pj_combo.append(bridge, bridge) if not new_bridge: new_bridge = bridge our_db = {} if new_bridge in out_db: our_db = out_db self.pj_direction.set_active_id('out') if new_bridge in in_db: our_db = in_db self.pj_direction.set_active_id('in') self.pj_combo.set_active_id(new_bridge) self.pj_name.set_text(new_bridge) self.pj_count.set_value(our_db[new_bridge]['count']) self.pj_con.set_active_id(our_db[new_bridge]['connection']) self.pj_bridge = new_bridge self.pj_combo.set_sensitive(True) self.pj_name.set_sensitive(True) self.pj_count.set_sensitive(True) self.pj_con.set_sensitive(True) # rebuild the the connection dropdowns jack_ports_changed = True def pj_name_cb(self, widget): ''' call back for any pulse bridge input name or connect change to current values ''' if not widget.get_sensitive(): return self.not_applied = True if self.pj_direction.get_active_id() == 'in': temp_db = self.conf_db['pulse']['inputs'] if self.pj_direction.get_active_id() == 'out': temp_db = self.conf_db['pulse']['outputs'] if temp_db != {}: old_name = self.pj_combo.get_active_id() new_name = self.pj_name.get_text().split()[0] if new_name != old_name: if old_name in temp_db: temp_db[new_name] = temp_db.pop(old_name) else: temp_db[new_name][ 'connection'] = f"{self.pj_con.get_active_id()}" temp_db[new_name][ 'count'] = self.pj_count.get_value_as_int() self.refresh_pulse_tab(new_name) def pj_combo_cb(self, widget): ''' callback to look at different pa bridge. need to save name and connection, then refresh name and connections to match ''' if not widget.get_sensitive(): return if widget.get_active() < 0: return self.pj_bridge = self.pj_combo.get_active_id() self.refresh_pulse_tab(self.pj_bridge) def pj_add_cb(self, widget): ''' need to create a name for the bridge and assign connect as "none". Before switching to display the new bridge we need to save the current bridge info ''' if not widget.get_sensitive(): return direct = 'out' temp_db = {} if widget.get_active_id() == 'in': direct = 'in' temp_db = self.conf_db['pulse']['inputs'] elif widget.get_active_id() == 'out': direct = 'out' temp_db = self.conf_db['pulse']['outputs'] else: widget.set_active_id('label') return widget.set_sensitive(False) self.not_applied = True indx = 1 done = False new_name = "" while not done: new_name = f"pulse{str(indx)}-{direct}" if new_name not in temp_db: done = True else: indx = indx + 1 temp_db[new_name] = { 'connection': "none", 'count': 2 } self.refresh_pulse_tab(new_name) widget.set_active_id('label') widget.set_sensitive(True) def pj_rem_cb(self, widget): ''' get index of current bridge remove name from list by index remove connection from list by index ''' if not widget.get_sensitive(): return self.not_applied = True name = self.pj_combo.get_active_id() if name in self.conf_db['pulse']['inputs']: del self.conf_db['pulse']['inputs'][name] elif name in self.conf_db['pulse']['outputs']: del self.conf_db['pulse']['outputs'][name] self.pj_bridge = "" self.refresh_pulse_tab(self.pj_bridge) # network callbacks and refresh def refresh_net(self, new_bridge): ''' set up values for first bridge and midi ''' # midi self.mnet_count.set_sensitive(False) self.mnet_type.set_sensitive(False) qmn = shutil.which("qmidinet") if qmn is None: self.mnet_warn.set_text("Please Install qmidinet First") self.mnet_count.set_value(0) else: self.mnet_warn.set_text("") self.mnet_count.set_value(self.conf_db['mnet']['count']) self.mnet_type.set_active_id(self.conf_db['mnet']['type']) self.mnet_count.set_sensitive(True) self.mnet_type.set_sensitive(True) # zita-njbridge znet_db = self.conf_db['znet'] self.znet_bridge.set_sensitive(False) self.znet_direction.set_sensitive(False) self.znet_count.set_sensitive(False) self.znet_ip.set_sensitive(False) self.znet_port.set_sensitive(False) self.znet_bits.set_sensitive(False) self.znet_name.set_sensitive(False) self.znet_late.set_sensitive(False) znj = shutil.which("zita-n2j") if znj is None: self.znet_warn.set_text("Please Install zita-njbridge First") else: self.znet_warn.set_text("") self.znet_bridge.get_model().clear() our_ip = "0.0.0.0" if znet_db == {}: # no bridges self.znet_bridge.set_active(0) self.znet_direction.set_active_id('out') self.znet_count.set_value(1) self.znet_ip.set_text(our_ip) self.znet_port.set_value(8300) self.znet_bits.set_active_id('none') self.znet_late.set_value(0) self.znet_name.set_text("") self.znetbridge = "" return else: for bridge in znet_db: self.znet_bridge.append(bridge, bridge) if not new_bridge: new_bridge = bridge self.znet_direction.set_active_id( znet_db[new_bridge]['direction']) self.znet_bridge.set_active_id(new_bridge) self.znet_name.set_text(new_bridge) self.znet_count.set_value(znet_db[new_bridge]['count']) self.znet_ip.set_text(znet_db[new_bridge]['ip']) self.znet_port.set_value(znet_db[new_bridge]['port']) if znet_db[new_bridge]['direction'] == 'out': self.znet_bits.set_active_id(znet_db[new_bridge]['bits']) self.znet_late.set_value(0) else: self.znet_bits.set_active_id('none') self.znet_late.set_value(znet_db[new_bridge]['latency']) self.znetbridge = new_bridge if shutil.which("zita-n2j") is None: self.znet_warn.set_text("Please install zita-njbridge") else: self.znet_warn.set_text("") if znet_db[new_bridge]['direction'] == 'in': self.znet_late.set_sensitive(True) if znet_db[new_bridge]['direction'] == 'out': self.znet_bits.set_sensitive(True) self.znet_bridge.set_sensitive(True) self.znet_count.set_sensitive(True) self.znet_ip.set_sensitive(True) self.znet_port.set_sensitive(True) self.znet_name.set_sensitive(True) # audio def znet_bridge_cb(self, widget): ''' callback to look at different znet bridge. need to save name and connection, then refresh name and connections to match ''' if not widget.get_sensitive(): return if widget.get_active() < 0: return self.znetbridge = self.znet_bridge.get_active_id() self.refresh_net(self.znetbridge) def znet_changed_cb(self, widget): if not widget.get_sensitive(): return this_db = self.conf_db['znet'] if this_db != {}: old_name = self.znet_bridge.get_active_id() new_name = self.znet_name.get_text().split()[0] if new_name != old_name: if old_name in this_db: this_db[new_name] = this_db.pop(old_name) else: # probably a "" string return this_db[new_name]['count'] = self.znet_count.get_value_as_int() this_db[new_name]['ip'] = self.znet_ip.get_text() this_db[new_name]['port'] = self.znet_port.get_value_as_int() if this_db[new_name]['direction'] == 'out': if self.znet_bits.get_active_id() != 'none': this_db[new_name]['bits'] = self.znet_bits.get_active_id() else: this_db[new_name]['bits'] = 'none' if this_db[new_name]['direction'] == 'in': this_db[new_name][ 'latency'] = self.znet_late.get_value_as_int() else: this_db[new_name]['latency'] = 0 self.not_applied = True self.refresh_net(new_name) def znet_add_cb(self, widget): ''' need to create a name for the bridge and default values before switching to display the new bridge ''' if not widget.get_sensitive(): return direct = 'out' if not widget.get_active_id() in ['in', 'out']: widget.set_active_id('label') return widget.set_sensitive(False) self.not_applied = True indx = 1 done = False new_name = "" temp_db = self.conf_db['znet'] while not done: new_name = f"net{str(indx)}-{widget.get_active_id()}" if new_name not in temp_db: done = True else: indx = indx + 1 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() late = 10 bits = "none" if widget.get_active_id() == 'out': ipl = ip.split('.') ip = f"{ipl[0]}.{ipl[1]}.{ipl[2]}." late = 0 bits = "16bit" dialog = Gtk.MessageDialog( transient_for=self.window_main, flags=0, message_type=Gtk.MessageType.WARNING, buttons=Gtk.ButtonsType.OK, text="Warning, The IP is not complete", ) response = dialog.run() dialog.destroy() temp_db[new_name] = { 'direction': widget.get_active_id(), 'ip': ip, 'port': 8300, 'bits': bits, 'latency': late, 'count': 2 } self.refresh_net(new_name) widget.set_active_id('label') widget.set_sensitive(True) def znet_rem_cb(self, widget): ''' Remove the currently displayed bridge and refresh display (which defaults to the first in the list)''' if not widget.get_sensitive(): return self.not_applied = True name = self.znet_bridge.get_active_id() if name in self.conf_db['znet']: del self.conf_db['znet'][name] self.znetbridge = "" self.refresh_net(self.znetbridge) # midi def mnet_count_cb(self, widget): if not widget.get_sensitive(): return self.conf_db['mnet']['count'] = widget.get_value_as_int() self.not_applied = True def mnet_type_cb(self, widget): if not widget.get_sensitive(): return self.conf_db['mnet']['type'] = widget.get_active_id() self.not_applied = True # External applications calls def mixer_cb(self, widget): '''callback for mixer button. This starts QASMixer with the device set to whatever is jack master''' if not widget.get_sensitive(): return widget.set_sensitive(False) widget.popdown() if widget.get_active == -1: widget.set_active_id("none") return if widget.get_active_id() == "none": return if widget.get_active_id() == "ffado": ffm = shutil.which("ffado-mixer") if ffm is None: print("ffado-mixer not installed") else: try: subprocess.Popen([ffm], shell=False).pid except Exception: print("ffado-mixer errors are normal :P") else: mixdevice = widget.get_active_id() qasm = shutil.which("qasmixer") if qasm is None: print("qasmixer not installed") else: subprocess.Popen( [qasm, "-n", f"--device=hw:{str(mixdevice)}"], shell=False).pid widget.set_active_id("none") widget.set_sensitive(True) def pavucontrol_cb(self, button): '''callback for pulse control button, opens pavucontrol''' pvc = shutil.which("pavucontrol") if pvc is None: pvc = shutil.which("pavucontrol-qt") if pvc is None: print("pavucontrol not installed") else: subprocess.Popen([pvc], shell=False).pid def carla_cb(self, button): '''callback for carla button, opens carla''' crla = shutil.which("carla") if crla is None: button.set_label("Please Install Carla First") else: subprocess.Popen([crla], shell=False).pid def ray_cb(self, button): '''callback for raysession button, opens raysession''' rays = shutil.which("raysession") if rays is None: button.set_label("Please Install RaySession First") else: subprocess.Popen([rays], shell=False).pid def nsm_cb(self, button): '''callback for nsm button, opens New Session Manager''' ns = shutil.which("nsm-legacy-gui") if ns is None: button.set_label("Please Install New Session Manager First") else: subprocess.Popen([ns], shell=False).pid def agordejo_cb(self, button): '''callback for agordejo button, opens agordejo''' agdo = shutil.which("agordejo") if agdo is None: button.set_label("Please Install Agordejo First") else: subprocess.Popen([agdo], shell=False).pid # Autojack signalling calls def cb_jack_start(self, button): ''' call back for Jack (re)start button''' self.jackdb['on'] = True self.config_save() self.not_applied = False self.signal_autojack("start") def cb_jack_stop(self, button): self.jackdb['on'] = False self.config_save() self.not_applied = False self.signal_autojack("config") def cb_audio_apply(self, button): '''callback for audio tab apply button''' self.config_save() self.not_applied = False self.signal_autojack("config") def config_save(self): ''' Write audio setting to ~/.config/autojack/autojack.json''' self.jackdb['chan-in'] = int(self.chan_in_spin.get_value_as_int()) self.jackdb['chan-out'] = int(self.chan_out_spin.get_value_as_int()) self.jackdb['rate'] = int(self.jack_rate_combo.get_active_id()) self.jackdb['frame'] = int(self.combobox_late.get_active_id()) self.jackdb['period'] = int(self.combo_periods.get_active_id()) self.jackdb['connect-mode'] = str(self.jk_connect_mode.get_active_id()) self.jackdb['cap-latency'] = int(self.cap_lat_spin.get_value_as_int()) self.jackdb['play-latency'] = int( self.play_lat_spin.get_value_as_int()) self.extra['a2j'] = self.jack_midi_check.get_active() self.extra['a2j_u'] = self.jack_midi_u.get_active() self.extra['usbauto'] = self.usb_plug_check.get_active() self.extra['monitor'] = str(self.monitor_combo.get_active_id()) self.extra['phone-action'] = str(self.hp_action.get_active_id()) self.extra['phone-device'] = str(self.hp_device.get_active_id()) self.extra['phone-left'] = str(self.hp_left_pt.get_value_as_int()) auto_jack.our_db = self.conf_db auto_jack.write_new() time.sleep(1) return def signal_autojack(self, signal): global autojack if autojack: self.sendbs.signal(signal) else: time.sleep(5) if autojack: self.sendbs.signal(signal) else: print("Starting Autojack...") # first tell any old autojack to die self.sendbs.signal('quit') # do it subprocess.Popen( [f"{install_path}/bin/autojack"], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT, shell=False).pid def on_button_help_ok_clicked(self, button): self.window_help.hide() def on_button_rt_info_ok_clicked(self, button): self.message_dialog_rt_info.hide() def warnshow(self, msgtxt): dialog = Gtk.MessageDialog( transient_for=self.window_main, flags=0, message_type=Gtk.MessageType.WARNING, buttons=Gtk.ButtonsType.OK, text="WARNING", ) dialog.format_secondary_text( msgtxt ) response = dialog.run() dialog.destroy() # All the ways we can die def on_button_msg_ok_clicked(self, button): self.we_die() def on_main_button_cancel_clicked(self, button): self.we_die() def on_window_main_delete_event(self, *args): self.we_die() def sig_handler(self, signum, frame): ''' a handler for system signals that may be sent by the system. we want to trap sigint, sigkill and sigterm and do the same as above. ''' self.we_die() def we_die(self): global jack_alive global jack_client global lock_file if jack_alive: jack_client.close() # self.window_main = builder.get_object('window_main') if self.not_applied: dialog = Gtk.MessageDialog( transient_for=self.window_main, flags=0, message_type=Gtk.MessageType.WARNING, buttons=Gtk.ButtonsType.NONE, text="Warning, New Settings have not been Applied", ) dialog.add_buttons("Apply Settings", 10, "Exit", 11) response = dialog.run() if response == 10: self.config_save() self.not_applied = False self.signal_autojack("config") dialog.destroy() if os.path.isfile(lock_file): new_pid = str(os.getpid()) if os.path.isfile(lock_file): with open(lock_file, "r") as lk_file: for line in lk_file: # only need one line old_pid = line.rstrip() if new_pid == old_pid: os.remove(lock_file) Gtk.main_quit() us = StudioControls() us.window_main.show_all() Gtk.main()