Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / test.py @ 9dc47292

History | View | Annotate | Download (14.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Test logical units."""
23

    
24
import logging
25
import shutil
26
import socket
27
import tempfile
28
import time
29

    
30
from ganeti import compat
31
from ganeti import constants
32
from ganeti import errors
33
from ganeti import locking
34
from ganeti import utils
35
from ganeti.masterd import iallocator
36
from ganeti.cmdlib.base import NoHooksLU
37
from ganeti.cmdlib.common import ExpandInstanceUuidAndName, GetWantedNodes, \
38
  GetWantedInstances
39

    
40

    
41
class TestSocketWrapper(object):
42
  """ Utility class that opens a domain socket and cleans up as needed.
43

44
  """
45
  def __init__(self):
46
    """ Constructor cleaning up variables to be used.
47

48
    """
49
    self.tmpdir = None
50
    self.sock = None
51

    
52
  def Create(self, max_connections=1):
53
    """ Creates a bound and ready socket, cleaning up in case of failure.
54

55
    @type max_connections: int
56
    @param max_connections: The number of max connections allowed for the
57
                            socket.
58

59
    @rtype: tuple of socket, string
60
    @return: The socket object and the path to reach it with.
61

62
    """
63
    # Using a temporary directory as there's no easy way to create temporary
64
    # sockets without writing a custom loop around tempfile.mktemp and
65
    # socket.bind
66
    self.tmpdir = tempfile.mkdtemp()
67
    try:
68
      tmpsock = utils.PathJoin(self.tmpdir, "sock")
69
      logging.debug("Creating temporary socket at %s", tmpsock)
70
      self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
71
      try:
72
        self.sock.bind(tmpsock)
73
        self.sock.listen(max_connections)
74
      except:
75
        self.sock.close()
76
        raise
77
    except:
78
      shutil.rmtree(self.tmpdir)
79
      raise
80

    
81
    return self.sock, tmpsock
82

    
83
  def Destroy(self):
84
    """ Destroys the socket and performs all necessary cleanup.
85

86
    """
87
    if self.tmpdir is None or self.sock is None:
88
      raise Exception("A socket must be created successfully before attempting "
89
                      "its destruction")
90

    
91
    try:
92
      self.sock.close()
93
    finally:
94
      shutil.rmtree(self.tmpdir)
95

    
96

    
97
class LUTestDelay(NoHooksLU):
98
  """Sleep for a specified amount of time.
99

100
  This LU sleeps on the master and/or nodes for a specified amount of
101
  time.
102

103
  """
104
  REQ_BGL = False
105

    
106
  def ExpandNames(self):
107
    """Expand names and set required locks.
108

109
    This expands the node list, if any.
110

111
    """
112

    
113
    if self.op.duration <= 0:
114
      raise errors.OpPrereqError("Duration must be greater than zero")
115

    
116
    self.op.on_node_uuids = []
117
    if self.op.on_nodes:
118
      # _GetWantedNodes can be used here, but is not always appropriate to use
119
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
120
      # more information.
121
      (self.op.on_node_uuids, self.op.on_nodes) = \
122
        GetWantedNodes(self, self.op.on_nodes)
123

    
124
    master_uuid = self.cfg.GetMasterNode()
125
    if self.op.on_master and master_uuid not in self.op.on_node_uuids:
126
      self.op.on_node_uuids.append(master_uuid)
127

    
128
    self.needed_locks = {}
129
    self.needed_locks[locking.LEVEL_NODE] = self.op.on_node_uuids
130

    
131
  def _InterruptibleDelay(self):
132
    """Delays but provides the mechanisms necessary to interrupt the delay as
133
    needed.
134

135
    """
136
    socket_wrapper = TestSocketWrapper()
137
    sock, path = socket_wrapper.Create()
138

    
139
    self.Log(constants.ELOG_DELAY_TEST, (path,))
140

    
141
    try:
142
      sock.settimeout(self.op.duration)
143
      start = time.time()
144
      (conn, _) = sock.accept()
145
    except socket.timeout, _:
146
      # If we timed out, all is well
147
      return False
148
    finally:
149
      # Destroys the original socket, but the new connection is still usable
150
      socket_wrapper.Destroy()
151

    
152
    try:
153
      # Change to remaining time
154
      time_to_go = self.op.duration - (time.time() - start)
155
      self.Log(constants.ELOG_MESSAGE,
156
               "Received connection, time to go is %d" % time_to_go)
157
      if time_to_go < 0:
158
        time_to_go = 0
159
      # pylint: disable=E1101
160
      # Instance of '_socketobject' has no ... member
161
      conn.settimeout(time_to_go)
162
      conn.recv(1)
163
      # pylint: enable=E1101
164
    except socket.timeout, _:
165
      # A second timeout can occur if no data is sent
166
      return False
167
    finally:
168
      conn.close()
169

    
170
    self.Log(constants.ELOG_MESSAGE,
171
             "Interrupted, time spent waiting: %d" % (time.time() - start))
172

    
173
    # Reaching this point means we were interrupted
174
    return True
175

    
176
  def _UninterruptibleDelay(self):
177
    """Delays without allowing interruptions.
178

179
    """
180
    if self.op.on_node_uuids:
181
      result = self.rpc.call_test_delay(self.op.on_node_uuids, self.op.duration)
182
      for node_uuid, node_result in result.items():
183
        node_result.Raise("Failure during rpc call to node %s" %
184
                          self.cfg.GetNodeName(node_uuid))
185
    else:
186
      if not utils.TestDelay(self.op.duration)[0]:
187
        raise errors.OpExecError("Error during master delay test")
188

    
189
  def _TestDelay(self):
190
    """Do the actual sleep.
191

192
    @rtype: bool
193
    @return: Whether the delay was interrupted
194

195
    """
196
    if self.op.interruptible:
197
      return self._InterruptibleDelay()
198
    else:
199
      self._UninterruptibleDelay()
200
      return False
201

    
202
  def Exec(self, feedback_fn):
203
    """Execute the test delay opcode, with the wanted repetitions.
204

205
    """
206
    if self.op.repeat == 0:
207
      i = self._TestDelay()
208
    else:
209
      top_value = self.op.repeat - 1
210
      for i in range(self.op.repeat):
211
        self.LogInfo("Test delay iteration %d/%d", i, top_value)
212
        # Break in case of interruption
213
        if self._TestDelay():
214
          break
215

    
216

    
217
class LUTestJqueue(NoHooksLU):
218
  """Utility LU to test some aspects of the job queue.
219

220
  """
221
  REQ_BGL = False
222

    
223
  # Must be lower than default timeout for WaitForJobChange to see whether it
224
  # notices changed jobs
225
  _CLIENT_CONNECT_TIMEOUT = 20.0
226
  _CLIENT_CONFIRM_TIMEOUT = 60.0
227

    
228
  @classmethod
229
  def _NotifyUsingSocket(cls, cb, errcls):
230
    """Opens a Unix socket and waits for another program to connect.
231

232
    @type cb: callable
233
    @param cb: Callback to send socket name to client
234
    @type errcls: class
235
    @param errcls: Exception class to use for errors
236

237
    """
238

    
239
    # Using a temporary directory as there's no easy way to create temporary
240
    # sockets without writing a custom loop around tempfile.mktemp and
241
    # socket.bind
242

    
243
    socket_wrapper = TestSocketWrapper()
244
    sock, path = socket_wrapper.Create()
245

    
246
    cb(path)
247

    
248
    try:
249
      sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
250
      (conn, _) = sock.accept()
251
    except socket.error, err:
252
      raise errcls("Client didn't connect in time (%s)" % err)
253
    finally:
254
      socket_wrapper.Destroy()
255

    
256
    # Wait for client to close
257
    try:
258
      try:
259
        # pylint: disable=E1101
260
        # Instance of '_socketobject' has no ... member
261
        conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
262
        conn.recv(1)
263
      except socket.error, err:
264
        raise errcls("Client failed to confirm notification (%s)" % err)
265
    finally:
266
      conn.close()
267

    
268
  def _SendNotification(self, test, arg, sockname):
269
    """Sends a notification to the client.
270

271
    @type test: string
272
    @param test: Test name
273
    @param arg: Test argument (depends on test)
274
    @type sockname: string
275
    @param sockname: Socket path
276

277
    """
278
    self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))
279

    
280
  def _Notify(self, prereq, test, arg):
281
    """Notifies the client of a test.
282

283
    @type prereq: bool
284
    @param prereq: Whether this is a prereq-phase test
285
    @type test: string
286
    @param test: Test name
287
    @param arg: Test argument (depends on test)
288

289
    """
290
    if prereq:
291
      errcls = errors.OpPrereqError
292
    else:
293
      errcls = errors.OpExecError
294

    
295
    return self._NotifyUsingSocket(compat.partial(self._SendNotification,
296
                                                  test, arg),
297
                                   errcls)
298

    
299
  def CheckArguments(self):
300
    self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
301
    self.expandnames_calls = 0
302

    
303
  def ExpandNames(self):
304
    checkargs_calls = getattr(self, "checkargs_calls", 0)
305
    if checkargs_calls < 1:
306
      raise errors.ProgrammerError("CheckArguments was not called")
307

    
308
    self.expandnames_calls += 1
309

    
310
    if self.op.notify_waitlock:
311
      self._Notify(True, constants.JQT_EXPANDNAMES, None)
312

    
313
    self.LogInfo("Expanding names")
314

    
315
    # Get lock on master node (just to get a lock, not for a particular reason)
316
    self.needed_locks = {
317
      locking.LEVEL_NODE: self.cfg.GetMasterNode(),
318
      }
319

    
320
  def Exec(self, feedback_fn):
321
    if self.expandnames_calls < 1:
322
      raise errors.ProgrammerError("ExpandNames was not called")
323

    
324
    if self.op.notify_exec:
325
      self._Notify(False, constants.JQT_EXEC, None)
326

    
327
    self.LogInfo("Executing")
328

    
329
    if self.op.log_messages:
330
      self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages))
331
      for idx, msg in enumerate(self.op.log_messages):
332
        self.LogInfo("Sending log message %s", idx + 1)
333
        feedback_fn(constants.JQT_MSGPREFIX + msg)
334
        # Report how many test messages have been sent
335
        self._Notify(False, constants.JQT_LOGMSG, idx + 1)
336

    
337
    if self.op.fail:
338
      raise errors.OpExecError("Opcode failure was requested")
339

    
340
    return True
341

    
342

    
343
class LUTestAllocator(NoHooksLU):
344
  """Run allocator tests.
345

346
  This LU runs the allocator tests
347

348
  """
349
  def CheckPrereq(self):
350
    """Check prerequisites.
351

352
    This checks the opcode parameters depending on the director and mode test.
353

354
    """
355
    if self.op.mode in (constants.IALLOCATOR_MODE_ALLOC,
356
                        constants.IALLOCATOR_MODE_MULTI_ALLOC):
357
      (self.inst_uuid, iname) = self.cfg.ExpandInstanceName(self.op.name)
358
      if iname is not None:
359
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
360
                                   iname, errors.ECODE_EXISTS)
361
      for row in self.op.disks:
362
        if (not isinstance(row, dict) or
363
            constants.IDISK_SIZE not in row or
364
            not isinstance(row[constants.IDISK_SIZE], int) or
365
            constants.IDISK_MODE not in row or
366
            row[constants.IDISK_MODE] not in constants.DISK_ACCESS_SET):
367
          raise errors.OpPrereqError("Invalid contents of the 'disks'"
368
                                     " parameter", errors.ECODE_INVAL)
369
      if self.op.hypervisor is None:
370
        self.op.hypervisor = self.cfg.GetHypervisorType()
371
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
372
      (self.inst_uuid, self.op.name) = ExpandInstanceUuidAndName(self.cfg, None,
373
                                                                 self.op.name)
374
      self.relocate_from_node_uuids = \
375
          list(self.cfg.GetInstanceInfo(self.inst_uuid).secondary_nodes)
376
    elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP,
377
                          constants.IALLOCATOR_MODE_NODE_EVAC):
378
      if not self.op.instances:
379
        raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL)
380
      (_, self.op.instances) = GetWantedInstances(self, self.op.instances)
381
    else:
382
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
383
                                 self.op.mode, errors.ECODE_INVAL)
384

    
385
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
386
      if self.op.iallocator is None:
387
        raise errors.OpPrereqError("Missing allocator name",
388
                                   errors.ECODE_INVAL)
389

    
390
  def Exec(self, feedback_fn):
391
    """Run the allocator test.
392

393
    """
394
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
395
      req = iallocator.IAReqInstanceAlloc(name=self.op.name,
396
                                          memory=self.op.memory,
397
                                          disks=self.op.disks,
398
                                          disk_template=self.op.disk_template,
399
                                          os=self.op.os,
400
                                          tags=self.op.tags,
401
                                          nics=self.op.nics,
402
                                          vcpus=self.op.vcpus,
403
                                          spindle_use=self.op.spindle_use,
404
                                          hypervisor=self.op.hypervisor,
405
                                          node_whitelist=None)
406
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
407
      req = iallocator.IAReqRelocate(
408
            inst_uuid=self.inst_uuid,
409
            relocate_from_node_uuids=list(self.relocate_from_node_uuids))
410
    elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
411
      req = iallocator.IAReqGroupChange(instances=self.op.instances,
412
                                        target_groups=self.op.target_groups)
413
    elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
414
      req = iallocator.IAReqNodeEvac(instances=self.op.instances,
415
                                     evac_mode=self.op.evac_mode)
416
    elif self.op.mode == constants.IALLOCATOR_MODE_MULTI_ALLOC:
417
      disk_template = self.op.disk_template
418
      insts = [iallocator.IAReqInstanceAlloc(name="%s%s" % (self.op.name, idx),
419
                                             memory=self.op.memory,
420
                                             disks=self.op.disks,
421
                                             disk_template=disk_template,
422
                                             os=self.op.os,
423
                                             tags=self.op.tags,
424
                                             nics=self.op.nics,
425
                                             vcpus=self.op.vcpus,
426
                                             spindle_use=self.op.spindle_use,
427
                                             hypervisor=self.op.hypervisor,
428
                                             node_whitelist=None)
429
               for idx in range(self.op.count)]
430
      req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
431
    else:
432
      raise errors.ProgrammerError("Uncatched mode %s in"
433
                                   " LUTestAllocator.Exec", self.op.mode)
434

    
435
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
436
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
437
      result = ial.in_text
438
    else:
439
      ial.Run(self.op.iallocator, validate=False)
440
      result = ial.out_text
441
    return result