Another copy of my dotfiles. Because I don't completely trust GitHub.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

874 lines
30 KiB

  1. #!/usr/bin/env python3
  2. # encoding:utf8
  3. """NetworkManager command line dmenu script.
  4. To add new connections or enable/disable networking requires policykit
  5. permissions setup per:
  6. https://wiki.archlinux.org/index.php/NetworkManager#Set_up_PolicyKit_permissions
  7. OR running the script as root
  8. Add dmenu formatting options and default terminal if desired to
  9. ~/.config/networkmanager-dmenu/config.ini
  10. """
  11. import pathlib
  12. import struct
  13. import configparser
  14. import itertools
  15. import locale
  16. import os
  17. from os.path import expanduser
  18. import shlex
  19. import sys
  20. import uuid
  21. from subprocess import Popen, PIPE
  22. import gi
  23. gi.require_version('NM', '1.0')
  24. from gi.repository import GLib, NM # pylint: disable=wrong-import-position
# Environment handed to child processes (see the Popen calls that pass
# env=ENV), with LC_ALL forced to 'C'.
ENV = os.environ.copy()
ENV['LC_ALL'] = 'C'
# Encoding used to encode/decode text exchanged with child processes.
ENC = locale.getpreferredencoding()
# Shared NetworkManager client, GLib main loop and the list of connections
# known to the client; used as module-level state by the functions below.
CLIENT = NM.Client.new(None)
LOOP = GLib.MainLoop()
CONNS = CLIENT.get_connections()
# User configuration. The functions below treat a CONF without sections as
# "no configuration".
CONF = configparser.ConfigParser()
CONF.read(expanduser("~/.config/networkmanager-dmenu/config.ini"))
  33. def dmenu_cmd(num_lines, prompt="Networks", active_lines=None): # pylint: disable=too-many-branches
  34. """Parse config.ini if it exists and add options to the dmenu command
  35. Args: args - num_lines: number of lines to display
  36. prompt: prompt to show
  37. Returns: command invocation (as a list of strings) for
  38. dmenu -l <num_lines> -p <prompt> -i ...
  39. """
  40. dmenu_command = "dmenu"
  41. if not CONF.sections():
  42. res = [dmenu_command, "-z", "1900", "-x", "10", "-y", "10", "-i", "-l", str(num_lines), "-p", str(prompt)]
  43. res.extend(sys.argv[1:])
  44. return res
  45. if CONF.has_section('dmenu'):
  46. args = CONF.items('dmenu')
  47. args_dict = dict(args)
  48. dmenu_args = []
  49. if "dmenu_command" in args_dict:
  50. command = shlex.split(args_dict["dmenu_command"])
  51. dmenu_command = command[0]
  52. dmenu_args = command[1:]
  53. del args_dict["dmenu_command"]
  54. if "p" in args_dict and prompt == "Networks":
  55. prompt = args_dict["p"]
  56. del args_dict["p"]
  57. elif "p" in args_dict:
  58. del args_dict["p"]
  59. if "rofi" in dmenu_command:
  60. lines = "-i -dmenu -lines"
  61. # rofi doesn't support 0 length line, it requires at least -lines=1
  62. # see https://github.com/DaveDavenport/rofi/issues/252
  63. num_lines = num_lines or 1
  64. else:
  65. lines = "-i -l"
  66. if "l" in args_dict:
  67. # rofi doesn't support 0 length line, it requires at least -lines=1
  68. # see https://github.com/DaveDavenport/rofi/issues/252
  69. if "rofi" in dmenu_command:
  70. args_dict['l'] = min(num_lines, int(args_dict['l'])) or 1
  71. lines = "{} {}".format(lines, args_dict['l'])
  72. del args_dict['l']
  73. else:
  74. lines = "{} {}".format(lines, num_lines)
  75. if "pinentry" in args_dict:
  76. del args_dict["pinentry"]
  77. if "compact" in args_dict:
  78. del args_dict["compact"]
  79. if "wifi_chars" in args_dict:
  80. del args_dict["wifi_chars"]
  81. rofi_highlight = CONF.getboolean('dmenu', 'rofi_highlight', fallback=False)
  82. if CONF.has_option('dmenu', 'rofi_highlight'):
  83. del args_dict["rofi_highlight"]
  84. if rofi_highlight is True and "rofi" in dmenu_command:
  85. if active_lines:
  86. dmenu_args.extend(["-a", ",".join([str(num)
  87. for num in active_lines])])
  88. if prompt == "Passphrase":
  89. if CONF.has_section('dmenu_passphrase'):
  90. args = CONF.items('dmenu_passphrase')
  91. args_dict.update(args)
  92. rofi_obscure = CONF.getboolean('dmenu_passphrase', 'rofi_obscure', fallback=True)
  93. if CONF.has_option('dmenu_passphrase', 'rofi_obscure'):
  94. del args_dict["rofi_obscure"]
  95. if rofi_obscure is True and "rofi" in dmenu_command:
  96. dmenu_args.extend(["-password"])
  97. dmenu_password = CONF.getboolean('dmenu_passphrase', 'dmenu_password', fallback=False)
  98. if CONF.has_option('dmenu_passphrase', 'dmenu_password'):
  99. del args_dict["dmenu_password"]
  100. if dmenu_password is True:
  101. dmenu_args.extend(["-P"])
  102. extras = (["-" + str(k), str(v)] for (k, v) in args_dict.items())
  103. res = [dmenu_command, "-p", str(prompt)]
  104. res.extend(dmenu_args)
  105. res += list(itertools.chain.from_iterable(extras))
  106. res[1:1] = lines.split()
  107. res = list(filter(None, res)) # Remove empty list elements
  108. res.extend(sys.argv[1:])
  109. return res
  110. def choose_adapter(client):
  111. """If there is more than one wifi adapter installed, ask which one to use
  112. """
  113. devices = client.get_devices()
  114. devices = [i for i in devices if i.get_device_type() == NM.DeviceType.WIFI]
  115. if not devices: # pylint: disable=no-else-return
  116. return None
  117. elif len(devices) == 1:
  118. return devices[0]
  119. device_names = "\n".join([d.get_iface() for d in devices]).encode(ENC)
  120. sel = Popen(dmenu_cmd(len(devices), "CHOOSE ADAPTER:"),
  121. stdin=PIPE,
  122. stdout=PIPE,
  123. env=ENV).communicate(input=device_names)[0].decode(ENC)
  124. if not sel.strip():
  125. sys.exit()
  126. devices = [i for i in devices if i.get_iface() == sel.strip()]
  127. assert len(devices) == 1
  128. return devices[0]
  129. def is_modemmanager_installed():
  130. """Check if ModemManager is installed"""
  131. with open(os.devnull) as devnull:
  132. try:
  133. Popen(["ModemManager"], stdout=devnull, stderr=devnull).communicate()
  134. except OSError:
  135. return False
  136. return True
  137. def bluetooth_get_enabled():
  138. """Check if bluetooth is enabled via rfkill.
  139. Returns None if no bluetooth device was found.
  140. """
  141. # See https://www.kernel.org/doc/Documentation/ABI/stable/sysfs-class-rfkill
  142. for path in pathlib.Path('/sys/class/rfkill/').glob('rfkill*'):
  143. if (path / 'type').read_text().strip() == 'bluetooth':
  144. return (path / 'soft').read_text().strip() == '0'
  145. return None
  146. def create_other_actions(client):
  147. """Return list of other actions that can be taken
  148. """
  149. networking_enabled = client.networking_get_enabled()
  150. networking_action = "Disable" if networking_enabled else "Enable"
  151. wifi_enabled = client.wireless_get_enabled()
  152. wifi_action = "Disable" if wifi_enabled else "Enable"
  153. bluetooth_enabled = bluetooth_get_enabled()
  154. bluetooth_action = "Disable" if bluetooth_enabled else "Enable"
  155. actions = [Action("{} Wifi".format(wifi_action), toggle_wifi,
  156. not wifi_enabled),
  157. Action("{} Networking".format(networking_action),
  158. toggle_networking, not networking_enabled)]
  159. if bluetooth_enabled is not None:
  160. actions.append(Action("{} Bluetooth".format(bluetooth_action),
  161. toggle_bluetooth, not bluetooth_enabled))
  162. actions += [Action("Launch Connection Manager", launch_connection_editor),
  163. Action("Delete a Connection", delete_connection)]
  164. if wifi_enabled:
  165. actions.append(Action("Rescan Wifi Networks", rescan_wifi))
  166. return actions
  167. def rescan_wifi():
  168. """
  169. Rescan Wifi Access Points
  170. """
  171. for dev in CLIENT.get_devices():
  172. if gi.repository.NM.DeviceWifi == type(dev):
  173. try:
  174. dev.request_scan_async(None, rescan_cb, None)
  175. LOOP.run()
  176. except gi.repository.GLib.Error as err:
  177. # Too frequent rescan error
  178. notify("Wifi rescan failed", urgency="critical")
  179. if not err.code == 6: # pylint: disable=no-member
  180. raise err
  181. def rescan_cb(dev, res, data):
  182. """Callback for rescan_wifi. Just for notifications
  183. """
  184. if dev.request_scan_finish(res) is True:
  185. notify("Wifi scan complete")
  186. else:
  187. notify("Wifi scan failed", urgency="critical")
  188. LOOP.quit()
  189. def ssid_to_utf8(nm_ap):
  190. """ Convert binary ssid to utf-8 """
  191. ssid = nm_ap.get_ssid()
  192. if not ssid:
  193. return ""
  194. ret = NM.utils_ssid_to_utf8(ssid.get_data())
  195. return ret
  196. def prompt_saved(saved_cons):
  197. """Prompt for a saved connection."""
  198. actions = create_saved_actions(saved_cons)
  199. sel = get_selection(actions)
  200. sel()
  201. def ap_security(nm_ap):
  202. """Parse the security flags to return a string with 'WPA2', etc. """
  203. flags = nm_ap.get_flags()
  204. wpa_flags = nm_ap.get_wpa_flags()
  205. rsn_flags = nm_ap.get_rsn_flags()
  206. sec_str = ""
  207. if ((flags & getattr(NM, '80211ApFlags').PRIVACY) and
  208. (wpa_flags == 0) and (rsn_flags == 0)):
  209. sec_str += " WEP"
  210. if wpa_flags != 0:
  211. sec_str += " WPA1"
  212. if rsn_flags != 0:
  213. sec_str += " WPA2"
  214. if ((wpa_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_802_1X) or
  215. (rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_802_1X)):
  216. sec_str += " 802.1X"
  217. # If there is no security use "--"
  218. if sec_str == "":
  219. sec_str = "--"
  220. return sec_str.lstrip()
  221. class Action(): # pylint: disable=too-few-public-methods
  222. """Helper class to execute functions from a string variable"""
  223. def __init__(self,
  224. name,
  225. func,
  226. args=None,
  227. active=False):
  228. self.name = name
  229. self.func = func
  230. self.is_active = active
  231. if args is None:
  232. self.args = None
  233. elif isinstance(args, list):
  234. self.args = args
  235. else:
  236. self.args = [args]
  237. def __str__(self):
  238. return self.name
  239. def __call__(self):
  240. if self.args is None:
  241. self.func()
  242. else:
  243. self.func(*self.args)
  244. def process_ap(nm_ap, is_active, adapter):
  245. """Activate/Deactivate a connection and get password if required"""
  246. if is_active:
  247. CLIENT.deactivate_connection_async(nm_ap, None, deactivate_cb, nm_ap)
  248. else:
  249. conns_cur = [i for i in CONNS if
  250. i.get_setting_wireless() is not None and
  251. i.get_setting_wireless().get_mac_address() ==
  252. adapter.get_permanent_hw_address()]
  253. con = nm_ap.filter_connections(conns_cur)
  254. if len(con) > 1:
  255. raise ValueError("There are multiple connections possible")
  256. if len(con) == 1:
  257. CLIENT.activate_connection_async(con[0], adapter, nm_ap.get_path(),
  258. None, activate_cb, nm_ap)
  259. else:
  260. if ap_security(nm_ap) != "--":
  261. password = get_passphrase()
  262. else:
  263. password = ""
  264. set_new_connection(nm_ap, password, adapter)
  265. LOOP.run()
  266. def activate_cb(dev, res, data):
  267. """Notification if activate connection completed successfully
  268. """
  269. try:
  270. conn = dev.activate_connection_finish(res)
  271. except GLib.Error:
  272. conn = None
  273. if conn is not None:
  274. notify("Activated {}".format(conn.get_id()))
  275. else:
  276. notify("Problem activating {}".format(data.get_id()),
  277. urgency="critical")
  278. LOOP.quit()
  279. def deactivate_cb(dev, res, data):
  280. """Notification if deactivate connection completed successfully
  281. """
  282. if dev.deactivate_connection_finish(res) is True:
  283. notify("Deactivated {}".format(data.get_id()))
  284. else:
  285. notify("Problem deactivating {}".format(data.get_id()),
  286. urgency="critical")
  287. LOOP.quit()
  288. def process_vpngsm(con, activate):
  289. """Activate/deactive VPN or GSM connections"""
  290. if activate:
  291. CLIENT.activate_connection_async(con, None, None,
  292. None, activate_cb, con)
  293. else:
  294. CLIENT.deactivate_connection_async(con, None, deactivate_cb, con)
  295. LOOP.run()
  296. def create_ap_actions(aps, active_ap, active_connection, adapter): # pylint: disable=too-many-locals
  297. """For each AP in a list, create the string and its attached function
  298. (activate/deactivate)
  299. """
  300. active_ap_bssid = active_ap.get_bssid() if active_ap is not None else ""
  301. names = [ssid_to_utf8(ap) for ap in aps]
  302. max_len_name = max([len(name) for name in names]) if names else 0
  303. secs = [ap_security(ap) for ap in aps]
  304. max_len_sec = max([len(sec) for sec in secs]) if secs else 0
  305. ap_actions = []
  306. for nm_ap, name, sec in zip(aps, names, secs):
  307. bars = NM.utils_wifi_strength_bars(nm_ap.get_strength())
  308. wifi_chars = CONF.get("dmenu", "wifi_chars", fallback=False)
  309. if wifi_chars:
  310. bars = "".join([wifi_chars[i] for i, j in enumerate(bars) if j == '*'])
  311. is_active = nm_ap.get_bssid() == active_ap_bssid
  312. compact = CONF.getboolean("dmenu", "compact", fallback=False)
  313. if compact:
  314. action_name = u"{} {} {}".format(name, sec, bars)
  315. else:
  316. action_name = u"{:<{}s} {:<{}s} {}".format(name, max_len_name, sec,
  317. max_len_sec, bars)
  318. if is_active:
  319. ap_actions.append(Action(action_name, process_ap,
  320. [active_connection, True, adapter],
  321. active=True))
  322. else:
  323. ap_actions.append(Action(action_name, process_ap,
  324. [nm_ap, False, adapter]))
  325. return ap_actions
  326. def create_vpn_actions(vpns, active):
  327. """Create the list of strings to display with associated function
  328. (activate/deactivate) for VPN connections.
  329. """
  330. active_vpns = [i for i in active if i.get_vpn()]
  331. return _create_vpngsm_actions(vpns, active_vpns, "VPN")
  332. def create_wireguard_actions(wgs, active):
  333. """Create the list of strings to display with associated function
  334. (activate/deactivate) for Wireguard connections.
  335. """
  336. active_wgs = [i for i in active if i.get_connection_type() == "wireguard"]
  337. return _create_vpngsm_actions(wgs, active_wgs, "Wireguard")
  338. def create_eth_actions(eths, active):
  339. """Create the list of strings to display with associated function
  340. (activate/deactivate) for Ethernet connections.
  341. """
  342. active_eths = [i for i in active if 'ethernet' in i.get_connection_type()]
  343. return _create_vpngsm_actions(eths, active_eths, "Eth")
  344. def create_gsm_actions(gsms, active):
  345. """Create the list of strings to display with associated function
  346. (activate/deactivate) GSM connections."""
  347. active_gsms = [i for i in active if
  348. i.get_connection() is not None and
  349. i.get_connection().is_type(NM.SETTING_GSM_SETTING_NAME)]
  350. return _create_vpngsm_actions(gsms, active_gsms, "GSM")
  351. def create_blue_actions(blues, active):
  352. """Create the list of strings to display with associated function
  353. (activate/deactivate) Bluetooth connections."""
  354. active_blues = [i for i in active if
  355. i.get_connection() is not None and
  356. i.get_connection().is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)]
  357. return _create_vpngsm_actions(blues, active_blues, "Bluetooth")
  358. def create_saved_actions(saved):
  359. """Create the list of strings to display with associated function
  360. (activate/deactivate) for VPN connections.
  361. """
  362. return _create_vpngsm_actions(saved, [], "SAVED")
  363. def _create_vpngsm_actions(cons, active_cons, label):
  364. active_con_ids = [a.get_id() for a in active_cons]
  365. actions = []
  366. for con in cons:
  367. is_active = con.get_id() in active_con_ids
  368. action_name = u"{}:{}".format(con.get_id(), label)
  369. if is_active:
  370. active_connection = [a for a in active_cons
  371. if a.get_id() == con.get_id()]
  372. if len(active_connection) != 1:
  373. raise ValueError(u"Multiple active connections match"
  374. " the connection: {}".format(con.get_id()))
  375. active_connection = active_connection[0]
  376. actions.append(Action(action_name, process_vpngsm,
  377. [active_connection, False], active=True))
  378. else:
  379. actions.append(Action(action_name, process_vpngsm,
  380. [con, True]))
  381. return actions
  382. def create_wwan_actions(client):
  383. """Create WWWAN actions
  384. """
  385. wwan_enabled = client.wwan_get_enabled()
  386. wwan_action = "Disable" if wwan_enabled else "Enable"
  387. return [Action("{} WWAN".format(wwan_action), toggle_wwan, not wwan_enabled)]
  388. def combine_actions(eths, aps, vpns, wgs, gsms, blues, wwan, others, saved):
  389. """Combine all given actions into a list of actions.
  390. Args: args - eths: list of Actions
  391. aps: list of Actions
  392. vpns: list of Actions
  393. gsms: list of Actions
  394. blues: list of Actions
  395. wwan: list of Actions
  396. others: list of Actions
  397. """
  398. compact = CONF.getboolean("dmenu", "compact", fallback=False)
  399. empty_action = [Action('', None)] if not compact else []
  400. all_actions = []
  401. all_actions += eths + empty_action if eths else []
  402. all_actions += aps + empty_action if aps else []
  403. all_actions += vpns + empty_action if vpns else []
  404. all_actions += wgs + empty_action if wgs else []
  405. all_actions += gsms + empty_action if (gsms and wwan) else []
  406. all_actions += blues + empty_action if blues else []
  407. all_actions += wwan + empty_action if wwan else []
  408. all_actions += others + empty_action if others else []
  409. all_actions += saved + empty_action if saved else []
  410. return all_actions
  411. def get_selection(all_actions):
  412. """Spawn dmenu for selection and execute the associated action."""
  413. rofi_highlight = CONF.getboolean('dmenu', 'rofi_highlight', fallback=False)
  414. inp = []
  415. if rofi_highlight is True:
  416. inp = [str(action) for action in all_actions]
  417. else:
  418. inp = [('== ' if action.is_active else ' ') + str(action)
  419. for action in all_actions]
  420. active_lines = [index for index, action in enumerate(all_actions)
  421. if action.is_active]
  422. inp_bytes = "\n".join(inp).encode(ENC)
  423. command = dmenu_cmd(len(inp), active_lines=active_lines)
  424. sel = Popen(command, stdin=PIPE, stdout=PIPE,
  425. env=ENV).communicate(input=inp_bytes)[0].decode(ENC)
  426. if not sel.rstrip():
  427. sys.exit()
  428. if rofi_highlight is False:
  429. action = [i for i in all_actions
  430. if ((str(i).strip() == str(sel.strip())
  431. and not i.is_active) or
  432. ('== ' + str(i) == str(sel.rstrip('\n'))
  433. and i.is_active))]
  434. else:
  435. action = [i for i in all_actions if str(i).strip() == sel.strip()]
  436. assert len(action) == 1, \
  437. u"Selection was ambiguous: '{}'".format(str(sel.strip()))
  438. return action[0]
  439. def toggle_networking(enable):
  440. """Enable/disable networking
  441. Args: enable - boolean
  442. """
  443. toggle = GLib.Variant.new_tuple(GLib.Variant.new_boolean(enable))
  444. try:
  445. CLIENT.dbus_call(NM.DBUS_PATH, NM.DBUS_INTERFACE, "Enable", toggle,
  446. None, -1, None, None, None)
  447. except AttributeError:
  448. # Workaround for older versions of python-gobject
  449. CLIENT.networking_set_enabled(enable)
  450. notify("Networking {}".format("enabled" if enable is True else "disabled"))
  451. def toggle_wifi(enable):
  452. """Enable/disable Wifi
  453. Args: enable - boolean
  454. """
  455. toggle = GLib.Variant.new_boolean(enable)
  456. try:
  457. CLIENT.dbus_set_property(NM.DBUS_PATH, NM.DBUS_INTERFACE, "WirelessEnabled", toggle,
  458. -1, None, None, None)
  459. except AttributeError:
  460. # Workaround for older versions of python-gobject
  461. CLIENT.wireless_set_enabled(enable)
  462. notify("Wifi {}".format("enabled" if enable is True else "disabled"))
  463. def toggle_wwan(enable):
  464. """Enable/disable WWAN
  465. Args: enable - boolean
  466. """
  467. toggle = GLib.Variant.new_boolean(enable)
  468. try:
  469. CLIENT.dbus_set_property(NM.DBUS_PATH, NM.DBUS_INTERFACE, "WwanEnabled", toggle,
  470. -1, None, None, None)
  471. except AttributeError:
  472. # Workaround for older versions of python-gobject
  473. CLIENT.wwan_set_enabled(enable)
  474. notify("Wwan {}".format("enabled" if enable is True else "disabled"))
  475. def toggle_bluetooth(enable):
  476. """Enable/disable Bluetooth
  477. Args: enable - boolean
  478. References:
  479. https://github.com/blueman-project/blueman/blob/master/blueman/plugins/mechanism/RfKill.py
  480. https://www.kernel.org/doc/html/latest/driver-api/rfkill.html
  481. https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/include/uapi/linux/rfkill.h?h=v5.8.9
  482. """
  483. type_bluetooth = 2
  484. op_change_all = 3
  485. idx = 0
  486. soft_state = 0 if enable else 1
  487. hard_state = 0
  488. data = struct.pack("IBBBB", idx, type_bluetooth, op_change_all,
  489. soft_state, hard_state)
  490. try:
  491. with open('/dev/rfkill', 'r+b', buffering=0) as rff:
  492. rff.write(data)
  493. except PermissionError:
  494. notify("Lacking permission to write to /dev/rfkill.",
  495. "Maybe you need to add your user to the 'rfkill' group?",
  496. urgency="critical")
  497. else:
  498. notify("Bluetooth {}".format("enabled" if enable else "disabled"))
  499. def launch_connection_editor():
  500. """Launch nmtui or the gui nm-connection-editor
  501. """
  502. terminal = CONF.get("editor", "terminal", fallback="xterm")
  503. gui_if_available = CONF.getboolean("editor", "gui_if_available", fallback=True)
  504. if gui_if_available is True:
  505. try:
  506. Popen(["gnome-control-center", "network"]).communicate()
  507. except OSError:
  508. try:
  509. Popen(["nm-connection-editor"]).communicate()
  510. except OSError:
  511. Popen([terminal, "-e", "nmtui"]).communicate()
  512. else:
  513. Popen([terminal, "-e", "nmtui"]).communicate()
  514. def get_passphrase():
  515. """Get a password
  516. Returns: string
  517. """
  518. pinentry = CONF.get("dmenu", "pinentry", fallback=None)
  519. if pinentry:
  520. pin = ""
  521. out = Popen(pinentry,
  522. stdout=PIPE,
  523. stdin=PIPE).communicate(input=b'setdesc Get network password\ngetpin\n')[0]
  524. if out:
  525. res = out.decode(ENC).split("\n")[2]
  526. if res.startswith("D "):
  527. pin = res.split("D ")[1]
  528. return pin
  529. return Popen(dmenu_cmd(0, "Passphrase"),
  530. stdin=PIPE, stdout=PIPE).communicate()[0].decode(ENC)
  531. def delete_connection():
  532. """Display list of NM connections and delete the selected one
  533. """
  534. conn_acts = [Action(i.get_id(), i.delete_async, args=[None, delete_cb, None]) for i in CONNS]
  535. conn_names = "\n".join([str(i) for i in conn_acts]).encode(ENC)
  536. sel = Popen(dmenu_cmd(len(conn_acts), "CHOOSE CONNECTION TO DELETE:"),
  537. stdin=PIPE,
  538. stdout=PIPE,
  539. env=ENV).communicate(input=conn_names)[0].decode(ENC)
  540. if not sel.strip():
  541. sys.exit()
  542. action = [i for i in conn_acts if str(i) == sel.rstrip("\n")]
  543. assert len(action) == 1, u"Selection was ambiguous: {}".format(str(sel))
  544. action[0]()
  545. LOOP.run()
  546. def delete_cb(dev, res, data):
  547. """Notification if delete completed successfully
  548. """
  549. if dev.delete_finish(res) is True:
  550. notify("Deleted {}".format(dev.get_id()))
  551. else:
  552. notify("Problem deleting {}".format(dev.get_id()), urgency="critical")
  553. LOOP.quit()
  554. def set_new_connection(nm_ap, nm_pw, adapter):
  555. """Setup a new NetworkManager connection
  556. Args: ap - NM.AccessPoint
  557. pw - string
  558. """
  559. nm_pw = str(nm_pw).strip()
  560. profile = create_wifi_profile(nm_ap, nm_pw, adapter)
  561. CLIENT.add_and_activate_connection_async(profile, adapter, nm_ap.get_path(),
  562. None, verify_conn, profile)
  563. LOOP.run()
  564. def create_wifi_profile(nm_ap, password, adapter):
  565. # pylint: disable=C0301
  566. # From https://cgit.freedesktop.org/NetworkManager/NetworkManager/tree/examples/python/gi/add_connection.py
  567. # and https://cgit.freedesktop.org/NetworkManager/NetworkManager/tree/examples/python/dbus/add-wifi-psk-connection.py
  568. # pylint: enable=C0301
  569. """Create the NM profile given the AP and passphrase"""
  570. ap_sec = ap_security(nm_ap)
  571. profile = NM.SimpleConnection.new()
  572. s_con = NM.SettingConnection.new()
  573. s_con.set_property(NM.SETTING_CONNECTION_ID, ssid_to_utf8(nm_ap))
  574. s_con.set_property(NM.SETTING_CONNECTION_UUID, str(uuid.uuid4()))
  575. s_con.set_property(NM.SETTING_CONNECTION_TYPE, "802-11-wireless")
  576. profile.add_setting(s_con)
  577. s_wifi = NM.SettingWireless.new()
  578. s_wifi.set_property(NM.SETTING_WIRELESS_SSID, nm_ap.get_ssid())
  579. s_wifi.set_property(NM.SETTING_WIRELESS_MODE, 'infrastructure')
  580. s_wifi.set_property(NM.SETTING_WIRELESS_MAC_ADDRESS, adapter.get_permanent_hw_address())
  581. profile.add_setting(s_wifi)
  582. s_ip4 = NM.SettingIP4Config.new()
  583. s_ip4.set_property(NM.SETTING_IP_CONFIG_METHOD, "auto")
  584. profile.add_setting(s_ip4)
  585. s_ip6 = NM.SettingIP6Config.new()
  586. s_ip6.set_property(NM.SETTING_IP_CONFIG_METHOD, "auto")
  587. profile.add_setting(s_ip6)
  588. if ap_sec != "--":
  589. s_wifi_sec = NM.SettingWirelessSecurity.new()
  590. if "WPA" in ap_sec:
  591. s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT,
  592. "wpa-psk")
  593. s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_AUTH_ALG,
  594. "open")
  595. s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_PSK, password)
  596. elif "WEP" in ap_sec:
  597. s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT,
  598. "None")
  599. s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_WEP_KEY_TYPE,
  600. NM.WepKeyType.PASSPHRASE)
  601. s_wifi_sec.set_wep_key(0, password)
  602. profile.add_setting(s_wifi_sec)
  603. return profile
  604. def verify_conn(client, result, data):
  605. """Callback function for add_and_activate_connection_async
  606. Check if connection completes successfully. Delete the connection if there
  607. is an error.
  608. """
  609. try:
  610. act_conn = client.add_and_activate_connection_finish(result)
  611. conn = act_conn.get_connection()
  612. if not all([conn.verify(),
  613. conn.verify_secrets(),
  614. data.verify(),
  615. data.verify_secrets()]):
  616. raise GLib.Error
  617. notify("Added {}".format(conn.get_id()))
  618. except GLib.Error: # pylint: disable=catching-non-exception
  619. try:
  620. notify("Connection to {} failed".format(conn.get_id()),
  621. urgency="critical")
  622. conn.delete_async(None, None, None)
  623. except UnboundLocalError:
  624. pass
  625. finally:
  626. LOOP.quit()
  627. def create_ap_list(adapter, active_connections):
  628. """Generate list of access points. Remove duplicate APs , keeping strongest
  629. ones and the active AP
  630. Args: adapter
  631. active_connections - list of all active connections
  632. Returns: aps - list of access points
  633. active_ap - active AP
  634. active_ap_con - active Connection
  635. adapter
  636. """
  637. aps = []
  638. ap_names = []
  639. active_ap = adapter.get_active_access_point()
  640. aps_all = sorted(adapter.get_access_points(),
  641. key=lambda a: a.get_strength(), reverse=True)
  642. conns_cur = [i for i in CONNS if
  643. i.get_setting_wireless() is not None and
  644. i.get_setting_wireless().get_mac_address() ==
  645. adapter.get_permanent_hw_address()]
  646. try:
  647. ap_conns = active_ap.filter_connections(conns_cur)
  648. active_ap_name = ssid_to_utf8(active_ap)
  649. active_ap_con = [active_conn for active_conn in active_connections
  650. if active_conn.get_connection() in ap_conns]
  651. except AttributeError:
  652. active_ap_name = None
  653. active_ap_con = []
  654. if len(active_ap_con) > 1:
  655. raise ValueError("Multiple connection profiles match"
  656. " the wireless AP")
  657. active_ap_con = active_ap_con[0] if active_ap_con else None
  658. for nm_ap in aps_all:
  659. ap_name = ssid_to_utf8(nm_ap)
  660. if nm_ap != active_ap and ap_name == active_ap_name:
  661. # Skip adding AP if it's not active but same name as active AP
  662. continue
  663. if ap_name not in ap_names:
  664. ap_names.append(ap_name)
  665. aps.append(nm_ap)
  666. return aps, active_ap, active_ap_con, adapter
  667. def notify(message, details=None, urgency="low"):
  668. """Use notify-send if available for notifications
  669. """
  670. args = ["-u", urgency, message]
  671. if details is not None:
  672. args.append(details)
  673. try:
  674. Popen(["notify-send"] + args,
  675. stdout=PIPE, stderr=PIPE).communicate()
  676. except FileNotFoundError:
  677. pass
  678. def run():
  679. """Main script entrypoint"""
  680. active = CLIENT.get_active_connections()
  681. adapter = choose_adapter(CLIENT)
  682. if adapter:
  683. ap_actions = create_ap_actions(*create_ap_list(adapter, active))
  684. else:
  685. ap_actions = []
  686. vpns = [i for i in CONNS if i.is_type(NM.SETTING_VPN_SETTING_NAME)]
  687. try:
  688. wgs = [i for i in CONNS if i.is_type(NM.SETTING_WIREGUARD_SETTING_NAME)]
  689. except AttributeError:
  690. # Workaround for older versions of python-gobject with no wireguard support
  691. wgs = []
  692. eths = [i for i in CONNS if i.is_type(NM.SETTING_WIRED_SETTING_NAME)]
  693. blues = [i for i in CONNS if i.is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)]
  694. vpn_actions = create_vpn_actions(vpns, active)
  695. wg_actions = create_wireguard_actions(wgs, active)
  696. eth_actions = create_eth_actions(eths, active)
  697. blue_actions = create_blue_actions(blues, active)
  698. other_actions = create_other_actions(CLIENT)
  699. wwan_installed = is_modemmanager_installed()
  700. if wwan_installed:
  701. gsms = [i for i in CONNS if i.is_type(NM.SETTING_GSM_SETTING_NAME)]
  702. gsm_actions = create_gsm_actions(gsms, active)
  703. wwan_actions = create_wwan_actions(CLIENT)
  704. else:
  705. gsm_actions = []
  706. wwan_actions = []
  707. list_saved = CONF.getboolean('dmenu', 'list_saved', fallback=False)
  708. saved_cons = [i for i in CONNS if i not in vpns + wgs + eths + blues]
  709. if list_saved:
  710. saved_actions = create_saved_actions(saved_cons)
  711. else:
  712. saved_actions = [Action("Saved connections", prompt_saved, [saved_cons])]
  713. actions = combine_actions(eth_actions, ap_actions, vpn_actions, wg_actions,
  714. gsm_actions, blue_actions, wwan_actions,
  715. other_actions, saved_actions)
  716. sel = get_selection(actions)
  717. sel()
# Show the menu only when executed as a script, not when imported.
if __name__ == '__main__':
    run()
  720. # vim: set et ts=4 sw=4 :