Revision e58f87a9 lib/cmdlib.py
b/lib/cmdlib.py | ||
---|---|---|
36 | 36 |
import logging |
37 | 37 |
import copy |
38 | 38 |
import OpenSSL |
39 |
import socket |
|
40 |
import tempfile |
|
41 |
import shutil |
|
39 | 42 |
|
40 | 43 |
from ganeti import ssh |
41 | 44 |
from ganeti import utils |
... | ... | |
9848 | 9851 |
self._TestDelay() |
9849 | 9852 |
|
9850 | 9853 |
|
9854 |
class LUTestJobqueue(NoHooksLU): |
|
9855 |
"""Utility LU to test some aspects of the job queue. |
|
9856 |
|
|
9857 |
""" |
|
9858 |
_OP_PARAMS = [ |
|
9859 |
("notify_waitlock", False, _TBool), |
|
9860 |
("notify_exec", False, _TBool), |
|
9861 |
("log_messages", _EmptyList, _TListOf(_TString)), |
|
9862 |
("fail", False, _TBool), |
|
9863 |
] |
|
9864 |
REQ_BGL = False |
|
9865 |
|
|
9866 |
# Must be lower than default timeout for WaitForJobChange to see whether it |
|
9867 |
# notices changed jobs |
|
9868 |
_CLIENT_CONNECT_TIMEOUT = 20.0 |
|
9869 |
_CLIENT_CONFIRM_TIMEOUT = 60.0 |
|
9870 |
|
|
9871 |
@classmethod |
|
9872 |
def _NotifyUsingSocket(cls, cb, errcls): |
|
9873 |
"""Opens a Unix socket and waits for another program to connect. |
|
9874 |
|
|
9875 |
@type cb: callable |
|
9876 |
@param cb: Callback to send socket name to client |
|
9877 |
@type errcls: class |
|
9878 |
@param errcls: Exception class to use for errors |
|
9879 |
|
|
9880 |
""" |
|
9881 |
# Using a temporary directory as there's no easy way to create temporary |
|
9882 |
# sockets without writing a custom loop around tempfile.mktemp and |
|
9883 |
# socket.bind |
|
9884 |
tmpdir = tempfile.mkdtemp() |
|
9885 |
try: |
|
9886 |
tmpsock = utils.PathJoin(tmpdir, "sock") |
|
9887 |
|
|
9888 |
logging.debug("Creating temporary socket at %s", tmpsock) |
|
9889 |
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
|
9890 |
try: |
|
9891 |
sock.bind(tmpsock) |
|
9892 |
sock.listen(1) |
|
9893 |
|
|
9894 |
# Send details to client |
|
9895 |
cb(tmpsock) |
|
9896 |
|
|
9897 |
# Wait for client to connect before continuing |
|
9898 |
sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT) |
|
9899 |
try: |
|
9900 |
(conn, _) = sock.accept() |
|
9901 |
except socket.error, err: |
|
9902 |
raise errcls("Client didn't connect in time (%s)" % err) |
|
9903 |
finally: |
|
9904 |
sock.close() |
|
9905 |
finally: |
|
9906 |
# Remove as soon as client is connected |
|
9907 |
shutil.rmtree(tmpdir) |
|
9908 |
|
|
9909 |
# Wait for client to close |
|
9910 |
try: |
|
9911 |
try: |
|
9912 |
conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT) |
|
9913 |
conn.recv(1) |
|
9914 |
except socket.error, err: |
|
9915 |
raise errcls("Client failed to confirm notification (%s)" % err) |
|
9916 |
finally: |
|
9917 |
conn.close() |
|
9918 |
|
|
9919 |
def _SendNotification(self, test, arg, sockname): |
|
9920 |
"""Sends a notification to the client. |
|
9921 |
|
|
9922 |
@type test: string |
|
9923 |
@param test: Test name |
|
9924 |
@param arg: Test argument (depends on test) |
|
9925 |
@type sockname: string |
|
9926 |
@param sockname: Socket path |
|
9927 |
|
|
9928 |
""" |
|
9929 |
self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg)) |
|
9930 |
|
|
9931 |
def _Notify(self, prereq, test, arg): |
|
9932 |
"""Notifies the client of a test. |
|
9933 |
|
|
9934 |
@type prereq: bool |
|
9935 |
@param prereq: Whether this is a prereq-phase test |
|
9936 |
@type test: string |
|
9937 |
@param test: Test name |
|
9938 |
@param arg: Test argument (depends on test) |
|
9939 |
|
|
9940 |
""" |
|
9941 |
if prereq: |
|
9942 |
errcls = errors.OpPrereqError |
|
9943 |
else: |
|
9944 |
errcls = errors.OpExecError |
|
9945 |
|
|
9946 |
return self._NotifyUsingSocket(compat.partial(self._SendNotification, |
|
9947 |
test, arg), |
|
9948 |
errcls) |
|
9949 |
|
|
9950 |
def CheckArguments(self): |
|
9951 |
self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1 |
|
9952 |
self.expandnames_calls = 0 |
|
9953 |
|
|
9954 |
def ExpandNames(self): |
|
9955 |
checkargs_calls = getattr(self, "checkargs_calls", 0) |
|
9956 |
if checkargs_calls < 1: |
|
9957 |
raise errors.ProgrammerError("CheckArguments was not called") |
|
9958 |
|
|
9959 |
self.expandnames_calls += 1 |
|
9960 |
|
|
9961 |
if self.op.notify_waitlock: |
|
9962 |
self._Notify(True, constants.JQT_EXPANDNAMES, None) |
|
9963 |
|
|
9964 |
self.LogInfo("Expanding names") |
|
9965 |
|
|
9966 |
# Get lock on master node (just to get a lock, not for a particular reason) |
|
9967 |
self.needed_locks = { |
|
9968 |
locking.LEVEL_NODE: self.cfg.GetMasterNode(), |
|
9969 |
} |
|
9970 |
|
|
9971 |
def Exec(self, feedback_fn): |
|
9972 |
if self.expandnames_calls < 1: |
|
9973 |
raise errors.ProgrammerError("ExpandNames was not called") |
|
9974 |
|
|
9975 |
if self.op.notify_exec: |
|
9976 |
self._Notify(False, constants.JQT_EXEC, None) |
|
9977 |
|
|
9978 |
self.LogInfo("Executing") |
|
9979 |
|
|
9980 |
if self.op.log_messages: |
|
9981 |
for idx, msg in enumerate(self.op.log_messages): |
|
9982 |
self.LogInfo("Sending log message %s", idx + 1) |
|
9983 |
feedback_fn(constants.JQT_MSGPREFIX + msg) |
|
9984 |
# Report how many test messages have been sent |
|
9985 |
self._Notify(False, constants.JQT_LOGMSG, idx + 1) |
|
9986 |
|
|
9987 |
if self.op.fail: |
|
9988 |
raise errors.OpExecError("Opcode failure was requested") |
|
9989 |
|
|
9990 |
return True |
|
9991 |
|
|
9992 |
|
|
9851 | 9993 |
class IAllocator(object): |
9852 | 9994 |
"""IAllocator framework. |
9853 | 9995 |
|
Also available in: Unified diff