Merge branch 'stable-2.8' into stable-2.9
[ganeti-local] / lib / cmdlib / test.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Test logical units."""
23
24 import logging
25 import shutil
26 import socket
27 import tempfile
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import locking
33 from ganeti import utils
34 from ganeti.masterd import iallocator
35 from ganeti.cmdlib.base import NoHooksLU
36 from ganeti.cmdlib.common import ExpandInstanceUuidAndName, GetWantedNodes, \
37   GetWantedInstances
38
39
40 class LUTestDelay(NoHooksLU):
41   """Sleep for a specified amount of time.
42
43   This LU sleeps on the master and/or nodes for a specified amount of
44   time.
45
46   """
47   REQ_BGL = False
48
49   def ExpandNames(self):
50     """Expand names and set required locks.
51
52     This expands the node list, if any.
53
54     """
55     self.needed_locks = {}
56
57     if self.op.on_nodes or self.op.on_master:
58       self.needed_locks[locking.LEVEL_NODE] = []
59
60     if self.op.on_nodes:
61       # _GetWantedNodes can be used here, but is not always appropriate to use
62       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
63       # more information.
64       (self.op.on_node_uuids, self.op.on_nodes) = \
65         GetWantedNodes(self, self.op.on_nodes)
66       self.needed_locks[locking.LEVEL_NODE].extend(self.op.on_node_uuids)
67
68     if self.op.on_master:
69       # The node lock should be acquired for the master as well.
70       self.needed_locks[locking.LEVEL_NODE].append(self.cfg.GetMasterNode())
71
72   def _TestDelay(self):
73     """Do the actual sleep.
74
75     """
76     if self.op.on_master:
77       if not utils.TestDelay(self.op.duration):
78         raise errors.OpExecError("Error during master delay test")
79     if self.op.on_node_uuids:
80       result = self.rpc.call_test_delay(self.op.on_node_uuids, self.op.duration)
81       for node_uuid, node_result in result.items():
82         node_result.Raise("Failure during rpc call to node %s" %
83                           self.cfg.GetNodeName(node_uuid))
84
85   def Exec(self, feedback_fn):
86     """Execute the test delay opcode, with the wanted repetitions.
87
88     """
89     if self.op.repeat == 0:
90       self._TestDelay()
91     else:
92       top_value = self.op.repeat - 1
93       for i in range(self.op.repeat):
94         self.LogInfo("Test delay iteration %d/%d", i, top_value)
95         self._TestDelay()
96
97
98 class LUTestJqueue(NoHooksLU):
99   """Utility LU to test some aspects of the job queue.
100
101   """
102   REQ_BGL = False
103
104   # Must be lower than default timeout for WaitForJobChange to see whether it
105   # notices changed jobs
106   _CLIENT_CONNECT_TIMEOUT = 20.0
107   _CLIENT_CONFIRM_TIMEOUT = 60.0
108
109   @classmethod
110   def _NotifyUsingSocket(cls, cb, errcls):
111     """Opens a Unix socket and waits for another program to connect.
112
113     @type cb: callable
114     @param cb: Callback to send socket name to client
115     @type errcls: class
116     @param errcls: Exception class to use for errors
117
118     """
119     # Using a temporary directory as there's no easy way to create temporary
120     # sockets without writing a custom loop around tempfile.mktemp and
121     # socket.bind
122     tmpdir = tempfile.mkdtemp()
123     try:
124       tmpsock = utils.PathJoin(tmpdir, "sock")
125
126       logging.debug("Creating temporary socket at %s", tmpsock)
127       sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
128       try:
129         sock.bind(tmpsock)
130         sock.listen(1)
131
132         # Send details to client
133         cb(tmpsock)
134
135         # Wait for client to connect before continuing
136         sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
137         try:
138           (conn, _) = sock.accept()
139         except socket.error, err:
140           raise errcls("Client didn't connect in time (%s)" % err)
141       finally:
142         sock.close()
143     finally:
144       # Remove as soon as client is connected
145       shutil.rmtree(tmpdir)
146
147     # Wait for client to close
148     try:
149       try:
150         # pylint: disable=E1101
151         # Instance of '_socketobject' has no ... member
152         conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
153         conn.recv(1)
154       except socket.error, err:
155         raise errcls("Client failed to confirm notification (%s)" % err)
156     finally:
157       conn.close()
158
159   def _SendNotification(self, test, arg, sockname):
160     """Sends a notification to the client.
161
162     @type test: string
163     @param test: Test name
164     @param arg: Test argument (depends on test)
165     @type sockname: string
166     @param sockname: Socket path
167
168     """
169     self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))
170
171   def _Notify(self, prereq, test, arg):
172     """Notifies the client of a test.
173
174     @type prereq: bool
175     @param prereq: Whether this is a prereq-phase test
176     @type test: string
177     @param test: Test name
178     @param arg: Test argument (depends on test)
179
180     """
181     if prereq:
182       errcls = errors.OpPrereqError
183     else:
184       errcls = errors.OpExecError
185
186     return self._NotifyUsingSocket(compat.partial(self._SendNotification,
187                                                   test, arg),
188                                    errcls)
189
190   def CheckArguments(self):
191     self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
192     self.expandnames_calls = 0
193
194   def ExpandNames(self):
195     checkargs_calls = getattr(self, "checkargs_calls", 0)
196     if checkargs_calls < 1:
197       raise errors.ProgrammerError("CheckArguments was not called")
198
199     self.expandnames_calls += 1
200
201     if self.op.notify_waitlock:
202       self._Notify(True, constants.JQT_EXPANDNAMES, None)
203
204     self.LogInfo("Expanding names")
205
206     # Get lock on master node (just to get a lock, not for a particular reason)
207     self.needed_locks = {
208       locking.LEVEL_NODE: self.cfg.GetMasterNode(),
209       }
210
211   def Exec(self, feedback_fn):
212     if self.expandnames_calls < 1:
213       raise errors.ProgrammerError("ExpandNames was not called")
214
215     if self.op.notify_exec:
216       self._Notify(False, constants.JQT_EXEC, None)
217
218     self.LogInfo("Executing")
219
220     if self.op.log_messages:
221       self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages))
222       for idx, msg in enumerate(self.op.log_messages):
223         self.LogInfo("Sending log message %s", idx + 1)
224         feedback_fn(constants.JQT_MSGPREFIX + msg)
225         # Report how many test messages have been sent
226         self._Notify(False, constants.JQT_LOGMSG, idx + 1)
227
228     if self.op.fail:
229       raise errors.OpExecError("Opcode failure was requested")
230
231     return True
232
233
234 class LUTestAllocator(NoHooksLU):
235   """Run allocator tests.
236
237   This LU runs the allocator tests
238
239   """
240   def CheckPrereq(self):
241     """Check prerequisites.
242
243     This checks the opcode parameters depending on the director and mode test.
244
245     """
246     if self.op.mode in (constants.IALLOCATOR_MODE_ALLOC,
247                         constants.IALLOCATOR_MODE_MULTI_ALLOC):
248       for attr in ["memory", "disks", "disk_template",
249                    "os", "tags", "nics", "vcpus"]:
250         if not hasattr(self.op, attr):
251           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
252                                      attr, errors.ECODE_INVAL)
253       (self.inst_uuid, iname) = self.cfg.ExpandInstanceName(self.op.name)
254       if iname is not None:
255         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
256                                    iname, errors.ECODE_EXISTS)
257       if not isinstance(self.op.nics, list):
258         raise errors.OpPrereqError("Invalid parameter 'nics'",
259                                    errors.ECODE_INVAL)
260       if not isinstance(self.op.disks, list):
261         raise errors.OpPrereqError("Invalid parameter 'disks'",
262                                    errors.ECODE_INVAL)
263       for row in self.op.disks:
264         if (not isinstance(row, dict) or
265             constants.IDISK_SIZE not in row or
266             not isinstance(row[constants.IDISK_SIZE], int) or
267             constants.IDISK_MODE not in row or
268             row[constants.IDISK_MODE] not in constants.DISK_ACCESS_SET):
269           raise errors.OpPrereqError("Invalid contents of the 'disks'"
270                                      " parameter", errors.ECODE_INVAL)
271       if self.op.hypervisor is None:
272         self.op.hypervisor = self.cfg.GetHypervisorType()
273     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
274       (fuuid, fname) = ExpandInstanceUuidAndName(self.cfg, None, self.op.name)
275       self.op.name = fname
276       self.relocate_from_node_uuids = \
277           list(self.cfg.GetInstanceInfo(fuuid).secondary_nodes)
278     elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP,
279                           constants.IALLOCATOR_MODE_NODE_EVAC):
280       if not self.op.instances:
281         raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL)
282       (_, self.op.instances) = GetWantedInstances(self, self.op.instances)
283     else:
284       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
285                                  self.op.mode, errors.ECODE_INVAL)
286
287     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
288       if self.op.iallocator is None:
289         raise errors.OpPrereqError("Missing allocator name",
290                                    errors.ECODE_INVAL)
291     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
292       raise errors.OpPrereqError("Wrong allocator test '%s'" %
293                                  self.op.direction, errors.ECODE_INVAL)
294
295   def Exec(self, feedback_fn):
296     """Run the allocator test.
297
298     """
299     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
300       req = iallocator.IAReqInstanceAlloc(name=self.op.name,
301                                           memory=self.op.memory,
302                                           disks=self.op.disks,
303                                           disk_template=self.op.disk_template,
304                                           os=self.op.os,
305                                           tags=self.op.tags,
306                                           nics=self.op.nics,
307                                           vcpus=self.op.vcpus,
308                                           spindle_use=self.op.spindle_use,
309                                           hypervisor=self.op.hypervisor,
310                                           node_whitelist=None)
311     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
312       req = iallocator.IAReqRelocate(
313             inst_uuid=self.inst_uuid,
314             relocate_from_node_uuids=list(self.relocate_from_node_uuids))
315     elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
316       req = iallocator.IAReqGroupChange(instances=self.op.instances,
317                                         target_groups=self.op.target_groups)
318     elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
319       req = iallocator.IAReqNodeEvac(instances=self.op.instances,
320                                      evac_mode=self.op.evac_mode)
321     elif self.op.mode == constants.IALLOCATOR_MODE_MULTI_ALLOC:
322       disk_template = self.op.disk_template
323       insts = [iallocator.IAReqInstanceAlloc(name="%s%s" % (self.op.name, idx),
324                                              memory=self.op.memory,
325                                              disks=self.op.disks,
326                                              disk_template=disk_template,
327                                              os=self.op.os,
328                                              tags=self.op.tags,
329                                              nics=self.op.nics,
330                                              vcpus=self.op.vcpus,
331                                              spindle_use=self.op.spindle_use,
332                                              hypervisor=self.op.hypervisor)
333                for idx in range(self.op.count)]
334       req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
335     else:
336       raise errors.ProgrammerError("Uncatched mode %s in"
337                                    " LUTestAllocator.Exec", self.op.mode)
338
339     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
340     if self.op.direction == constants.IALLOCATOR_DIR_IN:
341       result = ial.in_text
342     else:
343       ial.Run(self.op.iallocator, validate=False)
344       result = ial.out_text
345     return result