|
|
@ -1,874 +0,0 @@ |
|
|
|
#!/usr/bin/env python3 |
|
|
|
# encoding:utf8 |
|
|
|
"""NetworkManager command line dmenu script. |
|
|
|
|
|
|
|
To add new connections or enable/disable networking requires policykit |
|
|
|
permissions setup per: |
|
|
|
https://wiki.archlinux.org/index.php/NetworkManager#Set_up_PolicyKit_permissions |
|
|
|
|
|
|
|
OR running the script as root |
|
|
|
|
|
|
|
Add dmenu formatting options and default terminal if desired to |
|
|
|
~/.config/networkmanager-dmenu/config.ini |
|
|
|
|
|
|
|
""" |
|
|
|
import pathlib |
|
|
|
import struct |
|
|
|
import configparser |
|
|
|
import itertools |
|
|
|
import locale |
|
|
|
import os |
|
|
|
from os.path import expanduser |
|
|
|
import shlex |
|
|
|
import sys |
|
|
|
import uuid |
|
|
|
from subprocess import Popen, PIPE |
|
|
|
|
|
|
|
import gi |
|
|
|
gi.require_version('NM', '1.0') |
|
|
|
from gi.repository import GLib, NM # pylint: disable=wrong-import-position |
|
|
|
|
|
|
|
ENV = os.environ.copy() |
|
|
|
ENV['LC_ALL'] = 'C' |
|
|
|
ENC = locale.getpreferredencoding() |
|
|
|
|
|
|
|
CLIENT = NM.Client.new(None) |
|
|
|
LOOP = GLib.MainLoop() |
|
|
|
CONNS = CLIENT.get_connections() |
|
|
|
|
|
|
|
CONF = configparser.ConfigParser() |
|
|
|
CONF.read(expanduser("~/.config/networkmanager-dmenu/config.ini")) |
|
|
|
|
|
|
|
def dmenu_cmd(num_lines, prompt="Networks", active_lines=None): # pylint: disable=too-many-branches |
|
|
|
"""Parse config.ini if it exists and add options to the dmenu command |
|
|
|
|
|
|
|
Args: args - num_lines: number of lines to display |
|
|
|
prompt: prompt to show |
|
|
|
Returns: command invocation (as a list of strings) for |
|
|
|
dmenu -l <num_lines> -p <prompt> -i ... |
|
|
|
|
|
|
|
""" |
|
|
|
dmenu_command = "dmenu" |
|
|
|
if not CONF.sections(): |
|
|
|
res = [dmenu_command, "-z", "1900", "-x", "10", "-y", "10", "-i", "-l", str(num_lines), "-p", str(prompt)] |
|
|
|
res.extend(sys.argv[1:]) |
|
|
|
return res |
|
|
|
if CONF.has_section('dmenu'): |
|
|
|
args = CONF.items('dmenu') |
|
|
|
args_dict = dict(args) |
|
|
|
dmenu_args = [] |
|
|
|
if "dmenu_command" in args_dict: |
|
|
|
command = shlex.split(args_dict["dmenu_command"]) |
|
|
|
dmenu_command = command[0] |
|
|
|
dmenu_args = command[1:] |
|
|
|
del args_dict["dmenu_command"] |
|
|
|
if "p" in args_dict and prompt == "Networks": |
|
|
|
prompt = args_dict["p"] |
|
|
|
del args_dict["p"] |
|
|
|
elif "p" in args_dict: |
|
|
|
del args_dict["p"] |
|
|
|
if "rofi" in dmenu_command: |
|
|
|
lines = "-i -dmenu -lines" |
|
|
|
# rofi doesn't support 0 length line, it requires at least -lines=1 |
|
|
|
# see https://github.com/DaveDavenport/rofi/issues/252 |
|
|
|
num_lines = num_lines or 1 |
|
|
|
else: |
|
|
|
lines = "-i -l" |
|
|
|
if "l" in args_dict: |
|
|
|
# rofi doesn't support 0 length line, it requires at least -lines=1 |
|
|
|
# see https://github.com/DaveDavenport/rofi/issues/252 |
|
|
|
if "rofi" in dmenu_command: |
|
|
|
args_dict['l'] = min(num_lines, int(args_dict['l'])) or 1 |
|
|
|
lines = "{} {}".format(lines, args_dict['l']) |
|
|
|
del args_dict['l'] |
|
|
|
else: |
|
|
|
lines = "{} {}".format(lines, num_lines) |
|
|
|
if "pinentry" in args_dict: |
|
|
|
del args_dict["pinentry"] |
|
|
|
if "compact" in args_dict: |
|
|
|
del args_dict["compact"] |
|
|
|
if "wifi_chars" in args_dict: |
|
|
|
del args_dict["wifi_chars"] |
|
|
|
rofi_highlight = CONF.getboolean('dmenu', 'rofi_highlight', fallback=False) |
|
|
|
if CONF.has_option('dmenu', 'rofi_highlight'): |
|
|
|
del args_dict["rofi_highlight"] |
|
|
|
if rofi_highlight is True and "rofi" in dmenu_command: |
|
|
|
if active_lines: |
|
|
|
dmenu_args.extend(["-a", ",".join([str(num) |
|
|
|
for num in active_lines])]) |
|
|
|
if prompt == "Passphrase": |
|
|
|
if CONF.has_section('dmenu_passphrase'): |
|
|
|
args = CONF.items('dmenu_passphrase') |
|
|
|
args_dict.update(args) |
|
|
|
rofi_obscure = CONF.getboolean('dmenu_passphrase', 'rofi_obscure', fallback=True) |
|
|
|
if CONF.has_option('dmenu_passphrase', 'rofi_obscure'): |
|
|
|
del args_dict["rofi_obscure"] |
|
|
|
if rofi_obscure is True and "rofi" in dmenu_command: |
|
|
|
dmenu_args.extend(["-password"]) |
|
|
|
dmenu_password = CONF.getboolean('dmenu_passphrase', 'dmenu_password', fallback=False) |
|
|
|
if CONF.has_option('dmenu_passphrase', 'dmenu_password'): |
|
|
|
del args_dict["dmenu_password"] |
|
|
|
if dmenu_password is True: |
|
|
|
dmenu_args.extend(["-P"]) |
|
|
|
extras = (["-" + str(k), str(v)] for (k, v) in args_dict.items()) |
|
|
|
res = [dmenu_command, "-p", str(prompt)] |
|
|
|
res.extend(dmenu_args) |
|
|
|
res += list(itertools.chain.from_iterable(extras)) |
|
|
|
res[1:1] = lines.split() |
|
|
|
res = list(filter(None, res)) # Remove empty list elements |
|
|
|
res.extend(sys.argv[1:]) |
|
|
|
return res |
|
|
|
|
|
|
|
|
|
|
|
def choose_adapter(client): |
|
|
|
"""If there is more than one wifi adapter installed, ask which one to use |
|
|
|
|
|
|
|
""" |
|
|
|
devices = client.get_devices() |
|
|
|
devices = [i for i in devices if i.get_device_type() == NM.DeviceType.WIFI] |
|
|
|
if not devices: # pylint: disable=no-else-return |
|
|
|
return None |
|
|
|
elif len(devices) == 1: |
|
|
|
return devices[0] |
|
|
|
device_names = "\n".join([d.get_iface() for d in devices]).encode(ENC) |
|
|
|
sel = Popen(dmenu_cmd(len(devices), "CHOOSE ADAPTER:"), |
|
|
|
stdin=PIPE, |
|
|
|
stdout=PIPE, |
|
|
|
env=ENV).communicate(input=device_names)[0].decode(ENC) |
|
|
|
if not sel.strip(): |
|
|
|
sys.exit() |
|
|
|
devices = [i for i in devices if i.get_iface() == sel.strip()] |
|
|
|
assert len(devices) == 1 |
|
|
|
return devices[0] |
|
|
|
|
|
|
|
|
|
|
|
def is_modemmanager_installed(): |
|
|
|
"""Check if ModemManager is installed""" |
|
|
|
with open(os.devnull) as devnull: |
|
|
|
try: |
|
|
|
Popen(["ModemManager"], stdout=devnull, stderr=devnull).communicate() |
|
|
|
except OSError: |
|
|
|
return False |
|
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
def bluetooth_get_enabled(): |
|
|
|
"""Check if bluetooth is enabled via rfkill. |
|
|
|
|
|
|
|
Returns None if no bluetooth device was found. |
|
|
|
""" |
|
|
|
# See https://www.kernel.org/doc/Documentation/ABI/stable/sysfs-class-rfkill |
|
|
|
for path in pathlib.Path('/sys/class/rfkill/').glob('rfkill*'): |
|
|
|
if (path / 'type').read_text().strip() == 'bluetooth': |
|
|
|
return (path / 'soft').read_text().strip() == '0' |
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
def create_other_actions(client): |
|
|
|
"""Return list of other actions that can be taken |
|
|
|
|
|
|
|
""" |
|
|
|
networking_enabled = client.networking_get_enabled() |
|
|
|
networking_action = "Disable" if networking_enabled else "Enable" |
|
|
|
|
|
|
|
wifi_enabled = client.wireless_get_enabled() |
|
|
|
wifi_action = "Disable" if wifi_enabled else "Enable" |
|
|
|
|
|
|
|
bluetooth_enabled = bluetooth_get_enabled() |
|
|
|
bluetooth_action = "Disable" if bluetooth_enabled else "Enable" |
|
|
|
|
|
|
|
actions = [Action("{} Wifi".format(wifi_action), toggle_wifi, |
|
|
|
not wifi_enabled), |
|
|
|
Action("{} Networking".format(networking_action), |
|
|
|
toggle_networking, not networking_enabled)] |
|
|
|
if bluetooth_enabled is not None: |
|
|
|
actions.append(Action("{} Bluetooth".format(bluetooth_action), |
|
|
|
toggle_bluetooth, not bluetooth_enabled)) |
|
|
|
actions += [Action("Launch Connection Manager", launch_connection_editor), |
|
|
|
Action("Delete a Connection", delete_connection)] |
|
|
|
if wifi_enabled: |
|
|
|
actions.append(Action("Rescan Wifi Networks", rescan_wifi)) |
|
|
|
return actions |
|
|
|
|
|
|
|
|
|
|
|
def rescan_wifi(): |
|
|
|
""" |
|
|
|
Rescan Wifi Access Points |
|
|
|
""" |
|
|
|
for dev in CLIENT.get_devices(): |
|
|
|
if gi.repository.NM.DeviceWifi == type(dev): |
|
|
|
try: |
|
|
|
dev.request_scan_async(None, rescan_cb, None) |
|
|
|
LOOP.run() |
|
|
|
except gi.repository.GLib.Error as err: |
|
|
|
# Too frequent rescan error |
|
|
|
notify("Wifi rescan failed", urgency="critical") |
|
|
|
if not err.code == 6: # pylint: disable=no-member |
|
|
|
raise err |
|
|
|
|
|
|
|
|
|
|
|
def rescan_cb(dev, res, data): |
|
|
|
"""Callback for rescan_wifi. Just for notifications |
|
|
|
|
|
|
|
""" |
|
|
|
if dev.request_scan_finish(res) is True: |
|
|
|
notify("Wifi scan complete") |
|
|
|
else: |
|
|
|
notify("Wifi scan failed", urgency="critical") |
|
|
|
LOOP.quit() |
|
|
|
|
|
|
|
|
|
|
|
def ssid_to_utf8(nm_ap): |
|
|
|
""" Convert binary ssid to utf-8 """ |
|
|
|
ssid = nm_ap.get_ssid() |
|
|
|
if not ssid: |
|
|
|
return "" |
|
|
|
ret = NM.utils_ssid_to_utf8(ssid.get_data()) |
|
|
|
return ret |
|
|
|
|
|
|
|
|
|
|
|
def prompt_saved(saved_cons): |
|
|
|
"""Prompt for a saved connection.""" |
|
|
|
actions = create_saved_actions(saved_cons) |
|
|
|
sel = get_selection(actions) |
|
|
|
sel() |
|
|
|
|
|
|
|
|
|
|
|
def ap_security(nm_ap): |
|
|
|
"""Parse the security flags to return a string with 'WPA2', etc. """ |
|
|
|
flags = nm_ap.get_flags() |
|
|
|
wpa_flags = nm_ap.get_wpa_flags() |
|
|
|
rsn_flags = nm_ap.get_rsn_flags() |
|
|
|
sec_str = "" |
|
|
|
if ((flags & getattr(NM, '80211ApFlags').PRIVACY) and |
|
|
|
(wpa_flags == 0) and (rsn_flags == 0)): |
|
|
|
sec_str += " WEP" |
|
|
|
if wpa_flags != 0: |
|
|
|
sec_str += " WPA1" |
|
|
|
if rsn_flags != 0: |
|
|
|
sec_str += " WPA2" |
|
|
|
if ((wpa_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_802_1X) or |
|
|
|
(rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_802_1X)): |
|
|
|
sec_str += " 802.1X" |
|
|
|
|
|
|
|
# If there is no security use "--" |
|
|
|
if sec_str == "": |
|
|
|
sec_str = "--" |
|
|
|
return sec_str.lstrip() |
|
|
|
|
|
|
|
|
|
|
|
class Action(): # pylint: disable=too-few-public-methods |
|
|
|
"""Helper class to execute functions from a string variable""" |
|
|
|
def __init__(self, |
|
|
|
name, |
|
|
|
func, |
|
|
|
args=None, |
|
|
|
active=False): |
|
|
|
self.name = name |
|
|
|
self.func = func |
|
|
|
self.is_active = active |
|
|
|
if args is None: |
|
|
|
self.args = None |
|
|
|
elif isinstance(args, list): |
|
|
|
self.args = args |
|
|
|
else: |
|
|
|
self.args = [args] |
|
|
|
|
|
|
|
def __str__(self): |
|
|
|
return self.name |
|
|
|
|
|
|
|
def __call__(self): |
|
|
|
if self.args is None: |
|
|
|
self.func() |
|
|
|
else: |
|
|
|
self.func(*self.args) |
|
|
|
|
|
|
|
|
|
|
|
def process_ap(nm_ap, is_active, adapter): |
|
|
|
"""Activate/Deactivate a connection and get password if required""" |
|
|
|
if is_active: |
|
|
|
CLIENT.deactivate_connection_async(nm_ap, None, deactivate_cb, nm_ap) |
|
|
|
else: |
|
|
|
conns_cur = [i for i in CONNS if |
|
|
|
i.get_setting_wireless() is not None and |
|
|
|
i.get_setting_wireless().get_mac_address() == |
|
|
|
adapter.get_permanent_hw_address()] |
|
|
|
con = nm_ap.filter_connections(conns_cur) |
|
|
|
if len(con) > 1: |
|
|
|
raise ValueError("There are multiple connections possible") |
|
|
|
|
|
|
|
if len(con) == 1: |
|
|
|
CLIENT.activate_connection_async(con[0], adapter, nm_ap.get_path(), |
|
|
|
None, activate_cb, nm_ap) |
|
|
|
else: |
|
|
|
if ap_security(nm_ap) != "--": |
|
|
|
password = get_passphrase() |
|
|
|
else: |
|
|
|
password = "" |
|
|
|
set_new_connection(nm_ap, password, adapter) |
|
|
|
LOOP.run() |
|
|
|
|
|
|
|
|
|
|
|
def activate_cb(dev, res, data): |
|
|
|
"""Notification if activate connection completed successfully |
|
|
|
|
|
|
|
""" |
|
|
|
try: |
|
|
|
conn = dev.activate_connection_finish(res) |
|
|
|
except GLib.Error: |
|
|
|
conn = None |
|
|
|
if conn is not None: |
|
|
|
notify("Activated {}".format(conn.get_id())) |
|
|
|
else: |
|
|
|
notify("Problem activating {}".format(data.get_id()), |
|
|
|
urgency="critical") |
|
|
|
LOOP.quit() |
|
|
|
|
|
|
|
|
|
|
|
def deactivate_cb(dev, res, data): |
|
|
|
"""Notification if deactivate connection completed successfully |
|
|
|
|
|
|
|
""" |
|
|
|
if dev.deactivate_connection_finish(res) is True: |
|
|
|
notify("Deactivated {}".format(data.get_id())) |
|
|
|
else: |
|
|
|
notify("Problem deactivating {}".format(data.get_id()), |
|
|
|
urgency="critical") |
|
|
|
LOOP.quit() |
|
|
|
|
|
|
|
|
|
|
|
def process_vpngsm(con, activate): |
|
|
|
"""Activate/deactive VPN or GSM connections""" |
|
|
|
if activate: |
|
|
|
CLIENT.activate_connection_async(con, None, None, |
|
|
|
None, activate_cb, con) |
|
|
|
else: |
|
|
|
CLIENT.deactivate_connection_async(con, None, deactivate_cb, con) |
|
|
|
LOOP.run() |
|
|
|
|
|
|
|
|
|
|
|
def create_ap_actions(aps, active_ap, active_connection, adapter): # pylint: disable=too-many-locals |
|
|
|
"""For each AP in a list, create the string and its attached function |
|
|
|
(activate/deactivate) |
|
|
|
|
|
|
|
""" |
|
|
|
active_ap_bssid = active_ap.get_bssid() if active_ap is not None else "" |
|
|
|
|
|
|
|
names = [ssid_to_utf8(ap) for ap in aps] |
|
|
|
max_len_name = max([len(name) for name in names]) if names else 0 |
|
|
|
secs = [ap_security(ap) for ap in aps] |
|
|
|
max_len_sec = max([len(sec) for sec in secs]) if secs else 0 |
|
|
|
|
|
|
|
ap_actions = [] |
|
|
|
|
|
|
|
for nm_ap, name, sec in zip(aps, names, secs): |
|
|
|
bars = NM.utils_wifi_strength_bars(nm_ap.get_strength()) |
|
|
|
wifi_chars = CONF.get("dmenu", "wifi_chars", fallback=False) |
|
|
|
if wifi_chars: |
|
|
|
bars = "".join([wifi_chars[i] for i, j in enumerate(bars) if j == '*']) |
|
|
|
is_active = nm_ap.get_bssid() == active_ap_bssid |
|
|
|
compact = CONF.getboolean("dmenu", "compact", fallback=False) |
|
|
|
if compact: |
|
|
|
action_name = u"{} {} {}".format(name, sec, bars) |
|
|
|
else: |
|
|
|
action_name = u"{:<{}s} {:<{}s} {}".format(name, max_len_name, sec, |
|
|
|
max_len_sec, bars) |
|
|
|
if is_active: |
|
|
|
ap_actions.append(Action(action_name, process_ap, |
|
|
|
[active_connection, True, adapter], |
|
|
|
active=True)) |
|
|
|
else: |
|
|
|
ap_actions.append(Action(action_name, process_ap, |
|
|
|
[nm_ap, False, adapter])) |
|
|
|
return ap_actions |
|
|
|
|
|
|
|
|
|
|
|
def create_vpn_actions(vpns, active): |
|
|
|
"""Create the list of strings to display with associated function |
|
|
|
(activate/deactivate) for VPN connections. |
|
|
|
|
|
|
|
""" |
|
|
|
active_vpns = [i for i in active if i.get_vpn()] |
|
|
|
return _create_vpngsm_actions(vpns, active_vpns, "VPN") |
|
|
|
|
|
|
|
|
|
|
|
def create_wireguard_actions(wgs, active): |
|
|
|
"""Create the list of strings to display with associated function |
|
|
|
(activate/deactivate) for Wireguard connections. |
|
|
|
|
|
|
|
""" |
|
|
|
active_wgs = [i for i in active if i.get_connection_type() == "wireguard"] |
|
|
|
return _create_vpngsm_actions(wgs, active_wgs, "Wireguard") |
|
|
|
|
|
|
|
|
|
|
|
def create_eth_actions(eths, active): |
|
|
|
"""Create the list of strings to display with associated function |
|
|
|
(activate/deactivate) for Ethernet connections. |
|
|
|
|
|
|
|
""" |
|
|
|
active_eths = [i for i in active if 'ethernet' in i.get_connection_type()] |
|
|
|
return _create_vpngsm_actions(eths, active_eths, "Eth") |
|
|
|
|
|
|
|
|
|
|
|
def create_gsm_actions(gsms, active): |
|
|
|
"""Create the list of strings to display with associated function |
|
|
|
(activate/deactivate) GSM connections.""" |
|
|
|
active_gsms = [i for i in active if |
|
|
|
i.get_connection() is not None and |
|
|
|
i.get_connection().is_type(NM.SETTING_GSM_SETTING_NAME)] |
|
|
|
return _create_vpngsm_actions(gsms, active_gsms, "GSM") |
|
|
|
|
|
|
|
|
|
|
|
def create_blue_actions(blues, active): |
|
|
|
"""Create the list of strings to display with associated function |
|
|
|
(activate/deactivate) Bluetooth connections.""" |
|
|
|
active_blues = [i for i in active if |
|
|
|
i.get_connection() is not None and |
|
|
|
i.get_connection().is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)] |
|
|
|
return _create_vpngsm_actions(blues, active_blues, "Bluetooth") |
|
|
|
|
|
|
|
|
|
|
|
def create_saved_actions(saved): |
|
|
|
"""Create the list of strings to display with associated function |
|
|
|
(activate/deactivate) for VPN connections. |
|
|
|
|
|
|
|
""" |
|
|
|
return _create_vpngsm_actions(saved, [], "SAVED") |
|
|
|
|
|
|
|
|
|
|
|
def _create_vpngsm_actions(cons, active_cons, label): |
|
|
|
active_con_ids = [a.get_id() for a in active_cons] |
|
|
|
actions = [] |
|
|
|
for con in cons: |
|
|
|
is_active = con.get_id() in active_con_ids |
|
|
|
action_name = u"{}:{}".format(con.get_id(), label) |
|
|
|
if is_active: |
|
|
|
active_connection = [a for a in active_cons |
|
|
|
if a.get_id() == con.get_id()] |
|
|
|
if len(active_connection) != 1: |
|
|
|
raise ValueError(u"Multiple active connections match" |
|
|
|
" the connection: {}".format(con.get_id())) |
|
|
|
active_connection = active_connection[0] |
|
|
|
|
|
|
|
actions.append(Action(action_name, process_vpngsm, |
|
|
|
[active_connection, False], active=True)) |
|
|
|
else: |
|
|
|
actions.append(Action(action_name, process_vpngsm, |
|
|
|
[con, True])) |
|
|
|
return actions |
|
|
|
|
|
|
|
|
|
|
|
def create_wwan_actions(client): |
|
|
|
"""Create WWWAN actions |
|
|
|
|
|
|
|
""" |
|
|
|
wwan_enabled = client.wwan_get_enabled() |
|
|
|
wwan_action = "Disable" if wwan_enabled else "Enable" |
|
|
|
return [Action("{} WWAN".format(wwan_action), toggle_wwan, not wwan_enabled)] |
|
|
|
|
|
|
|
|
|
|
|
def combine_actions(eths, aps, vpns, wgs, gsms, blues, wwan, others, saved): |
|
|
|
"""Combine all given actions into a list of actions. |
|
|
|
|
|
|
|
Args: args - eths: list of Actions |
|
|
|
aps: list of Actions |
|
|
|
vpns: list of Actions |
|
|
|
gsms: list of Actions |
|
|
|
blues: list of Actions |
|
|
|
wwan: list of Actions |
|
|
|
others: list of Actions |
|
|
|
""" |
|
|
|
compact = CONF.getboolean("dmenu", "compact", fallback=False) |
|
|
|
empty_action = [Action('', None)] if not compact else [] |
|
|
|
all_actions = [] |
|
|
|
all_actions += eths + empty_action if eths else [] |
|
|
|
all_actions += aps + empty_action if aps else [] |
|
|
|
all_actions += vpns + empty_action if vpns else [] |
|
|
|
all_actions += wgs + empty_action if wgs else [] |
|
|
|
all_actions += gsms + empty_action if (gsms and wwan) else [] |
|
|
|
all_actions += blues + empty_action if blues else [] |
|
|
|
all_actions += wwan + empty_action if wwan else [] |
|
|
|
all_actions += others + empty_action if others else [] |
|
|
|
all_actions += saved + empty_action if saved else [] |
|
|
|
return all_actions |
|
|
|
|
|
|
|
|
|
|
|
def get_selection(all_actions): |
|
|
|
"""Spawn dmenu for selection and execute the associated action.""" |
|
|
|
rofi_highlight = CONF.getboolean('dmenu', 'rofi_highlight', fallback=False) |
|
|
|
inp = [] |
|
|
|
|
|
|
|
if rofi_highlight is True: |
|
|
|
inp = [str(action) for action in all_actions] |
|
|
|
else: |
|
|
|
inp = [('== ' if action.is_active else ' ') + str(action) |
|
|
|
for action in all_actions] |
|
|
|
active_lines = [index for index, action in enumerate(all_actions) |
|
|
|
if action.is_active] |
|
|
|
|
|
|
|
inp_bytes = "\n".join(inp).encode(ENC) |
|
|
|
command = dmenu_cmd(len(inp), active_lines=active_lines) |
|
|
|
sel = Popen(command, stdin=PIPE, stdout=PIPE, |
|
|
|
env=ENV).communicate(input=inp_bytes)[0].decode(ENC) |
|
|
|
|
|
|
|
if not sel.rstrip(): |
|
|
|
sys.exit() |
|
|
|
|
|
|
|
if rofi_highlight is False: |
|
|
|
action = [i for i in all_actions |
|
|
|
if ((str(i).strip() == str(sel.strip()) |
|
|
|
and not i.is_active) or |
|
|
|
('== ' + str(i) == str(sel.rstrip('\n')) |
|
|
|
and i.is_active))] |
|
|
|
else: |
|
|
|
action = [i for i in all_actions if str(i).strip() == sel.strip()] |
|
|
|
assert len(action) == 1, \ |
|
|
|
u"Selection was ambiguous: '{}'".format(str(sel.strip())) |
|
|
|
return action[0] |
|
|
|
|
|
|
|
|
|
|
|
def toggle_networking(enable): |
|
|
|
"""Enable/disable networking |
|
|
|
|
|
|
|
Args: enable - boolean |
|
|
|
|
|
|
|
""" |
|
|
|
toggle = GLib.Variant.new_tuple(GLib.Variant.new_boolean(enable)) |
|
|
|
try: |
|
|
|
CLIENT.dbus_call(NM.DBUS_PATH, NM.DBUS_INTERFACE, "Enable", toggle, |
|
|
|
None, -1, None, None, None) |
|
|
|
except AttributeError: |
|
|
|
# Workaround for older versions of python-gobject |
|
|
|
CLIENT.networking_set_enabled(enable) |
|
|
|
notify("Networking {}".format("enabled" if enable is True else "disabled")) |
|
|
|
|
|
|
|
|
|
|
|
def toggle_wifi(enable): |
|
|
|
"""Enable/disable Wifi |
|
|
|
|
|
|
|
Args: enable - boolean |
|
|
|
|
|
|
|
""" |
|
|
|
toggle = GLib.Variant.new_boolean(enable) |
|
|
|
try: |
|
|
|
CLIENT.dbus_set_property(NM.DBUS_PATH, NM.DBUS_INTERFACE, "WirelessEnabled", toggle, |
|
|
|
-1, None, None, None) |
|
|
|
except AttributeError: |
|
|
|
# Workaround for older versions of python-gobject |
|
|
|
CLIENT.wireless_set_enabled(enable) |
|
|
|
notify("Wifi {}".format("enabled" if enable is True else "disabled")) |
|
|
|
|
|
|
|
|
|
|
|
def toggle_wwan(enable): |
|
|
|
"""Enable/disable WWAN |
|
|
|
|
|
|
|
Args: enable - boolean |
|
|
|
|
|
|
|
""" |
|
|
|
toggle = GLib.Variant.new_boolean(enable) |
|
|
|
try: |
|
|
|
CLIENT.dbus_set_property(NM.DBUS_PATH, NM.DBUS_INTERFACE, "WwanEnabled", toggle, |
|
|
|
-1, None, None, None) |
|
|
|
except AttributeError: |
|
|
|
# Workaround for older versions of python-gobject |
|
|
|
CLIENT.wwan_set_enabled(enable) |
|
|
|
notify("Wwan {}".format("enabled" if enable is True else "disabled")) |
|
|
|
|
|
|
|
|
|
|
|
def toggle_bluetooth(enable): |
|
|
|
"""Enable/disable Bluetooth |
|
|
|
|
|
|
|
Args: enable - boolean |
|
|
|
|
|
|
|
References: |
|
|
|
https://github.com/blueman-project/blueman/blob/master/blueman/plugins/mechanism/RfKill.py |
|
|
|
https://www.kernel.org/doc/html/latest/driver-api/rfkill.html |
|
|
|
https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/include/uapi/linux/rfkill.h?h=v5.8.9 |
|
|
|
|
|
|
|
""" |
|
|
|
type_bluetooth = 2 |
|
|
|
op_change_all = 3 |
|
|
|
idx = 0 |
|
|
|
soft_state = 0 if enable else 1 |
|
|
|
hard_state = 0 |
|
|
|
|
|
|
|
data = struct.pack("IBBBB", idx, type_bluetooth, op_change_all, |
|
|
|
soft_state, hard_state) |
|
|
|
|
|
|
|
try: |
|
|
|
with open('/dev/rfkill', 'r+b', buffering=0) as rff: |
|
|
|
rff.write(data) |
|
|
|
except PermissionError: |
|
|
|
notify("Lacking permission to write to /dev/rfkill.", |
|
|
|
"Maybe you need to add your user to the 'rfkill' group?", |
|
|
|
urgency="critical") |
|
|
|
else: |
|
|
|
notify("Bluetooth {}".format("enabled" if enable else "disabled")) |
|
|
|
|
|
|
|
|
|
|
|
def launch_connection_editor(): |
|
|
|
"""Launch nmtui or the gui nm-connection-editor |
|
|
|
|
|
|
|
""" |
|
|
|
terminal = CONF.get("editor", "terminal", fallback="xterm") |
|
|
|
gui_if_available = CONF.getboolean("editor", "gui_if_available", fallback=True) |
|
|
|
if gui_if_available is True: |
|
|
|
try: |
|
|
|
Popen(["gnome-control-center", "network"]).communicate() |
|
|
|
except OSError: |
|
|
|
try: |
|
|
|
Popen(["nm-connection-editor"]).communicate() |
|
|
|
except OSError: |
|
|
|
Popen([terminal, "-e", "nmtui"]).communicate() |
|
|
|
else: |
|
|
|
Popen([terminal, "-e", "nmtui"]).communicate() |
|
|
|
|
|
|
|
|
|
|
|
def get_passphrase(): |
|
|
|
"""Get a password |
|
|
|
|
|
|
|
Returns: string |
|
|
|
|
|
|
|
""" |
|
|
|
pinentry = CONF.get("dmenu", "pinentry", fallback=None) |
|
|
|
if pinentry: |
|
|
|
pin = "" |
|
|
|
out = Popen(pinentry, |
|
|
|
stdout=PIPE, |
|
|
|
stdin=PIPE).communicate(input=b'setdesc Get network password\ngetpin\n')[0] |
|
|
|
if out: |
|
|
|
res = out.decode(ENC).split("\n")[2] |
|
|
|
if res.startswith("D "): |
|
|
|
pin = res.split("D ")[1] |
|
|
|
return pin |
|
|
|
return Popen(dmenu_cmd(0, "Passphrase"), |
|
|
|
stdin=PIPE, stdout=PIPE).communicate()[0].decode(ENC) |
|
|
|
|
|
|
|
|
|
|
|
def delete_connection(): |
|
|
|
"""Display list of NM connections and delete the selected one |
|
|
|
|
|
|
|
""" |
|
|
|
conn_acts = [Action(i.get_id(), i.delete_async, args=[None, delete_cb, None]) for i in CONNS] |
|
|
|
conn_names = "\n".join([str(i) for i in conn_acts]).encode(ENC) |
|
|
|
sel = Popen(dmenu_cmd(len(conn_acts), "CHOOSE CONNECTION TO DELETE:"), |
|
|
|
stdin=PIPE, |
|
|
|
stdout=PIPE, |
|
|
|
env=ENV).communicate(input=conn_names)[0].decode(ENC) |
|
|
|
if not sel.strip(): |
|
|
|
sys.exit() |
|
|
|
action = [i for i in conn_acts if str(i) == sel.rstrip("\n")] |
|
|
|
assert len(action) == 1, u"Selection was ambiguous: {}".format(str(sel)) |
|
|
|
action[0]() |
|
|
|
LOOP.run() |
|
|
|
|
|
|
|
|
|
|
|
def delete_cb(dev, res, data): |
|
|
|
"""Notification if delete completed successfully |
|
|
|
|
|
|
|
""" |
|
|
|
if dev.delete_finish(res) is True: |
|
|
|
notify("Deleted {}".format(dev.get_id())) |
|
|
|
else: |
|
|
|
notify("Problem deleting {}".format(dev.get_id()), urgency="critical") |
|
|
|
LOOP.quit() |
|
|
|
|
|
|
|
|
|
|
|
def set_new_connection(nm_ap, nm_pw, adapter): |
|
|
|
"""Setup a new NetworkManager connection |
|
|
|
|
|
|
|
Args: ap - NM.AccessPoint |
|
|
|
pw - string |
|
|
|
|
|
|
|
""" |
|
|
|
nm_pw = str(nm_pw).strip() |
|
|
|
profile = create_wifi_profile(nm_ap, nm_pw, adapter) |
|
|
|
CLIENT.add_and_activate_connection_async(profile, adapter, nm_ap.get_path(), |
|
|
|
None, verify_conn, profile) |
|
|
|
LOOP.run() |
|
|
|
|
|
|
|
|
|
|
|
def create_wifi_profile(nm_ap, password, adapter): |
|
|
|
# pylint: disable=C0301 |
|
|
|
# From https://cgit.freedesktop.org/NetworkManager/NetworkManager/tree/examples/python/gi/add_connection.py |
|
|
|
# and https://cgit.freedesktop.org/NetworkManager/NetworkManager/tree/examples/python/dbus/add-wifi-psk-connection.py |
|
|
|
# pylint: enable=C0301 |
|
|
|
"""Create the NM profile given the AP and passphrase""" |
|
|
|
ap_sec = ap_security(nm_ap) |
|
|
|
profile = NM.SimpleConnection.new() |
|
|
|
|
|
|
|
s_con = NM.SettingConnection.new() |
|
|
|
s_con.set_property(NM.SETTING_CONNECTION_ID, ssid_to_utf8(nm_ap)) |
|
|
|
s_con.set_property(NM.SETTING_CONNECTION_UUID, str(uuid.uuid4())) |
|
|
|
s_con.set_property(NM.SETTING_CONNECTION_TYPE, "802-11-wireless") |
|
|
|
profile.add_setting(s_con) |
|
|
|
|
|
|
|
s_wifi = NM.SettingWireless.new() |
|
|
|
s_wifi.set_property(NM.SETTING_WIRELESS_SSID, nm_ap.get_ssid()) |
|
|
|
s_wifi.set_property(NM.SETTING_WIRELESS_MODE, 'infrastructure') |
|
|
|
s_wifi.set_property(NM.SETTING_WIRELESS_MAC_ADDRESS, adapter.get_permanent_hw_address()) |
|
|
|
profile.add_setting(s_wifi) |
|
|
|
|
|
|
|
s_ip4 = NM.SettingIP4Config.new() |
|
|
|
s_ip4.set_property(NM.SETTING_IP_CONFIG_METHOD, "auto") |
|
|
|
profile.add_setting(s_ip4) |
|
|
|
|
|
|
|
s_ip6 = NM.SettingIP6Config.new() |
|
|
|
s_ip6.set_property(NM.SETTING_IP_CONFIG_METHOD, "auto") |
|
|
|
profile.add_setting(s_ip6) |
|
|
|
|
|
|
|
if ap_sec != "--": |
|
|
|
s_wifi_sec = NM.SettingWirelessSecurity.new() |
|
|
|
if "WPA" in ap_sec: |
|
|
|
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, |
|
|
|
"wpa-psk") |
|
|
|
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_AUTH_ALG, |
|
|
|
"open") |
|
|
|
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_PSK, password) |
|
|
|
elif "WEP" in ap_sec: |
|
|
|
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, |
|
|
|
"None") |
|
|
|
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_WEP_KEY_TYPE, |
|
|
|
NM.WepKeyType.PASSPHRASE) |
|
|
|
s_wifi_sec.set_wep_key(0, password) |
|
|
|
profile.add_setting(s_wifi_sec) |
|
|
|
|
|
|
|
return profile |
|
|
|
|
|
|
|
|
|
|
|
def verify_conn(client, result, data): |
|
|
|
"""Callback function for add_and_activate_connection_async |
|
|
|
|
|
|
|
Check if connection completes successfully. Delete the connection if there |
|
|
|
is an error. |
|
|
|
|
|
|
|
""" |
|
|
|
try: |
|
|
|
act_conn = client.add_and_activate_connection_finish(result) |
|
|
|
conn = act_conn.get_connection() |
|
|
|
if not all([conn.verify(), |
|
|
|
conn.verify_secrets(), |
|
|
|
data.verify(), |
|
|
|
data.verify_secrets()]): |
|
|
|
raise GLib.Error |
|
|
|
notify("Added {}".format(conn.get_id())) |
|
|
|
except GLib.Error: # pylint: disable=catching-non-exception |
|
|
|
try: |
|
|
|
notify("Connection to {} failed".format(conn.get_id()), |
|
|
|
urgency="critical") |
|
|
|
conn.delete_async(None, None, None) |
|
|
|
except UnboundLocalError: |
|
|
|
pass |
|
|
|
finally: |
|
|
|
LOOP.quit() |
|
|
|
|
|
|
|
|
|
|
|
def create_ap_list(adapter, active_connections): |
|
|
|
"""Generate list of access points. Remove duplicate APs , keeping strongest |
|
|
|
ones and the active AP |
|
|
|
|
|
|
|
Args: adapter |
|
|
|
active_connections - list of all active connections |
|
|
|
Returns: aps - list of access points |
|
|
|
active_ap - active AP |
|
|
|
active_ap_con - active Connection |
|
|
|
adapter |
|
|
|
|
|
|
|
""" |
|
|
|
aps = [] |
|
|
|
ap_names = [] |
|
|
|
active_ap = adapter.get_active_access_point() |
|
|
|
aps_all = sorted(adapter.get_access_points(), |
|
|
|
key=lambda a: a.get_strength(), reverse=True) |
|
|
|
conns_cur = [i for i in CONNS if |
|
|
|
i.get_setting_wireless() is not None and |
|
|
|
i.get_setting_wireless().get_mac_address() == |
|
|
|
adapter.get_permanent_hw_address()] |
|
|
|
try: |
|
|
|
ap_conns = active_ap.filter_connections(conns_cur) |
|
|
|
active_ap_name = ssid_to_utf8(active_ap) |
|
|
|
active_ap_con = [active_conn for active_conn in active_connections |
|
|
|
if active_conn.get_connection() in ap_conns] |
|
|
|
except AttributeError: |
|
|
|
active_ap_name = None |
|
|
|
active_ap_con = [] |
|
|
|
if len(active_ap_con) > 1: |
|
|
|
raise ValueError("Multiple connection profiles match" |
|
|
|
" the wireless AP") |
|
|
|
active_ap_con = active_ap_con[0] if active_ap_con else None |
|
|
|
for nm_ap in aps_all: |
|
|
|
ap_name = ssid_to_utf8(nm_ap) |
|
|
|
if nm_ap != active_ap and ap_name == active_ap_name: |
|
|
|
# Skip adding AP if it's not active but same name as active AP |
|
|
|
continue |
|
|
|
if ap_name not in ap_names: |
|
|
|
ap_names.append(ap_name) |
|
|
|
aps.append(nm_ap) |
|
|
|
return aps, active_ap, active_ap_con, adapter |
|
|
|
|
|
|
|
|
|
|
|
def notify(message, details=None, urgency="low"): |
|
|
|
"""Use notify-send if available for notifications |
|
|
|
|
|
|
|
""" |
|
|
|
args = ["-u", urgency, message] |
|
|
|
if details is not None: |
|
|
|
args.append(details) |
|
|
|
|
|
|
|
try: |
|
|
|
Popen(["notify-send"] + args, |
|
|
|
stdout=PIPE, stderr=PIPE).communicate() |
|
|
|
except FileNotFoundError: |
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
def run(): |
|
|
|
"""Main script entrypoint""" |
|
|
|
active = CLIENT.get_active_connections() |
|
|
|
adapter = choose_adapter(CLIENT) |
|
|
|
if adapter: |
|
|
|
ap_actions = create_ap_actions(*create_ap_list(adapter, active)) |
|
|
|
else: |
|
|
|
ap_actions = [] |
|
|
|
|
|
|
|
vpns = [i for i in CONNS if i.is_type(NM.SETTING_VPN_SETTING_NAME)] |
|
|
|
try: |
|
|
|
wgs = [i for i in CONNS if i.is_type(NM.SETTING_WIREGUARD_SETTING_NAME)] |
|
|
|
except AttributeError: |
|
|
|
# Workaround for older versions of python-gobject with no wireguard support |
|
|
|
wgs = [] |
|
|
|
eths = [i for i in CONNS if i.is_type(NM.SETTING_WIRED_SETTING_NAME)] |
|
|
|
blues = [i for i in CONNS if i.is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)] |
|
|
|
|
|
|
|
vpn_actions = create_vpn_actions(vpns, active) |
|
|
|
wg_actions = create_wireguard_actions(wgs, active) |
|
|
|
eth_actions = create_eth_actions(eths, active) |
|
|
|
blue_actions = create_blue_actions(blues, active) |
|
|
|
other_actions = create_other_actions(CLIENT) |
|
|
|
wwan_installed = is_modemmanager_installed() |
|
|
|
if wwan_installed: |
|
|
|
gsms = [i for i in CONNS if i.is_type(NM.SETTING_GSM_SETTING_NAME)] |
|
|
|
gsm_actions = create_gsm_actions(gsms, active) |
|
|
|
wwan_actions = create_wwan_actions(CLIENT) |
|
|
|
else: |
|
|
|
gsm_actions = [] |
|
|
|
wwan_actions = [] |
|
|
|
|
|
|
|
list_saved = CONF.getboolean('dmenu', 'list_saved', fallback=False) |
|
|
|
saved_cons = [i for i in CONNS if i not in vpns + wgs + eths + blues] |
|
|
|
if list_saved: |
|
|
|
saved_actions = create_saved_actions(saved_cons) |
|
|
|
else: |
|
|
|
saved_actions = [Action("Saved connections", prompt_saved, [saved_cons])] |
|
|
|
|
|
|
|
actions = combine_actions(eth_actions, ap_actions, vpn_actions, wg_actions, |
|
|
|
gsm_actions, blue_actions, wwan_actions, |
|
|
|
other_actions, saved_actions) |
|
|
|
sel = get_selection(actions) |
|
|
|
sel() |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
run() |
|
|
|
|
|
|
|
# vim: set et ts=4 sw=4 : |