aboutsummaryrefslogtreecommitdiff
path: root/nixpkgs/nixos/lib/test-driver
diff options
context:
space:
mode:
authorKatharina Fey <kookie@spacekookie.de>2020-01-12 01:00:12 +0000
committerKatharina Fey <kookie@spacekookie.de>2020-01-12 01:00:12 +0000
commiteeaf5d25d5f6ae7ae1f5bf8a3dee4559693f8147 (patch)
treeafc41ca8dde96b41089ca324533084aef570322f /nixpkgs/nixos/lib/test-driver
parent63c4c4dda49dc69e5812faa7ef8406180998f3ae (diff)
parente4134747f5666bcab8680aff67fa3b63384f9a0f (diff)
Merge commit 'e4134747f5666bcab8680aff67fa3b63384f9a0f'
Diffstat (limited to 'nixpkgs/nixos/lib/test-driver')
-rw-r--r--nixpkgs/nixos/lib/test-driver/test-driver.py143
1 files changed, 107 insertions, 36 deletions
diff --git a/nixpkgs/nixos/lib/test-driver/test-driver.py b/nixpkgs/nixos/lib/test-driver/test-driver.py
index e45521424de..7e575189209 100644
--- a/nixpkgs/nixos/lib/test-driver/test-driver.py
+++ b/nixpkgs/nixos/lib/test-driver/test-driver.py
@@ -16,6 +16,8 @@ import tempfile
import time
import unicodedata
from typing import Tuple, Any, Callable, Dict, Iterator, Optional, List
+import shlex
+import pathlib
CHAR_TO_KEY = {
"A": "shift-a",
@@ -91,6 +93,10 @@ def eprint(*args: object, **kwargs: Any) -> None:
print(*args, file=sys.stderr, **kwargs)
+def make_command(args: list) -> str:
+ return " ".join(map(shlex.quote, (map(str, args))))
+
+
def create_vlan(vlan_nr: str) -> Tuple[str, str, "subprocess.Popen[bytes]", Any]:
global log
log.log("starting VDE switch for network {}".format(vlan_nr))
@@ -215,7 +221,7 @@ class Machine:
return path
self.state_dir = create_dir("vm-state-{}".format(self.name))
- self.shared_dir = create_dir("xchg-shared")
+ self.shared_dir = create_dir("{}/xchg".format(self.state_dir))
self.booted = False
self.connected = False
@@ -306,8 +312,13 @@ class Machine:
self.monitor.send(message)
return self.wait_for_monitor_prompt()
- def wait_for_unit(self, unit: str, user: Optional[str] = None) -> bool:
- while True:
+ def wait_for_unit(self, unit: str, user: Optional[str] = None) -> None:
+ """Wait for a systemd unit to get into "active" state.
+ Throws exceptions on "failed" and "inactive" states as well as
+ after timing out.
+ """
+
+ def check_active(_: Any) -> bool:
info = self.get_unit_info(unit, user)
state = info["ActiveState"]
if state == "failed":
@@ -323,8 +334,10 @@ class Machine:
'unit "{}" is inactive and there ' "are no pending jobs"
).format(unit)
)
- if state == "active":
- return True
+
+ return state == "active"
+
+ retry(check_active)
def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]:
status, lines = self.systemctl('--no-pager show "{}"'.format(unit), user)
@@ -415,18 +428,34 @@ class Machine:
)
def wait_until_succeeds(self, command: str) -> str:
+ """Wait until a command returns success and return its output.
+ Throws an exception on timeout.
+ """
+ output = ""
+
+ def check_success(_: Any) -> bool:
+ nonlocal output
+ status, output = self.execute(command)
+ return status == 0
+
with self.nested("waiting for success: {}".format(command)):
- while True:
- status, output = self.execute(command)
- if status == 0:
- return output
+ retry(check_success)
+ return output
def wait_until_fails(self, command: str) -> str:
+ """Wait until a command returns failure.
+ Throws an exception on timeout.
+ """
+ output = ""
+
+ def check_failure(_: Any) -> bool:
+ nonlocal output
+ status, output = self.execute(command)
+ return status != 0
+
with self.nested("waiting for failure: {}".format(command)):
- while True:
- status, output = self.execute(command)
- if status != 0:
- return output
+ retry(check_failure)
+ return output
def wait_for_shutdown(self) -> None:
if not self.booted:
@@ -447,25 +476,38 @@ class Machine:
)
return output
- def wait_until_tty_matches(self, tty: str, regexp: str) -> bool:
+ def wait_until_tty_matches(self, tty: str, regexp: str) -> None:
+ """Wait until the visible output on the chosen TTY matches regular
+ expression. Throws an exception on timeout.
+ """
matcher = re.compile(regexp)
+
+ def tty_matches(last: bool) -> bool:
+ text = self.get_tty_text(tty)
+ if last:
+ self.log(
+ f"Last chance to match /{regexp}/ on TTY{tty}, "
+ f"which currently contains: {text}"
+ )
+ return len(matcher.findall(text)) > 0
+
with self.nested("waiting for {} to appear on tty {}".format(regexp, tty)):
- while True:
- text = self.get_tty_text(tty)
- if len(matcher.findall(text)) > 0:
- return True
+ retry(tty_matches)
def send_chars(self, chars: List[str]) -> None:
with self.nested("sending keys ‘{}‘".format(chars)):
for char in chars:
self.send_key(char)
- def wait_for_file(self, filename: str) -> bool:
+ def wait_for_file(self, filename: str) -> None:
+ """Waits until the file exists in machine's file system."""
+
+ def check_file(_: Any) -> bool:
+ status, _ = self.execute("test -e {}".format(filename))
+ return status == 0
+
with self.nested("waiting for file ‘{}‘".format(filename)):
- while True:
- status, _ = self.execute("test -e {}".format(filename))
- if status == 0:
- return True
+ retry(check_file)
def wait_for_open_port(self, port: int) -> None:
def port_is_open(_: Any) -> bool:
@@ -488,8 +530,8 @@ class Machine:
def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
return self.systemctl("stop {}".format(jobname), user)
- def wait_for_job(self, jobname: str) -> bool:
- return self.wait_for_unit(jobname)
+ def wait_for_job(self, jobname: str) -> None:
+ self.wait_for_unit(jobname)
def connect(self) -> None:
if self.connected:
@@ -524,6 +566,33 @@ class Machine:
if ret.returncode != 0:
raise Exception("Cannot convert screenshot")
+ def copy_from_vm(self, source: str, target_dir: str = "") -> None:
+ """Copy a file from the VM (specified by an in-VM source path) to a path
+ relative to `$out`. The file is copied via the `shared_dir` shared among
+ all the VMs (using a temporary directory).
+ """
+ # Compute the source, target, and intermediate shared file names
+ out_dir = pathlib.Path(os.environ.get("out", os.getcwd()))
+ vm_src = pathlib.Path(source)
+ with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
+ shared_temp = pathlib.Path(shared_td)
+ vm_shared_temp = pathlib.Path("/tmp/xchg") / shared_temp.name
+ vm_intermediate = vm_shared_temp / vm_src.name
+ intermediate = shared_temp / vm_src.name
+ # Copy the file to the shared directory inside VM
+ self.succeed(make_command(["mkdir", "-p", vm_shared_temp]))
+ self.succeed(make_command(["cp", "-r", vm_src, vm_intermediate]))
+ self.succeed("sync")
+ abs_target = out_dir / target_dir / vm_src.name
+ abs_target.parent.mkdir(exist_ok=True, parents=True)
+ # Copy the file from the shared directory outside VM
+ if intermediate.is_dir():
+ shutil.copytree(intermediate, abs_target)
+ else:
+ shutil.copy(intermediate, abs_target)
+ # Make sure the cleanup is synced into VM
+ self.succeed("sync")
+
def dump_tty_contents(self, tty: str) -> None:
"""Debugging: Dump the contents of the TTY<n>
"""
@@ -667,18 +736,20 @@ class Machine:
"""Wait until it is possible to connect to the X server. Note that
testing the existence of /tmp/.X11-unix/X0 is insufficient.
"""
+
+ def check_x(_: Any) -> bool:
+ cmd = (
+ "journalctl -b SYSLOG_IDENTIFIER=systemd | "
+ + 'grep "Reached target Current graphical"'
+ )
+ status, _ = self.execute(cmd)
+ if status != 0:
+ return False
+ status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]")
+ return status == 0
+
with self.nested("waiting for the X11 server"):
- while True:
- cmd = (
- "journalctl -b SYSLOG_IDENTIFIER=systemd | "
- + 'grep "Reached target Current graphical"'
- )
- status, _ = self.execute(cmd)
- if status != 0:
- continue
- status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]")
- if status == 0:
- return
+ retry(check_x)
def get_window_names(self) -> List[str]:
return self.succeed(