Add correct locking of master node to gnt-debug delay
[ganeti-local] / lib / cmdlib / test.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Test logical units."""
23
24 import logging
25 import shutil
26 import socket
27 import tempfile
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import locking
33 from ganeti import utils
34 from ganeti.masterd import iallocator
35 from ganeti.cmdlib.base import NoHooksLU
36 from ganeti.cmdlib.common import ExpandInstanceName, GetWantedNodes, \
37   GetWantedInstances
38
39
40 class LUTestDelay(NoHooksLU):
41   """Sleep for a specified amount of time.
42
43   This LU sleeps on the master and/or nodes for a specified amount of
44   time.
45
46   """
47   REQ_BGL = False
48
49   def ExpandNames(self):
50     """Expand names and set required locks.
51
52     This expands the node list, if any.
53
54     """
55     self.needed_locks = {}
56
57     if self.op.on_nodes or self.op.on_master:
58       self.needed_locks[locking.LEVEL_NODE] = []
59
60     if self.op.on_nodes:
61       # _GetWantedNodes can be used here, but is not always appropriate to use
62       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
63       # more information.
64       self.op.on_nodes = GetWantedNodes(self, self.op.on_nodes)
65       self.needed_locks[locking.LEVEL_NODE].extend(self.op.on_nodes)
66
67     if self.op.on_master:
68       # The node lock should be acquired for the master as well.
69       self.needed_locks[locking.LEVEL_NODE].append(self.cfg.GetMasterNode())
70
71   def _TestDelay(self):
72     """Do the actual sleep.
73
74     """
75     if self.op.on_master:
76       if not utils.TestDelay(self.op.duration):
77         raise errors.OpExecError("Error during master delay test")
78     if self.op.on_nodes:
79       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
80       for node, node_result in result.items():
81         node_result.Raise("Failure during rpc call to node %s" % node)
82
83   def Exec(self, feedback_fn):
84     """Execute the test delay opcode, with the wanted repetitions.
85
86     """
87     if self.op.repeat == 0:
88       self._TestDelay()
89     else:
90       top_value = self.op.repeat - 1
91       for i in range(self.op.repeat):
92         self.LogInfo("Test delay iteration %d/%d", i, top_value)
93         self._TestDelay()
94
95
96 class LUTestJqueue(NoHooksLU):
97   """Utility LU to test some aspects of the job queue.
98
99   """
100   REQ_BGL = False
101
102   # Must be lower than default timeout for WaitForJobChange to see whether it
103   # notices changed jobs
104   _CLIENT_CONNECT_TIMEOUT = 20.0
105   _CLIENT_CONFIRM_TIMEOUT = 60.0
106
107   @classmethod
108   def _NotifyUsingSocket(cls, cb, errcls):
109     """Opens a Unix socket and waits for another program to connect.
110
111     @type cb: callable
112     @param cb: Callback to send socket name to client
113     @type errcls: class
114     @param errcls: Exception class to use for errors
115
116     """
117     # Using a temporary directory as there's no easy way to create temporary
118     # sockets without writing a custom loop around tempfile.mktemp and
119     # socket.bind
120     tmpdir = tempfile.mkdtemp()
121     try:
122       tmpsock = utils.PathJoin(tmpdir, "sock")
123
124       logging.debug("Creating temporary socket at %s", tmpsock)
125       sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
126       try:
127         sock.bind(tmpsock)
128         sock.listen(1)
129
130         # Send details to client
131         cb(tmpsock)
132
133         # Wait for client to connect before continuing
134         sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
135         try:
136           (conn, _) = sock.accept()
137         except socket.error, err:
138           raise errcls("Client didn't connect in time (%s)" % err)
139       finally:
140         sock.close()
141     finally:
142       # Remove as soon as client is connected
143       shutil.rmtree(tmpdir)
144
145     # Wait for client to close
146     try:
147       try:
148         # pylint: disable=E1101
149         # Instance of '_socketobject' has no ... member
150         conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
151         conn.recv(1)
152       except socket.error, err:
153         raise errcls("Client failed to confirm notification (%s)" % err)
154     finally:
155       conn.close()
156
157   def _SendNotification(self, test, arg, sockname):
158     """Sends a notification to the client.
159
160     @type test: string
161     @param test: Test name
162     @param arg: Test argument (depends on test)
163     @type sockname: string
164     @param sockname: Socket path
165
166     """
167     self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))
168
169   def _Notify(self, prereq, test, arg):
170     """Notifies the client of a test.
171
172     @type prereq: bool
173     @param prereq: Whether this is a prereq-phase test
174     @type test: string
175     @param test: Test name
176     @param arg: Test argument (depends on test)
177
178     """
179     if prereq:
180       errcls = errors.OpPrereqError
181     else:
182       errcls = errors.OpExecError
183
184     return self._NotifyUsingSocket(compat.partial(self._SendNotification,
185                                                   test, arg),
186                                    errcls)
187
188   def CheckArguments(self):
189     self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
190     self.expandnames_calls = 0
191
192   def ExpandNames(self):
193     checkargs_calls = getattr(self, "checkargs_calls", 0)
194     if checkargs_calls < 1:
195       raise errors.ProgrammerError("CheckArguments was not called")
196
197     self.expandnames_calls += 1
198
199     if self.op.notify_waitlock:
200       self._Notify(True, constants.JQT_EXPANDNAMES, None)
201
202     self.LogInfo("Expanding names")
203
204     # Get lock on master node (just to get a lock, not for a particular reason)
205     self.needed_locks = {
206       locking.LEVEL_NODE: self.cfg.GetMasterNode(),
207       }
208
209   def Exec(self, feedback_fn):
210     if self.expandnames_calls < 1:
211       raise errors.ProgrammerError("ExpandNames was not called")
212
213     if self.op.notify_exec:
214       self._Notify(False, constants.JQT_EXEC, None)
215
216     self.LogInfo("Executing")
217
218     if self.op.log_messages:
219       self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages))
220       for idx, msg in enumerate(self.op.log_messages):
221         self.LogInfo("Sending log message %s", idx + 1)
222         feedback_fn(constants.JQT_MSGPREFIX + msg)
223         # Report how many test messages have been sent
224         self._Notify(False, constants.JQT_LOGMSG, idx + 1)
225
226     if self.op.fail:
227       raise errors.OpExecError("Opcode failure was requested")
228
229     return True
230
231
232 class LUTestAllocator(NoHooksLU):
233   """Run allocator tests.
234
235   This LU runs the allocator tests
236
237   """
238   def CheckPrereq(self):
239     """Check prerequisites.
240
241     This checks the opcode parameters depending on the director and mode test.
242
243     """
244     if self.op.mode in (constants.IALLOCATOR_MODE_ALLOC,
245                         constants.IALLOCATOR_MODE_MULTI_ALLOC):
246       for attr in ["memory", "disks", "disk_template",
247                    "os", "tags", "nics", "vcpus"]:
248         if not hasattr(self.op, attr):
249           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
250                                      attr, errors.ECODE_INVAL)
251       iname = self.cfg.ExpandInstanceName(self.op.name)
252       if iname is not None:
253         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
254                                    iname, errors.ECODE_EXISTS)
255       if not isinstance(self.op.nics, list):
256         raise errors.OpPrereqError("Invalid parameter 'nics'",
257                                    errors.ECODE_INVAL)
258       if not isinstance(self.op.disks, list):
259         raise errors.OpPrereqError("Invalid parameter 'disks'",
260                                    errors.ECODE_INVAL)
261       for row in self.op.disks:
262         if (not isinstance(row, dict) or
263             constants.IDISK_SIZE not in row or
264             not isinstance(row[constants.IDISK_SIZE], int) or
265             constants.IDISK_MODE not in row or
266             row[constants.IDISK_MODE] not in constants.DISK_ACCESS_SET):
267           raise errors.OpPrereqError("Invalid contents of the 'disks'"
268                                      " parameter", errors.ECODE_INVAL)
269       if self.op.hypervisor is None:
270         self.op.hypervisor = self.cfg.GetHypervisorType()
271     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
272       fname = ExpandInstanceName(self.cfg, self.op.name)
273       self.op.name = fname
274       self.relocate_from = \
275           list(self.cfg.GetInstanceInfo(fname).secondary_nodes)
276     elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP,
277                           constants.IALLOCATOR_MODE_NODE_EVAC):
278       if not self.op.instances:
279         raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL)
280       self.op.instances = GetWantedInstances(self, self.op.instances)
281     else:
282       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
283                                  self.op.mode, errors.ECODE_INVAL)
284
285     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
286       if self.op.iallocator is None:
287         raise errors.OpPrereqError("Missing allocator name",
288                                    errors.ECODE_INVAL)
289     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
290       raise errors.OpPrereqError("Wrong allocator test '%s'" %
291                                  self.op.direction, errors.ECODE_INVAL)
292
293   def Exec(self, feedback_fn):
294     """Run the allocator test.
295
296     """
297     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
298       req = iallocator.IAReqInstanceAlloc(name=self.op.name,
299                                           memory=self.op.memory,
300                                           disks=self.op.disks,
301                                           disk_template=self.op.disk_template,
302                                           os=self.op.os,
303                                           tags=self.op.tags,
304                                           nics=self.op.nics,
305                                           vcpus=self.op.vcpus,
306                                           spindle_use=self.op.spindle_use,
307                                           hypervisor=self.op.hypervisor,
308                                           node_whitelist=None)
309     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
310       req = iallocator.IAReqRelocate(name=self.op.name,
311                                      relocate_from=list(self.relocate_from))
312     elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
313       req = iallocator.IAReqGroupChange(instances=self.op.instances,
314                                         target_groups=self.op.target_groups)
315     elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
316       req = iallocator.IAReqNodeEvac(instances=self.op.instances,
317                                      evac_mode=self.op.evac_mode)
318     elif self.op.mode == constants.IALLOCATOR_MODE_MULTI_ALLOC:
319       disk_template = self.op.disk_template
320       insts = [iallocator.IAReqInstanceAlloc(name="%s%s" % (self.op.name, idx),
321                                              memory=self.op.memory,
322                                              disks=self.op.disks,
323                                              disk_template=disk_template,
324                                              os=self.op.os,
325                                              tags=self.op.tags,
326                                              nics=self.op.nics,
327                                              vcpus=self.op.vcpus,
328                                              spindle_use=self.op.spindle_use,
329                                              hypervisor=self.op.hypervisor)
330                for idx in range(self.op.count)]
331       req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
332     else:
333       raise errors.ProgrammerError("Uncatched mode %s in"
334                                    " LUTestAllocator.Exec", self.op.mode)
335
336     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
337     if self.op.direction == constants.IALLOCATOR_DIR_IN:
338       result = ial.in_text
339     else:
340       ial.Run(self.op.iallocator, validate=False)
341       result = ial.out_text
342     return result