added my Recipes
This commit is contained in:
@@ -0,0 +1,17 @@
|
||||
Application:
|
||||
Name: Bluetooth
|
||||
Description: speaker
|
||||
Icon: application/bluetooth/pictures/ST11012_bluetooth_speaker_light_green.png
|
||||
Type: python
|
||||
Board:
|
||||
List: all
|
||||
Python:
|
||||
Exist:
|
||||
Command: /usr/local/demo/application/bluetooth/bin/check_ble.sh
|
||||
Msg_false: Please connect a bluetooth controller on the board
|
||||
Module: application.bluetooth.bluetooth_audio
|
||||
Action:
|
||||
button_release_event: python_start
|
||||
button_press_event: highlight_eventBox
|
||||
|
||||
|
||||
Binary file not shown.
|
After Width: | Height: | Size: 7.5 KiB |
@@ -0,0 +1 @@
|
||||
#print('Importing bluetooth __init__')
|
||||
@@ -0,0 +1,596 @@
|
||||
#!/usr/bin/python3
|
||||
# Copyright (c) 2019 STMicroelectronics. All rights reserved.
|
||||
#
|
||||
# This software component is licensed by ST under BSD 3-Clause license,
|
||||
# the "License"; You may not use this file except in compliance with the
|
||||
# License. You may obtain a copy of the License at:
|
||||
# opensource.org/licenses/BSD-3-Clause
|
||||
|
||||
|
||||
import gi
|
||||
gi.require_version('Gtk', '3.0')
|
||||
from gi.repository import Gtk
|
||||
from gi.repository import Gdk
|
||||
from gi.repository import GLib
|
||||
|
||||
import re
|
||||
import os
|
||||
import subprocess
|
||||
import pexpect
|
||||
from time import sleep, time
|
||||
|
||||
try:
|
||||
from application.bluetooth.wrap_blctl import wrapper_blctl as Bluetoothctl
|
||||
except ModuleNotFoundError:
|
||||
from wrap_blctl import wrapper_blctl as Bluetoothctl
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# -------------------------------------------------------------------
|
||||
SUBMODULE_PATH = "application/bluetooth"
|
||||
DEMO_PATH = "/usr/local/demo"
|
||||
# -------------------------------------------------------------------
|
||||
# -------------------------------------------------------------------
|
||||
ICON_SIZE_1080 = 260
|
||||
ICON_SIZE_720 = 180
|
||||
ICON_SIZE_480 = 128
|
||||
ICON_SIZE_272 = 48
|
||||
|
||||
TREELIST_HEIGHT_1080 = 500
|
||||
TREELIST_HEIGHT_720 = 400
|
||||
TREELIST_HEIGHT_480 = 160
|
||||
TREELIST_HEIGHT_272 = 68
|
||||
|
||||
# return format:
|
||||
# [ icon_size, font_size, treelist_height, button_height ]
|
||||
SIZES_ID_ICON_SIZE = 0
|
||||
SIZES_ID_FONT_SIZE = 1
|
||||
SIZES_ID_TREELIST_HEIGHT = 2
|
||||
SIZES_ID_BUTTON_HEIGHT = 3
|
||||
def get_sizes_from_screen_size(width, height):
|
||||
minsize = min(width, height)
|
||||
icon_size = None
|
||||
font_size = None
|
||||
treelist_height = None
|
||||
button_height = None
|
||||
if minsize == 720:
|
||||
icon_size = ICON_SIZE_720
|
||||
font_size = 25
|
||||
treelist_height = TREELIST_HEIGHT_720
|
||||
button_height = 60
|
||||
elif minsize == 480:
|
||||
icon_size = ICON_SIZE_480
|
||||
font_size = 20
|
||||
treelist_height = TREELIST_HEIGHT_480
|
||||
button_height = 60
|
||||
elif minsize == 272:
|
||||
icon_size = ICON_SIZE_272
|
||||
font_size = 15
|
||||
treelist_height = TREELIST_HEIGHT_272
|
||||
button_height = 25
|
||||
elif minsize == 600:
|
||||
icon_size = ICON_SIZE_720
|
||||
font_size = 15
|
||||
treelist_height = TREELIST_HEIGHT_720
|
||||
button_height = 60
|
||||
elif minsize >= 1080:
|
||||
icon_size = ICON_SIZE_1080
|
||||
font_size = 32
|
||||
treelist_height = TREELIST_HEIGHT_1080
|
||||
button_height = 80
|
||||
return [icon_size, font_size, treelist_height, button_height]
|
||||
|
||||
def get_treelist_height_from_screen_size(width, height):
|
||||
minsize = min(width, height)
|
||||
if minsize == 720:
|
||||
return TREELIST_HEIGHT_720
|
||||
elif minsize == 480:
|
||||
return TREELIST_HEIGHT_480
|
||||
elif minsize == 272:
|
||||
return TREELIST_HEIGHT_272
|
||||
elif minsize == 600:
|
||||
return ICON_SIZE_1080
|
||||
elif minsize >= 1080:
|
||||
return ICON_SIZE_1080
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
SCAN_DURATION_IN_S = 15
|
||||
|
||||
regexps_audio = [
|
||||
re.compile(r"00001108-(?P<Headset>.+)$"),
|
||||
re.compile(r"0000110b-(?P<AudioSink>.+)$"),
|
||||
]
|
||||
|
||||
re_connected = re.compile(r"Connected:(?P<Connected>.+)$")
|
||||
|
||||
re_paired = re.compile(r"Paired:(?P<Paired>.+)$")
|
||||
|
||||
Item_info_dev = ['Headset', 'AudioSink', 'Connected', 'Paired']
|
||||
|
||||
regexps_devinfo = [
|
||||
re.compile(r"00001108-(?P<Headset>.+)$"),
|
||||
re.compile(r"0000110b-(?P<AudioSink>.+)$"),
|
||||
re.compile(r"Connected:(?P<Connected>.+)$"),
|
||||
re.compile(r"Paired:(?P<Paired>.+)$"),
|
||||
]
|
||||
########################################
|
||||
#pactl (pulseaudio controller) wrapper
|
||||
########################################
|
||||
#for parse_sinks
|
||||
re_sink = re.compile(r"^Sink #(?P<Ident>.+)$")
|
||||
re_prop_sink = [
|
||||
re.compile(r"State:(?P<State>.+)$"),
|
||||
re.compile(r"Description:\s+(?P<Name>.+)$")
|
||||
]
|
||||
|
||||
#for parse_streams
|
||||
re_stream = re.compile(r"^Sink Input #(?P<Ident>.+)$")
|
||||
re_prop_stream = [
|
||||
re.compile(r"Sink:\s+(?P<Sink>.+)$"),
|
||||
re.compile(r"media\.name\s=\s(?P<Name>.+)$")
|
||||
]
|
||||
# id_str : ident of the stream, id_sink : ident of the sink
|
||||
def audiosink_set(id_str, id_sink):
|
||||
print("audiosink_set ")
|
||||
#print("id_str : %d", id_str)
|
||||
#print("id_sink : %d", id_sink)
|
||||
cmd = ["/usr/bin/pactl", "move-sink-input", id_str, id_sink]
|
||||
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
res = proc.stdout.read().decode('utf-8')
|
||||
return res
|
||||
|
||||
def scan_streams():
|
||||
cmd = ["/usr/bin/pactl", "list", "sink-inputs"]
|
||||
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
res = proc.stdout.read().decode('utf-8')
|
||||
return res
|
||||
|
||||
def parse_streams(streams):
|
||||
streams_lines = streams.split('\n')
|
||||
l_streams = []
|
||||
for line in streams_lines:
|
||||
line = line.strip()
|
||||
elt = re_stream.search(line)
|
||||
if elt is not None:
|
||||
l_streams.append(elt.groupdict())
|
||||
continue
|
||||
for reg in re_prop_stream:
|
||||
res = reg.search(line)
|
||||
if res is not None:
|
||||
l_streams[-1].update(res.groupdict())
|
||||
return l_streams
|
||||
|
||||
|
||||
def scan_sinks():
|
||||
cmd = ["/usr/bin/pactl", "list", "sinks"]
|
||||
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
res = proc.stdout.read().decode('utf-8')
|
||||
return res
|
||||
|
||||
def parse_sinks(sinks):
|
||||
sinks_lines=sinks.split('\n')
|
||||
l_sinks =[]
|
||||
for line in sinks_lines:
|
||||
line = line.strip()
|
||||
elt = re_sink.search(line)
|
||||
if elt is not None:
|
||||
l_sinks.append(elt.groupdict())
|
||||
continue
|
||||
for reg in re_prop_sink:
|
||||
res = reg.search(line)
|
||||
if res is not None:
|
||||
l_sinks[-1].update(res.groupdict())
|
||||
return l_sinks
|
||||
|
||||
|
||||
|
||||
def status_playback(self):
|
||||
sink_ident = []
|
||||
stream_ident = None
|
||||
|
||||
list_sinks = scan_sinks()
|
||||
sinks = parse_sinks(list_sinks)
|
||||
#print("refresh label_audio\n")
|
||||
#print(sinks)
|
||||
mess_bt = ""
|
||||
if sinks != []:
|
||||
for sk in sinks:
|
||||
for bt_dev_conn in self.list_dev_connect:
|
||||
if sk['Name'] == bt_dev_conn['name']:
|
||||
if mess_bt != "":
|
||||
mess_bt = mess_bt + "\n"
|
||||
mess_bt = mess_bt + "The audio BT device " + sk['Name'] + " is connected"
|
||||
sink_ident.append({'name': sk['Name'], 'ident': sk['Ident']})
|
||||
|
||||
if mess_bt == "":
|
||||
mess_bt = "Device not connected"
|
||||
|
||||
self.label_audio.set_markup("<span font='20' color='#000000'>%s</span>" % mess_bt)
|
||||
self.label_audio.set_justify(Gtk.Justification.LEFT)
|
||||
self.label_audio.set_line_wrap(True)
|
||||
return [stream_ident, sink_ident]
|
||||
|
||||
|
||||
def get_device_info(bl, macadr):
|
||||
#print("get_device_info")
|
||||
info_dev = bl.blctl_info(macadr)
|
||||
dict_info = {}
|
||||
for elt in Item_info_dev:
|
||||
dict_info[elt] = ''
|
||||
for reelt in regexps_devinfo:
|
||||
for elt in info_dev:
|
||||
result = reelt.search(elt)
|
||||
if result is not None:
|
||||
dict_info.update(result.groupdict())
|
||||
break
|
||||
return(dict_info)
|
||||
|
||||
|
||||
def list_devices(self, paired = False):
|
||||
#print("list_devices")
|
||||
if self.locked_devices == False:
|
||||
self.locked_devices = True
|
||||
self.bluetooth_liststore.clear()
|
||||
self.current_devs=[]
|
||||
i=0
|
||||
if paired == True:
|
||||
devs = self.bl.blctl_paired_devices()
|
||||
else:
|
||||
devs = self.bl.blctl_devices()
|
||||
for elt in devs:
|
||||
elt_info = get_device_info(self.bl, elt['mac_address'])
|
||||
#print("name===" , elt['name'].encode('utf-8').strip())
|
||||
if elt['name'] == "RSSI is nil":
|
||||
continue
|
||||
if elt['name'] == "TxPower is nil":
|
||||
continue
|
||||
#do not list device without real name
|
||||
if elt['mac_address'].replace(':','') != elt['name'].replace('-',''):
|
||||
i=i+1
|
||||
self.current_devs.append(elt['mac_address'])
|
||||
#print(elt_info)
|
||||
l_elt = []
|
||||
l_elt.append(i)
|
||||
l_elt.append(elt['name'])
|
||||
l_elt.append(elt_info['Connected'])
|
||||
if elt_info['Headset'] != '' or elt_info['AudioSink'] != '':
|
||||
l_elt.append('yes')
|
||||
else:
|
||||
l_elt.append('no')
|
||||
|
||||
if elt_info['Connected'] == " yes":
|
||||
if elt not in self.list_dev_connect:
|
||||
self.list_dev_connect.insert(0,elt)
|
||||
self.bl.set_prompt(elt['name'])
|
||||
self.bluetooth_liststore.append(l_elt)
|
||||
|
||||
self.locked_devices = False
|
||||
|
||||
|
||||
def device_connected(bl, macadr):
|
||||
info_dev=bl.blctl_info(macadr)
|
||||
if info_dev is not None:
|
||||
for elt in info_dev:
|
||||
result = re_connected.search(elt)
|
||||
if result is not None:
|
||||
l_info_dev = result.groupdict()
|
||||
if l_info_dev["Connected"] == " yes":
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def device_paired(bl, macadr):
|
||||
info_dev=bl.blctl_info(macadr)
|
||||
if info_dev is not None:
|
||||
for elt in info_dev:
|
||||
result = re_paired.search(elt)
|
||||
if result is not None:
|
||||
l_info_dev = result.groupdict()
|
||||
if l_info_dev["Paired"] == " yes":
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def device_audio(bl, macadr):
|
||||
info_dev=bl.blctl_info(macadr)
|
||||
for reelt in regexps_audio:
|
||||
for elt in info_dev:
|
||||
result = reelt.search(elt)
|
||||
if result is not None:
|
||||
return True
|
||||
return False
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# -------------------------------------------------------------------
|
||||
def gtk_style():
|
||||
css = b"""
|
||||
|
||||
.widget .grid .label {
|
||||
background-color: rgba (31%, 32%, 32%, 0.9);
|
||||
}
|
||||
.textview {
|
||||
color: gray;
|
||||
}
|
||||
.label {
|
||||
color: black;
|
||||
}
|
||||
.switch {
|
||||
min-height: 44px;
|
||||
}
|
||||
"""
|
||||
style_provider = Gtk.CssProvider()
|
||||
style_provider.load_from_data(css)
|
||||
|
||||
Gtk.StyleContext.add_provider_for_screen(
|
||||
Gdk.Screen.get_default(),
|
||||
style_provider,
|
||||
Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)
|
||||
|
||||
class BluetoothWindow(Gtk.Dialog):
|
||||
def __init__(self, parent):
|
||||
Gtk.Dialog.__init__(self, "Wifi", parent, 0)
|
||||
self.maximize()
|
||||
self.set_decorated(False)
|
||||
|
||||
gtk_style()
|
||||
try:
|
||||
display = Gdk.Display.get_default()
|
||||
monitor = display.get_primary_monitor()
|
||||
geometry = monitor.get_geometry()
|
||||
scale_factor = monitor.get_scale_factor()
|
||||
self.screen_width = scale_factor * geometry.width
|
||||
self.screen_height = scale_factor * geometry.height
|
||||
except:
|
||||
self.screen_width = self.get_screen().get_width()
|
||||
self.screen_height = self.get_screen().get_height()
|
||||
self.treelist_height = get_treelist_height_from_screen_size(self.screen_width, self.screen_height)
|
||||
sizes = get_sizes_from_screen_size(self.screen_width, self.screen_height)
|
||||
self.font_size = sizes[SIZES_ID_FONT_SIZE]
|
||||
self.button_height = sizes[SIZES_ID_BUTTON_HEIGHT]
|
||||
|
||||
self.connect("button-release-event", self.on_page_press_event)
|
||||
mainvbox = self.get_content_area()
|
||||
|
||||
self.dev_selected = {'mac_address':'', 'name':''}
|
||||
self.audio_bt_sink = []
|
||||
self.list_dev_connect = []
|
||||
self.current_devs = []
|
||||
self.locked_devices = False
|
||||
self.scan_done = False
|
||||
self.previous_click_time=0
|
||||
|
||||
self.page_bluetooth = Gtk.VBox()
|
||||
self.page_bluetooth.set_border_width(15)
|
||||
|
||||
self.title = Gtk.Label()
|
||||
self.title.set_markup("<span font='%d' color='#00000000'>Connect bluetooth headset</span>" % (self.font_size+5))
|
||||
self.page_bluetooth.add(self.title)
|
||||
|
||||
self.ButtonBox = Gtk.HBox(homogeneous=True)
|
||||
|
||||
self.lb_button_scan = Gtk.Label()
|
||||
self.lb_button_scan.set_markup("<span font='%d'>start scan</span>" % self.font_size)
|
||||
self.button_scan = Gtk.Button()
|
||||
self.button_scan.set_property("height-request", self.button_height)
|
||||
self.button_scan.add(self.lb_button_scan)
|
||||
self.button_scan.connect("clicked", self.on_selection_scan_clicked)
|
||||
self.ButtonBox.add(self.button_scan)
|
||||
|
||||
self.lb_button_connect = Gtk.Label()
|
||||
self.lb_button_connect.set_markup("<span font='%d' color='#88888888'>connect</span>" % self.font_size)
|
||||
self.button_connect = Gtk.Button()
|
||||
self.button_connect.add(self.lb_button_connect)
|
||||
self.button_connect.connect("clicked", self.on_selection_connect_clicked)
|
||||
self.ButtonBox.add(self.button_connect)
|
||||
|
||||
self.page_bluetooth.add(self.ButtonBox)
|
||||
|
||||
self.progress_vbox = Gtk.VBox()
|
||||
self.scan_progress = Gtk.ProgressBar()
|
||||
self.scan_progress.set_fraction(0.0)
|
||||
self.progress_vbox.pack_start(self.scan_progress, False, False, 3)
|
||||
self.page_bluetooth.add(self.progress_vbox)
|
||||
|
||||
self.tree_list_vbox = Gtk.VBox(homogeneous=True)
|
||||
|
||||
self.bluetooth_liststore = Gtk.ListStore(int, str, str, str)
|
||||
self.bluetooth_treeview = Gtk.TreeView(self.bluetooth_liststore)
|
||||
|
||||
l_col = ["n°", "name", "connected", "Audio"]
|
||||
for i, column_title in enumerate(l_col):
|
||||
renderer = Gtk.CellRendererText()
|
||||
renderer.set_property('font', "%d" % self.font_size)
|
||||
column = Gtk.TreeViewColumn(column_title, renderer, text=i)
|
||||
self.bluetooth_treeview.append_column(column)
|
||||
self.bluetooth_treeview.get_selection().connect("changed", self.on_changed)
|
||||
|
||||
self.scroll_treelist = Gtk.ScrolledWindow()
|
||||
self.scroll_treelist.set_vexpand(False)
|
||||
self.scroll_treelist.set_hexpand(False)
|
||||
self.scroll_treelist.set_property("min-content-height", self.treelist_height)
|
||||
self.scroll_treelist.add(self.bluetooth_treeview)
|
||||
self.tree_list_vbox.pack_start(self.scroll_treelist, True, True, 3)
|
||||
|
||||
self.page_bluetooth.add(self.tree_list_vbox)
|
||||
|
||||
self.label_audio = Gtk.Label()
|
||||
self.label_audio.set_markup("<span font='%d' color='#FFFFFFFF'> </span>" % self.font_size)
|
||||
self.label_audio.set_justify(Gtk.Justification.LEFT)
|
||||
self.label_audio.set_line_wrap(True)
|
||||
self.page_bluetooth.add(self.label_audio)
|
||||
|
||||
mainvbox.pack_start(self.page_bluetooth, False, True, 3)
|
||||
self.show_all()
|
||||
|
||||
# enable bluetooth
|
||||
os.system('su -c \"hciconfig hci0 up\"')
|
||||
#self.bluetooth_state = os.system('hciconfig hci0 up')
|
||||
self.bl = Bluetoothctl()
|
||||
|
||||
list_devices(self, paired=True)
|
||||
self.audio_bt_sink = status_playback(self)
|
||||
|
||||
|
||||
def display_message(self, message):
|
||||
dialog = Gtk.Dialog("Error", self, 0, (Gtk.STOCK_OK, Gtk.ResponseType.OK))
|
||||
dialog.set_decorated(False)
|
||||
width, height = self.get_size()
|
||||
dialog.set_default_size(width, height)
|
||||
rgba = Gdk.RGBA(0.31, 0.32, 0.31, 0.8)
|
||||
dialog.override_background_color(0,rgba)
|
||||
|
||||
label0 = Gtk.Label() #for padding
|
||||
|
||||
label1 = Gtk.Label()
|
||||
label1.set_markup(message)
|
||||
label1.set_justify(Gtk.Justification.CENTER)
|
||||
label1.set_line_wrap(True)
|
||||
|
||||
label2 = Gtk.Label() #for padding
|
||||
|
||||
# Create a centering alignment object
|
||||
align = Gtk.Alignment()
|
||||
align.set(0.5, 0, 0, 0)
|
||||
|
||||
dialog.vbox.pack_start(label0, True, False, 0)
|
||||
dialog.vbox.pack_start(label1, True, True, 0)
|
||||
dialog.vbox.pack_start(align, True, True, 0)
|
||||
dialog.vbox.pack_start(label2, True, False, 0)
|
||||
|
||||
dialog.action_area.reparent(align)
|
||||
dialog.show_all()
|
||||
|
||||
dialog.run()
|
||||
print("INFO dialog closed")
|
||||
|
||||
dialog.destroy()
|
||||
|
||||
|
||||
def on_page_press_event(self, widget, event):
|
||||
self.click_time = time()
|
||||
#print(self.click_time - self.previous_click_time)
|
||||
# TODO : a fake click is observed, workaround hereafter
|
||||
if (self.click_time - self.previous_click_time) < 0.01:
|
||||
self.previous_click_time = self.click_time
|
||||
elif (self.click_time - self.previous_click_time) < 0.3:
|
||||
print ("BluetoothWindow double click : exit")
|
||||
self.bl.close()
|
||||
self.destroy()
|
||||
else:
|
||||
#print ("simple click")
|
||||
self.previous_click_time = self.click_time
|
||||
|
||||
|
||||
def delayed_status_playback(self, user_data):
|
||||
self.audio_bt_sink = status_playback(self)
|
||||
return False
|
||||
|
||||
def progress_timeout(self, user_data):
|
||||
new_val=self.scan_progress.get_fraction() + 0.01
|
||||
if new_val > 1:
|
||||
self.scan_progress.set_fraction(0.0)
|
||||
self.bl.blctl_scan_off()
|
||||
self.lb_button_scan.set_markup("<span font='%d'>start scan</span>" % self.font_size)
|
||||
self.scan_done = True
|
||||
self.update_display()
|
||||
return False
|
||||
|
||||
self.scan_progress.set_fraction(new_val)
|
||||
self.scan_progress.set_text(str(new_val*100) + " % completed")
|
||||
return True
|
||||
|
||||
def on_changed(self, selection):
|
||||
(model, iter) = selection.get_selected()
|
||||
#print("on_changed")
|
||||
if iter is not None:
|
||||
self.audio_bt_sink = status_playback(self)
|
||||
#print(self.audio_bt_sink)
|
||||
self.dev_selected.update({'mac_address':self.current_devs[model[iter][0]-1], 'name':model[iter][1]})
|
||||
if model[iter][2] == " yes":
|
||||
self.lb_button_connect.set_markup("<span font='%d'>disconnect</span>" % self.font_size)
|
||||
else:
|
||||
if self.label_audio.get_text() == "Device not connected":
|
||||
self.lb_button_connect.set_markup("<span font='%d'>connect</span>" % self.font_size)
|
||||
else:
|
||||
self.lb_button_connect.set_markup("<span font='%d' color='#88888888'>connect</span>" % self.font_size)
|
||||
return True
|
||||
|
||||
def connect_process(self, dev):
|
||||
if device_connected(self.bl, dev['mac_address']):
|
||||
self.lb_button_connect.set_markup("<span font='%d'>disconnect</span>" % self.font_size)
|
||||
self.update_display()
|
||||
else:
|
||||
connect_res=self.bl.blctl_connect(dev['mac_address'])
|
||||
if connect_res == True:
|
||||
self.lb_button_connect.set_markup("<span font='%d' color='#88888888'>disconnect</span>" % self.font_size)
|
||||
self.update_display()
|
||||
# refresh status_playback after 2,5s because pulseaudio takes some time to update its status
|
||||
timer_update_dev = GLib.timeout_add(2500, self.delayed_status_playback, None)
|
||||
#In some cases, 2.5s is still not enough
|
||||
timer_update_dev = GLib.timeout_add(4000, self.delayed_status_playback, None)
|
||||
|
||||
def on_selection_connect_clicked(self, widget):
|
||||
if self.dev_selected['mac_address'] != '':
|
||||
device = self.dev_selected
|
||||
if self.lb_button_connect.get_text() == "connect":
|
||||
if self.label_audio.get_text() == "Device not connected":
|
||||
self.bl.set_prompt(device['name'])
|
||||
if device_paired(self.bl, device['mac_address']) == False:
|
||||
pairing_res=self.bl.blctl_pair(device['mac_address'])
|
||||
if pairing_res == 0:
|
||||
self.bl.blctl_session.send("no\n")
|
||||
else:
|
||||
if pairing_res == 1:
|
||||
sleep(5)
|
||||
self.connect_process(device)
|
||||
else:
|
||||
self.connect_process(device)
|
||||
else:
|
||||
print("[WARNING] A BT device is already connected :\ndisconnect it before connecting a new device\n")
|
||||
self.display_message("<span font='15' color='#000000'>A BT device is already connected :\nPlease disconnect it before connecting a new device\n</span>")
|
||||
else:
|
||||
connect_res=self.bl.blctl_disconnect(device['mac_address'])
|
||||
self.lb_button_connect.set_markup("<span font='%d' color='#88888888'>connect</span>" % self.font_size)
|
||||
self.update_display()
|
||||
else:
|
||||
print("[WARNING] Select the BT device to connect\n")
|
||||
self.display_message("<span font='15' color='#000000'>Please select a device in the list\n</span>")
|
||||
|
||||
def on_selection_scan_clicked(self, widget):
|
||||
if self.lb_button_scan.get_text() == "start scan":
|
||||
self.bl.blctl_scan_on()
|
||||
timer_scan = GLib.timeout_add(SCAN_DURATION_IN_S * 10, self.progress_timeout, None)
|
||||
self.lb_button_scan.set_markup("<span font='%d'>scan progress</span>"% self.font_size)
|
||||
|
||||
def update_display(self):
|
||||
if (self.scan_done == True):
|
||||
list_devices(self, False)
|
||||
else:
|
||||
list_devices(self, True)
|
||||
self.dev_selected.update({'mac_address':'', 'name':''})
|
||||
self.audio_bt_sink = status_playback(self)
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# -------------------------------------------------------------------
|
||||
def create_subdialogwindow(parent):
|
||||
_window = BluetoothWindow(parent)
|
||||
_window.show_all()
|
||||
response = _window.run()
|
||||
_window.destroy()
|
||||
|
||||
|
||||
# -------------------------------------------------
|
||||
# -------------------------------------------------
|
||||
# test submodule
|
||||
class TestUIWindow(Gtk.Window):
|
||||
def __init__(self):
|
||||
Gtk.Window.__init__(self, title="Test Launcher")
|
||||
create_subdialogwindow(self)
|
||||
self.show_all()
|
||||
|
||||
if __name__ == "__main__":
|
||||
win = TestUIWindow()
|
||||
win.connect("delete-event", Gtk.main_quit)
|
||||
win.show_all()
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
#!/bin/sh
|
||||
script -qc "su -c 'hciconfig hci0 up || echo ko > /tmp/ble'"
|
||||
|
||||
if [ -e /tmp/ble ]; then
|
||||
rm -f /tmp/ble
|
||||
exit 1;
|
||||
else
|
||||
exit 0;
|
||||
fi
|
||||
@@ -0,0 +1,196 @@
|
||||
import time
|
||||
import pexpect
|
||||
import sys
|
||||
import re
|
||||
import pickle
|
||||
|
||||
##############################
|
||||
# bluetoothctl tool wrapper
|
||||
##############################
|
||||
|
||||
__all__ = ["wrapper_blctl"]
|
||||
|
||||
|
||||
device = re.compile(r"Device\s(?P<mac_address>([0-9 A-F][0-9 A-F]:){5}[0-9 A-F][0-9 A-F])(?P<name>.+)$")
|
||||
|
||||
re_device_notvalid = [
|
||||
re.compile(r"CHG"),
|
||||
re.compile(r"NEW"),
|
||||
re.compile(r"DEL")
|
||||
]
|
||||
|
||||
def read_prompt():
|
||||
try:
|
||||
f = open('/tmp/list_prompt', 'rb')
|
||||
except IOError as e:
|
||||
print("Cant not open the file : /tmp/list_prompt\n")
|
||||
return None
|
||||
else:
|
||||
s = pickle.load(f)
|
||||
f.close()
|
||||
return s
|
||||
|
||||
def write_prompt(prt):
|
||||
with open('/tmp/list_prompt', 'wb') as f:
|
||||
pickle.dump(prt,f)
|
||||
|
||||
|
||||
class blctl_error(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class wrapper_blctl:
|
||||
|
||||
def __init__(self):
|
||||
self.blctl_session = pexpect.spawn("bluetoothctl", echo = False, maxread = 3000)
|
||||
|
||||
#no prompt expected because a BT device can be connected automatically
|
||||
prompt_expect = self.blctl_session.expect([pexpect.EOF, pexpect.TIMEOUT], timeout = 1)
|
||||
str_out = str(self.blctl_session.before,"utf-8")
|
||||
l_out = str_out.split("\r\n")
|
||||
|
||||
self.prompt = read_prompt()
|
||||
if self.prompt == None:
|
||||
self.prompt= ["\[bluetooth\]", pexpect.EOF]
|
||||
#print(self.prompt)
|
||||
|
||||
#execute a bluetoothctl command and return the result as a list of lines
|
||||
#no status cmd expected
|
||||
def blctl_command(self, command, pause = 0):
|
||||
#print("blctl_command : " + command)
|
||||
self.blctl_session.send(command + "\n")
|
||||
time.sleep(pause)
|
||||
|
||||
prompt_expect = self.blctl_session.expect(self.prompt)
|
||||
|
||||
if (prompt_expect > (len(self.prompt) - 1) or (prompt_expect < 0)):
|
||||
raise blctl_error("The bluetoothctl command " + command + " failed")
|
||||
|
||||
str_output = str(self.blctl_session.before,"utf-8")
|
||||
output_array = str_output.split("\r\n")
|
||||
|
||||
return output_array
|
||||
|
||||
#execute a bluetoothctl command with status expected
|
||||
def blctl_command_with_status(self, command, status_expected, pause = 0):
|
||||
print("blctl_command_with_status : " + command + "\n")
|
||||
status = status_expected
|
||||
status.extend([pexpect.EOF])
|
||||
#print("prompt_status : %s\n", status)
|
||||
|
||||
self.blctl_session.send(command + "\n")
|
||||
time.sleep(pause)
|
||||
|
||||
res = self.blctl_session.expect(status)
|
||||
|
||||
return res
|
||||
|
||||
def close(self):
|
||||
write_prompt(self.prompt)
|
||||
self.blctl_session.close()
|
||||
|
||||
#build the list of bluetoothctl prompts
|
||||
def set_prompt(self, prompt):
|
||||
prpt = "\["+prompt+"\]"
|
||||
if prpt not in self.prompt:
|
||||
self.prompt.insert(0, prpt)
|
||||
|
||||
#bluetoothctl command : scan on
|
||||
def blctl_scan_on(self):
|
||||
try:
|
||||
cmd_res = self.blctl_command("scan on")
|
||||
except blctl_error as ex:
|
||||
print(ex)
|
||||
return None
|
||||
|
||||
#bluetoothctl command : scan off
|
||||
def blctl_scan_off(self):
|
||||
try:
|
||||
cmd_res = self.blctl_command("scan off")
|
||||
except blctl_error as ex:
|
||||
print(ex)
|
||||
return None
|
||||
|
||||
#make a dic (mac_address, name) from result of bluetoothctl command devices
|
||||
def parse_info(self, device_info):
|
||||
dev = {}
|
||||
info_isnot_valid = None
|
||||
for reg in re_device_notvalid:
|
||||
info_isnot_valid = reg.search(device_info)
|
||||
if info_isnot_valid is not None:
|
||||
break
|
||||
|
||||
if info_isnot_valid is None:
|
||||
result = device.search(device_info)
|
||||
if result is not None:
|
||||
dev = result.groupdict()
|
||||
name_tmp = dev['name'].strip()
|
||||
dev['name'] = name_tmp
|
||||
return dev
|
||||
|
||||
#bluetoothctl command : devices
|
||||
#return a list of dic {mac_address, name}
|
||||
def blctl_devices(self):
|
||||
try:
|
||||
cmd_res = self.blctl_command("devices")
|
||||
except blctl_error as ex:
|
||||
print(ex)
|
||||
return None
|
||||
else:
|
||||
list_devices = []
|
||||
for line in cmd_res:
|
||||
device = self.parse_info(line)
|
||||
if device:
|
||||
list_devices.append(device)
|
||||
|
||||
return list_devices
|
||||
|
||||
#bluetoothctl command : paired-devices
|
||||
#return a list of dic {mac_address, name}
|
||||
def blctl_paired_devices(self):
|
||||
try:
|
||||
cmd_res = self.blctl_command("paired-devices")
|
||||
except blctl_error as ex:
|
||||
print(ex)
|
||||
return None
|
||||
else:
|
||||
list_devices = []
|
||||
for line in cmd_res:
|
||||
device = self.parse_info(line)
|
||||
if device:
|
||||
list_devices.append(device)
|
||||
|
||||
return list_devices
|
||||
|
||||
#bluetoothctl command : info <mac_address>
|
||||
def blctl_info(self, mac_address):
|
||||
try:
|
||||
cmd_res = self.blctl_command("info " + mac_address)
|
||||
except blctl_error as ex:
|
||||
print(ex)
|
||||
return None
|
||||
else:
|
||||
return cmd_res
|
||||
|
||||
#bluetoothctl command : pair <mac_address>
|
||||
def blctl_pair(self, mac_address):
|
||||
cmd_res = self.blctl_command_with_status("pair " + mac_address, ["confirmation", "Pairing successful", "not available", "Failed to pair"], pause=4)
|
||||
return cmd_res
|
||||
|
||||
#bluetoothctl command : connect <mac_address>
|
||||
def blctl_connect(self, mac_address):
|
||||
cmd_res = self.blctl_command_with_status("connect " + mac_address, ["Failed to connect", "Connection successful"], pause=2)
|
||||
passed = True if cmd_res == 1 else False
|
||||
return passed
|
||||
|
||||
#bluetoothctl command : disconnect <mac_address>
|
||||
def blctl_disconnect(self, mac_address):
|
||||
cmd_res = self.blctl_command_with_status("disconnect " + mac_address, ["Failed to disconnect", "Successful disconnected"], pause=2)
|
||||
passed = True if cmd_res == 1 else False
|
||||
return passed
|
||||
|
||||
#bluetoothctl command : remove <mac_address>
|
||||
def blctl_remove(self, mac_address):
|
||||
cmd_res = self.blctl_command_with_status("remove " + mac_address, ["not available", "Failed to remove", "Device has been removed"], pause=3)
|
||||
passed = True if cmd_res == 2 else False
|
||||
return passed
|
||||
Reference in New Issue
Block a user