Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / test.py @ 054a9d17

History | View | Annotate | Download (12.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Test logical units."""
23

    
24
import logging
25
import shutil
26
import socket
27
import tempfile
28

    
29
from ganeti import compat
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import locking
33
from ganeti import utils
34
from ganeti.masterd import iallocator
35
from ganeti.cmdlib.base import NoHooksLU
36
from ganeti.cmdlib.common import ExpandInstanceName, GetWantedNodes, \
37
  GetWantedInstances
38

    
39

    
40
class LUTestDelay(NoHooksLU):
41
  """Sleep for a specified amount of time.
42

43
  This LU sleeps on the master and/or nodes for a specified amount of
44
  time.
45

46
  """
47
  REQ_BGL = False
48

    
49
  def ExpandNames(self):
50
    """Expand names and set required locks.
51

52
    This expands the node list, if any.
53

54
    """
55
    self.needed_locks = {}
56

    
57
    if self.op.on_nodes or self.op.on_master:
58
      self.needed_locks[locking.LEVEL_NODE] = []
59

    
60
    if self.op.on_nodes:
61
      # _GetWantedNodes can be used here, but is not always appropriate to use
62
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
63
      # more information.
64
      self.op.on_nodes = GetWantedNodes(self, self.op.on_nodes)
65
      self.needed_locks[locking.LEVEL_NODE].extend(self.op.on_nodes)
66

    
67
    if self.op.on_master:
68
      # The node lock should be acquired for the master as well.
69
      self.needed_locks[locking.LEVEL_NODE].append(self.cfg.GetMasterNode())
70

    
71
  def _TestDelay(self):
72
    """Do the actual sleep.
73

74
    """
75
    if self.op.on_master:
76
      if not utils.TestDelay(self.op.duration):
77
        raise errors.OpExecError("Error during master delay test")
78
    if self.op.on_nodes:
79
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
80
      for node, node_result in result.items():
81
        node_result.Raise("Failure during rpc call to node %s" % node)
82

    
83
  def Exec(self, feedback_fn):
84
    """Execute the test delay opcode, with the wanted repetitions.
85

86
    """
87
    if self.op.repeat == 0:
88
      self._TestDelay()
89
    else:
90
      top_value = self.op.repeat - 1
91
      for i in range(self.op.repeat):
92
        self.LogInfo("Test delay iteration %d/%d", i, top_value)
93
        self._TestDelay()
94

    
95

    
96
class LUTestJqueue(NoHooksLU):
97
  """Utility LU to test some aspects of the job queue.
98

99
  """
100
  REQ_BGL = False
101

    
102
  # Must be lower than default timeout for WaitForJobChange to see whether it
103
  # notices changed jobs
104
  _CLIENT_CONNECT_TIMEOUT = 20.0
105
  _CLIENT_CONFIRM_TIMEOUT = 60.0
106

    
107
  @classmethod
108
  def _NotifyUsingSocket(cls, cb, errcls):
109
    """Opens a Unix socket and waits for another program to connect.
110

111
    @type cb: callable
112
    @param cb: Callback to send socket name to client
113
    @type errcls: class
114
    @param errcls: Exception class to use for errors
115

116
    """
117
    # Using a temporary directory as there's no easy way to create temporary
118
    # sockets without writing a custom loop around tempfile.mktemp and
119
    # socket.bind
120
    tmpdir = tempfile.mkdtemp()
121
    try:
122
      tmpsock = utils.PathJoin(tmpdir, "sock")
123

    
124
      logging.debug("Creating temporary socket at %s", tmpsock)
125
      sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
126
      try:
127
        sock.bind(tmpsock)
128
        sock.listen(1)
129

    
130
        # Send details to client
131
        cb(tmpsock)
132

    
133
        # Wait for client to connect before continuing
134
        sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
135
        try:
136
          (conn, _) = sock.accept()
137
        except socket.error, err:
138
          raise errcls("Client didn't connect in time (%s)" % err)
139
      finally:
140
        sock.close()
141
    finally:
142
      # Remove as soon as client is connected
143
      shutil.rmtree(tmpdir)
144

    
145
    # Wait for client to close
146
    try:
147
      try:
148
        # pylint: disable=E1101
149
        # Instance of '_socketobject' has no ... member
150
        conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
151
        conn.recv(1)
152
      except socket.error, err:
153
        raise errcls("Client failed to confirm notification (%s)" % err)
154
    finally:
155
      conn.close()
156

    
157
  def _SendNotification(self, test, arg, sockname):
158
    """Sends a notification to the client.
159

160
    @type test: string
161
    @param test: Test name
162
    @param arg: Test argument (depends on test)
163
    @type sockname: string
164
    @param sockname: Socket path
165

166
    """
167
    self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))
168

    
169
  def _Notify(self, prereq, test, arg):
170
    """Notifies the client of a test.
171

172
    @type prereq: bool
173
    @param prereq: Whether this is a prereq-phase test
174
    @type test: string
175
    @param test: Test name
176
    @param arg: Test argument (depends on test)
177

178
    """
179
    if prereq:
180
      errcls = errors.OpPrereqError
181
    else:
182
      errcls = errors.OpExecError
183

    
184
    return self._NotifyUsingSocket(compat.partial(self._SendNotification,
185
                                                  test, arg),
186
                                   errcls)
187

    
188
  def CheckArguments(self):
189
    self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
190
    self.expandnames_calls = 0
191

    
192
  def ExpandNames(self):
193
    checkargs_calls = getattr(self, "checkargs_calls", 0)
194
    if checkargs_calls < 1:
195
      raise errors.ProgrammerError("CheckArguments was not called")
196

    
197
    self.expandnames_calls += 1
198

    
199
    if self.op.notify_waitlock:
200
      self._Notify(True, constants.JQT_EXPANDNAMES, None)
201

    
202
    self.LogInfo("Expanding names")
203

    
204
    # Get lock on master node (just to get a lock, not for a particular reason)
205
    self.needed_locks = {
206
      locking.LEVEL_NODE: self.cfg.GetMasterNode(),
207
      }
208

    
209
  def Exec(self, feedback_fn):
210
    if self.expandnames_calls < 1:
211
      raise errors.ProgrammerError("ExpandNames was not called")
212

    
213
    if self.op.notify_exec:
214
      self._Notify(False, constants.JQT_EXEC, None)
215

    
216
    self.LogInfo("Executing")
217

    
218
    if self.op.log_messages:
219
      self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages))
220
      for idx, msg in enumerate(self.op.log_messages):
221
        self.LogInfo("Sending log message %s", idx + 1)
222
        feedback_fn(constants.JQT_MSGPREFIX + msg)
223
        # Report how many test messages have been sent
224
        self._Notify(False, constants.JQT_LOGMSG, idx + 1)
225

    
226
    if self.op.fail:
227
      raise errors.OpExecError("Opcode failure was requested")
228

    
229
    return True
230

    
231

    
232
class LUTestAllocator(NoHooksLU):
233
  """Run allocator tests.
234

235
  This LU runs the allocator tests
236

237
  """
238
  def CheckPrereq(self):
239
    """Check prerequisites.
240

241
    This checks the opcode parameters depending on the director and mode test.
242

243
    """
244
    if self.op.mode in (constants.IALLOCATOR_MODE_ALLOC,
245
                        constants.IALLOCATOR_MODE_MULTI_ALLOC):
246
      for attr in ["memory", "disks", "disk_template",
247
                   "os", "tags", "nics", "vcpus"]:
248
        if not hasattr(self.op, attr):
249
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
250
                                     attr, errors.ECODE_INVAL)
251
      iname = self.cfg.ExpandInstanceName(self.op.name)
252
      if iname is not None:
253
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
254
                                   iname, errors.ECODE_EXISTS)
255
      if not isinstance(self.op.nics, list):
256
        raise errors.OpPrereqError("Invalid parameter 'nics'",
257
                                   errors.ECODE_INVAL)
258
      if not isinstance(self.op.disks, list):
259
        raise errors.OpPrereqError("Invalid parameter 'disks'",
260
                                   errors.ECODE_INVAL)
261
      for row in self.op.disks:
262
        if (not isinstance(row, dict) or
263
            constants.IDISK_SIZE not in row or
264
            not isinstance(row[constants.IDISK_SIZE], int) or
265
            constants.IDISK_MODE not in row or
266
            row[constants.IDISK_MODE] not in constants.DISK_ACCESS_SET):
267
          raise errors.OpPrereqError("Invalid contents of the 'disks'"
268
                                     " parameter", errors.ECODE_INVAL)
269
      if self.op.hypervisor is None:
270
        self.op.hypervisor = self.cfg.GetHypervisorType()
271
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
272
      fname = ExpandInstanceName(self.cfg, self.op.name)
273
      self.op.name = fname
274
      self.relocate_from = \
275
          list(self.cfg.GetInstanceInfo(fname).secondary_nodes)
276
    elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP,
277
                          constants.IALLOCATOR_MODE_NODE_EVAC):
278
      if not self.op.instances:
279
        raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL)
280
      self.op.instances = GetWantedInstances(self, self.op.instances)
281
    else:
282
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
283
                                 self.op.mode, errors.ECODE_INVAL)
284

    
285
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
286
      if self.op.iallocator is None:
287
        raise errors.OpPrereqError("Missing allocator name",
288
                                   errors.ECODE_INVAL)
289
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
290
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
291
                                 self.op.direction, errors.ECODE_INVAL)
292

    
293
  def Exec(self, feedback_fn):
294
    """Run the allocator test.
295

296
    """
297
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
298
      req = iallocator.IAReqInstanceAlloc(name=self.op.name,
299
                                          memory=self.op.memory,
300
                                          disks=self.op.disks,
301
                                          disk_template=self.op.disk_template,
302
                                          os=self.op.os,
303
                                          tags=self.op.tags,
304
                                          nics=self.op.nics,
305
                                          vcpus=self.op.vcpus,
306
                                          spindle_use=self.op.spindle_use,
307
                                          hypervisor=self.op.hypervisor,
308
                                          node_whitelist=None)
309
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
310
      req = iallocator.IAReqRelocate(name=self.op.name,
311
                                     relocate_from=list(self.relocate_from))
312
    elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
313
      req = iallocator.IAReqGroupChange(instances=self.op.instances,
314
                                        target_groups=self.op.target_groups)
315
    elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
316
      req = iallocator.IAReqNodeEvac(instances=self.op.instances,
317
                                     evac_mode=self.op.evac_mode)
318
    elif self.op.mode == constants.IALLOCATOR_MODE_MULTI_ALLOC:
319
      disk_template = self.op.disk_template
320
      insts = [iallocator.IAReqInstanceAlloc(name="%s%s" % (self.op.name, idx),
321
                                             memory=self.op.memory,
322
                                             disks=self.op.disks,
323
                                             disk_template=disk_template,
324
                                             os=self.op.os,
325
                                             tags=self.op.tags,
326
                                             nics=self.op.nics,
327
                                             vcpus=self.op.vcpus,
328
                                             spindle_use=self.op.spindle_use,
329
                                             hypervisor=self.op.hypervisor)
330
               for idx in range(self.op.count)]
331
      req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
332
    else:
333
      raise errors.ProgrammerError("Uncatched mode %s in"
334
                                   " LUTestAllocator.Exec", self.op.mode)
335

    
336
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
337
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
338
      result = ial.in_text
339
    else:
340
      ial.Run(self.op.iallocator, validate=False)
341
      result = ial.out_text
342
    return result