Overview
Comment: | Run script through flake8 |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | trunk |
Files: | files | file ages | folders |
SHA1: | d7bb8aa950f6763cf24d6689bd011e57 |
User & Date: | vitus on 2024-07-10 16:24:26 |
Other Links: | manifest | tags |
Context
2024-07-10
| ||
16:24 | Run script through flake8 Leaf check-in: d7bb8aa950 user: vitus tags: trunk | |
2023-07-17
| ||
10:43 | Reformatted python code with black and silence some pylint warnings check-in: f8285bffb0 user: vitus tags: trunk | |
Changes
Modified vws from [ec09f36c47] to [f94852cc76].
︙ | ︙ | |||
18 19 20 21 22 23 24 25 26 27 28 29 30 31 | import time import pwd import grp VERSION = "0.8.1" config = ConfigParser(interpolation=None) # pylint: disable=invalid-name def find_vm(name): """Search and return VM directory""" search_path = [ os.path.join(pwd.getpwuid(os.getuid()).pw_dir, "VWs"), config.get("directories", "SharedVMs"), config.get("directories", "AutostartVMs"), | > | 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | import time import pwd import grp VERSION = "0.8.1" config = ConfigParser(interpolation=None) # pylint: disable=invalid-name def find_vm(name): """Search and return VM directory""" search_path = [ os.path.join(pwd.getpwuid(os.getuid()).pw_dir, "VWs"), config.get("directories", "SharedVMs"), config.get("directories", "AutostartVMs"), |
︙ | ︙ | |||
89 90 91 92 93 94 95 | output = send_command(sock, "info spice") url = None for line in output.split("\n"): if url is not None: continue idx = line.find("address:") if idx != -1: | | | > | 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | output = send_command(sock, "info spice") url = None for line in output.split("\n"): if url is not None: continue idx = line.find("address:") if idx != -1: url = line[idx + 9:] if url.startswith("*:"): url = socket.getfqdn() + url[1:] if url is None: if output.rstrip().endswith("(qemu)"): return spiceurl(sock) print("ERROR parsing 'info spice' output:«%s»" % output, file=sys.stderr) return None return "spice://" + url.rstrip("\r") def list_bridges(): """Return list of bridge network interfaces present in the system""" lst = [] |
︙ | ︙ | |||
216 217 218 219 220 221 222 | data = f.read() idx0 = data.find("-net bridge,br=") if idx0 != -1: idx = data.find("=", idx0) idx += 1 idx2 = data.find(" ", idx) bridgename = data[idx:idx2] | | | 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 | data = f.read() idx0 = data.find("-net bridge,br=") if idx0 != -1: idx = data.find("=", idx0) idx += 1 idx2 = data.find(" ", idx) bridgename = data[idx:idx2] if bridgename not in list_bridges(): net = config.get("create options", "net") if net == "user": data = data[:idx0] + "-net user" + data[idx2:] else: data = data[:idx] + net + data[idx2:] f.seek(0) f.write(data) |
︙ | ︙ | |||
271 272 273 274 275 276 277 | if arg: return shlex.join(arg) return "" def cmd_start(options): """vws start""" | | | 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 | if arg: return shlex.join(arg) return "" def cmd_start(options): """vws start""" if "DISPLAY" not in os.environ: # If cannot start GUI just don't do it. options.gui = False if options.stopped: arg = make_start_cmdline(options) cwd = os.getcwd() os.chdir(options.dir) # Check for snapshot |
︙ | ︙ | |||
338 339 340 341 342 343 344 | def cmd_monitor(options): """vws monitor""" eol = False try: print("(qemu) ", end="") sys.stdout.flush() while True: | | > | | 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 | def cmd_monitor(options): """vws monitor""" eol = False try: print("(qemu) ", end="") sys.stdout.flush() while True: readfd, dummy_w, dummy_x = select.select([sys.stdin, options.sock], [], []) if sys.stdin in readfd: cmd = sys.stdin.readline() # Check for eof if cmd == "": break answer = send_command(options.sock, cmd.rstrip()) idx = answer.index("\n") print(answer[idx + 1:], end="") eol = answer.endswith("\n") sys.stdout.flush() elif options.sock in readfd: print( "UNSOLICITED MESSAGE %" + options.sock.recv(1000).decode("utf-8").rstrip() ) |
︙ | ︙ | |||
387 388 389 390 391 392 393 | def cmd_cdrom(options): """vws cdrom""" if options.id is None: # Search for devices which could be interpreted as CDROM devlist = send_command(options.sock, "info block") idx = devlist.find("info block") if idx != -1: | | | 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 | def cmd_cdrom(options): """vws cdrom""" if options.id is None: # Search for devices which could be interpreted as CDROM devlist = send_command(options.sock, "info block") idx = devlist.find("info block") if idx != -1: devlist = devlist[devlist.find("\n", idx) + 1:] for dev in devlist.split("\r\n\r\n"): if dev.find("\n Removable device: ") == -1: continue if dev.startswith("floppy"): continue dev_id = dev[: dev.find(":")] idx = dev_id.find(" ") |
︙ | ︙ | |||
409 410 411 412 413 414 415 | if options.file == "": print("Please specify either --eject or iso image", file=sys.stderr) return 1 if options.file is None: answer = send_command(options.sock, "eject " + options.id) else: answer = send_command( | | > | 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 | if options.file == "": print("Please specify either --eject or iso image", file=sys.stderr) return 1 if options.file is None: answer = send_command(options.sock, "eject " + options.id) else: answer = send_command( options.sock, "change %s %s" % (options.id, os.path.abspath(options.file)) ) print(answer) return 0 def find_usb(options, devices): """Search for pattern or address given in options in the |
︙ | ︙ | |||
506 507 508 509 510 511 512 | f.update(read_netinfo(os.path.join(dirname, "start"))) else: uri = spiceurl(sock) if uri is None: f.update({"state": "problem ", "uri": "None", "ip": "-"}) f.update(read_netinfo(os.path.join(dirname, "start"))) else: | | | | 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 | f.update(read_netinfo(os.path.join(dirname, "start"))) else: uri = spiceurl(sock) if uri is None: f.update({"state": "problem ", "uri": "None", "ip": "-"}) f.update(read_netinfo(os.path.join(dirname, "start"))) else: f["uri"] = uri[uri.rindex(":") + 1:] f.update(get_netinfo(sock)) sock.shutdown(socket.SHUT_RDWR) sock.close() f["state"] = "running " return f def add_ip_address(listing): """Adds IP addresses from ARP into VM listing""" bridges = set() for vminfo in listing: if "ip" not in vminfo: bridges.add(vminfo["iface"]) arp_data = {} for bridge in bridges: arp_data.update(parse_arp(bridge)) for vminfo in listing: if "mac" in vminfo and "ip" not in vminfo: if vminfo["mac"] in arp_data: vminfo["ip"] = arp_data[vminfo["mac"]] else: vminfo["ip"] = "-" def all_vms(patterns=("*",)): |
︙ | ︙ | |||
835 836 837 838 839 840 841 | if [ -n "$SPICE_PASSWORD" ]; then SPICE_AUTH="password=$SPICE_PASSWORD" else SPICE_AUTH="disable-ticketing,addr=127.0.0.1" fi SPICE_PORT=$(find_free_port 5900) if [ "$1" = '-cdrom' ]; then | | | | | 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 | if [ -n "$SPICE_PASSWORD" ]; then SPICE_AUTH="password=$SPICE_PASSWORD" else SPICE_AUTH="disable-ticketing,addr=127.0.0.1" fi SPICE_PORT=$(find_free_port 5900) if [ "$1" = '-cdrom' ]; then shift CDROM=",file=$1" shift fi #set umask to make machine group-accessable umask 002 {qemubinary} -name $NAME {accel} \\ -m {memory} \\ {drive} \\ {cdrom}$CDROM \\ |
︙ | ︙ | |||
901 902 903 904 905 906 907 | else: machinedir = os.path.join( pwd.getpwuid(os.getuid()).pw_dir, "VWs", parsed_args.machine ) dirmode = 0o775 if parsed_args.net != "user": bridges = list_bridges() | | | | 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 | else: machinedir = os.path.join( pwd.getpwuid(os.getuid()).pw_dir, "VWs", parsed_args.machine ) dirmode = 0o775 if parsed_args.net != "user": bridges = list_bridges() if parsed_args.net not in bridges: raise ValueError( "No such bridge %s. Available ones %s" % (parsed_args.net, ", ".join(bridges)) ) options["net"] = "-net nic,macaddr=%s -net bridge,br=%s" % ( macaddr, parsed_args.net, ) else: options["net"] = "-net nic,macaddr=%s -net user" % (macaddr,) options["qemubinary"] = "qemu-system-" + parsed_args.arch options["vga"] = parsed_args.vga if parsed_args.arch not in ("i386", "x86_64"): print(NOACCEL + "target architecture", file=sys.stderr) options.accel = "" elif not os.access("/dev/kvm", os.W_OK): print(NOACCEL + "unavailability on the host system", file=sys.stderr) options.accel = "" if not parsed_args.usb: |
︙ | ︙ | |||
1013 1014 1015 1016 1017 1018 1019 | parser.add_argument("machine", type=str, help="name of vm to operate on") return parser # # prepare defaults for config # | < < < < | 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 | parser.add_argument("machine", type=str, help="name of vm to operate on") return parser # # prepare defaults for config # def config_defaults(conf): """Set default values for config options""" arch = os.uname()[4] if re.match("i[3-9]86", arch): arch = "i386" elif arch.startswith("arm"): arch = "arm" |
︙ | ︙ | |||
1072 1073 1074 1075 1076 1077 1078 | else: conf.read(["/etc/vws.conf"]) def main(): """Parse an arguments and execute everything""" # pylint: disable=too-many-branches, too-many-statements | | | 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 | else: conf.read(["/etc/vws.conf"]) def main(): """Parse an arguments and execute everything""" # pylint: disable=too-many-branches, too-many-statements global config # pylint: disable=invalid-name, global-variable-not-assigned config_defaults(config) read_config(config) # Parse argument args = ArgumentParser(description="Manage Virtual Workstations") cmds = args.add_subparsers(dest="command", help="sub-command help") p = cmds.add_parser( "list", help="List existing VWs", description="List existing VWs" |
︙ | ︙ | |||
1394 1395 1396 1397 1398 1399 1400 | parsed_args.stopped = False stopped_vm_commands = ["start", "snapshot", "revert", "commit", "snapshots"] if hasattr(parsed_args, "machine"): parsed_args.dir = find_vm(parsed_args.machine) parsed_args.sock = connect_vm(parsed_args.dir) if parsed_args.sock is None: | | | 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 | parsed_args.stopped = False stopped_vm_commands = ["start", "snapshot", "revert", "commit", "snapshots"] if hasattr(parsed_args, "machine"): parsed_args.dir = find_vm(parsed_args.machine) parsed_args.sock = connect_vm(parsed_args.dir) if parsed_args.sock is None: if parsed_args.command not in stopped_vm_commands: print( "Virtual machine %s is not running." % parsed_args.machine, file=sys.stderr, ) sys.exit(1) else: parsed_args.stopped = True |
︙ | ︙ |