diff options
author | Timothy Pearson <tpearson@raptorengineering.com> | 2019-05-11 15:12:49 -0500 |
---|---|---|
committer | Timothy Pearson <tpearson@raptorengineering.com> | 2019-05-11 15:12:49 -0500 |
commit | 9e80202352dd49bdd9e67b8b906d86f058431505 (patch) | |
tree | 5673c17aad6e3833da8c4ff21b5a11f666ec9fbe /src/scripts/qtest.py | |
download | hqemu-master.zip hqemu-master.tar.gz |
Diffstat (limited to 'src/scripts/qtest.py')
-rw-r--r-- | src/scripts/qtest.py | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/src/scripts/qtest.py b/src/scripts/qtest.py new file mode 100644 index 0000000..a971445 --- /dev/null +++ b/src/scripts/qtest.py @@ -0,0 +1,71 @@ +# QEMU qtest library +# +# Copyright (C) 2015 Red Hat Inc. +# +# Authors: +# Fam Zheng <famz@redhat.com> +# +# This work is licensed under the terms of the GNU GPL, version 2. See +# the COPYING file in the top-level directory. +# +# Based on qmp.py. +# + +import errno +import socket + +class QEMUQtestProtocol(object): + def __init__(self, address, server=False): + """ + Create a QEMUQtestProtocol object. + + @param address: QEMU address, can be either a unix socket path (string) + or a tuple in the form ( address, port ) for a TCP + connection + @param server: server mode, listens on the socket (bool) + @raise socket.error on socket connection errors + @note No connection is established, this is done by the connect() or + accept() methods + """ + self._address = address + self._sock = self._get_sock() + if server: + self._sock.bind(self._address) + self._sock.listen(1) + + def _get_sock(self): + if isinstance(self._address, tuple): + family = socket.AF_INET + else: + family = socket.AF_UNIX + return socket.socket(family, socket.SOCK_STREAM) + + def connect(self): + """ + Connect to the qtest socket. + + @raise socket.error on socket connection errors + """ + self._sock.connect(self._address) + + def accept(self): + """ + Await connection from QEMU. + + @raise socket.error on socket connection errors + """ + self._sock, _ = self._sock.accept() + + def cmd(self, qtest_cmd): + """ + Send a qtest command on the wire. + + @param qtest_cmd: qtest command text to be sent + """ + self._sock.sendall(qtest_cmd + "\n") + + def close(self): + self._sock.close() + + def settimeout(self, timeout): + self._sock.settimeout(timeout) |