Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / test.py @ c346d0ac

History | View | Annotate | Download (12.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Test logical units."""
23

    
24
import logging
25
import shutil
26
import socket
27
import tempfile
28

    
29
from ganeti import compat
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import locking
33
from ganeti import utils
34
from ganeti.masterd import iallocator
35
from ganeti.cmdlib.base import NoHooksLU
36
from ganeti.cmdlib.common import ExpandInstanceUuidAndName, GetWantedNodes, \
37
  GetWantedInstances
38

    
39

    
40
class TestSocketWrapper(object):
41
  """ Utility class that opens a domain socket and cleans up as needed.
42

43
  """
44
  def __init__(self):
45
    """ Constructor cleaning up variables to be used.
46

47
    """
48
    self.tmpdir = None
49
    self.sock = None
50

    
51
  def Create(self, max_connections=1):
52
    """ Creates a bound and ready socket, cleaning up in case of failure.
53

54
    @type max_connections: int
55
    @param max_connections: The number of max connections allowed for the
56
                            socket.
57

58
    @rtype: tuple of socket, string
59
    @return: The socket object and the path to reach it with.
60

61
    """
62
    # Using a temporary directory as there's no easy way to create temporary
63
    # sockets without writing a custom loop around tempfile.mktemp and
64
    # socket.bind
65
    self.tmpdir = tempfile.mkdtemp()
66
    try:
67
      tmpsock = utils.PathJoin(self.tmpdir, "sock")
68
      logging.debug("Creating temporary socket at %s", tmpsock)
69
      self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
70
      try:
71
        self.sock.bind(tmpsock)
72
        self.sock.listen(max_connections)
73
      except:
74
        self.sock.close()
75
        raise
76
    except:
77
      shutil.rmtree(self.tmpdir)
78
      raise
79

    
80
    return self.sock, tmpsock
81

    
82
  def Destroy(self):
83
    """ Destroys the socket and performs all necessary cleanup.
84

85
    """
86
    if self.tmpdir is None or self.sock is None:
87
      raise Exception("A socket must be created successfully before attempting "
88
                      "its destruction")
89

    
90
    try:
91
      self.sock.close()
92
    finally:
93
      shutil.rmtree(self.tmpdir)
94

    
95

    
96
class LUTestDelay(NoHooksLU):
97
  """Sleep for a specified amount of time.
98

99
  This LU sleeps on the master and/or nodes for a specified amount of
100
  time.
101

102
  """
103
  REQ_BGL = False
104

    
105
  def ExpandNames(self):
106
    """Expand names and set required locks.
107

108
    This expands the node list, if any.
109

110
    """
111

    
112
    if self.op.duration <= 0:
113
      raise errors.OpPrereqError("Duration must be greater than zero")
114

    
115
    self.op.on_node_uuids = []
116
    if self.op.on_nodes:
117
      # _GetWantedNodes can be used here, but is not always appropriate to use
118
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
119
      # more information.
120
      (self.op.on_node_uuids, self.op.on_nodes) = \
121
        GetWantedNodes(self, self.op.on_nodes)
122

    
123
    master_uuid = self.cfg.GetMasterNode()
124
    if self.op.on_master and master_uuid not in self.op.on_node_uuids:
125
      self.op.on_node_uuids.append(master_uuid)
126

    
127
    self.needed_locks = {}
128
    self.needed_locks[locking.LEVEL_NODE] = self.op.on_node_uuids
129

    
130
  def _TestDelay(self):
131
    """Do the actual sleep.
132

133
    """
134
    if self.op.on_node_uuids:
135
      result = self.rpc.call_test_delay(self.op.on_node_uuids, self.op.duration)
136
      for node_uuid, node_result in result.items():
137
        node_result.Raise("Failure during rpc call to node %s" %
138
                          self.cfg.GetNodeName(node_uuid))
139
    else:
140
      if not utils.TestDelay(self.op.duration)[0]:
141
        raise errors.OpExecError("Error during master delay test")
142

    
143
  def Exec(self, feedback_fn):
144
    """Execute the test delay opcode, with the wanted repetitions.
145

146
    """
147
    if self.op.repeat == 0:
148
      self._TestDelay()
149
    else:
150
      top_value = self.op.repeat - 1
151
      for i in range(self.op.repeat):
152
        self.LogInfo("Test delay iteration %d/%d", i, top_value)
153
        self._TestDelay()
154

    
155

    
156
class LUTestJqueue(NoHooksLU):
157
  """Utility LU to test some aspects of the job queue.
158

159
  """
160
  REQ_BGL = False
161

    
162
  # Must be lower than default timeout for WaitForJobChange to see whether it
163
  # notices changed jobs
164
  _CLIENT_CONNECT_TIMEOUT = 20.0
165
  _CLIENT_CONFIRM_TIMEOUT = 60.0
166

    
167
  @classmethod
168
  def _NotifyUsingSocket(cls, cb, errcls):
169
    """Opens a Unix socket and waits for another program to connect.
170

171
    @type cb: callable
172
    @param cb: Callback to send socket name to client
173
    @type errcls: class
174
    @param errcls: Exception class to use for errors
175

176
    """
177

    
178
    # Using a temporary directory as there's no easy way to create temporary
179
    # sockets without writing a custom loop around tempfile.mktemp and
180
    # socket.bind
181

    
182
    socket_wrapper = TestSocketWrapper()
183
    sock, path = socket_wrapper.Create()
184

    
185
    cb(path)
186

    
187
    try:
188
      sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
189
      (conn, _) = sock.accept()
190
    except socket.error, err:
191
      raise errcls("Client didn't connect in time (%s)" % err)
192
    finally:
193
      socket_wrapper.Destroy()
194

    
195
    # Wait for client to close
196
    try:
197
      try:
198
        # pylint: disable=E1101
199
        # Instance of '_socketobject' has no ... member
200
        conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
201
        conn.recv(1)
202
      except socket.error, err:
203
        raise errcls("Client failed to confirm notification (%s)" % err)
204
    finally:
205
      conn.close()
206

    
207
  def _SendNotification(self, test, arg, sockname):
208
    """Sends a notification to the client.
209

210
    @type test: string
211
    @param test: Test name
212
    @param arg: Test argument (depends on test)
213
    @type sockname: string
214
    @param sockname: Socket path
215

216
    """
217
    self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))
218

    
219
  def _Notify(self, prereq, test, arg):
220
    """Notifies the client of a test.
221

222
    @type prereq: bool
223
    @param prereq: Whether this is a prereq-phase test
224
    @type test: string
225
    @param test: Test name
226
    @param arg: Test argument (depends on test)
227

228
    """
229
    if prereq:
230
      errcls = errors.OpPrereqError
231
    else:
232
      errcls = errors.OpExecError
233

    
234
    return self._NotifyUsingSocket(compat.partial(self._SendNotification,
235
                                                  test, arg),
236
                                   errcls)
237

    
238
  def CheckArguments(self):
239
    self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
240
    self.expandnames_calls = 0
241

    
242
  def ExpandNames(self):
243
    checkargs_calls = getattr(self, "checkargs_calls", 0)
244
    if checkargs_calls < 1:
245
      raise errors.ProgrammerError("CheckArguments was not called")
246

    
247
    self.expandnames_calls += 1
248

    
249
    if self.op.notify_waitlock:
250
      self._Notify(True, constants.JQT_EXPANDNAMES, None)
251

    
252
    self.LogInfo("Expanding names")
253

    
254
    # Get lock on master node (just to get a lock, not for a particular reason)
255
    self.needed_locks = {
256
      locking.LEVEL_NODE: self.cfg.GetMasterNode(),
257
      }
258

    
259
  def Exec(self, feedback_fn):
260
    if self.expandnames_calls < 1:
261
      raise errors.ProgrammerError("ExpandNames was not called")
262

    
263
    if self.op.notify_exec:
264
      self._Notify(False, constants.JQT_EXEC, None)
265

    
266
    self.LogInfo("Executing")
267

    
268
    if self.op.log_messages:
269
      self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages))
270
      for idx, msg in enumerate(self.op.log_messages):
271
        self.LogInfo("Sending log message %s", idx + 1)
272
        feedback_fn(constants.JQT_MSGPREFIX + msg)
273
        # Report how many test messages have been sent
274
        self._Notify(False, constants.JQT_LOGMSG, idx + 1)
275

    
276
    if self.op.fail:
277
      raise errors.OpExecError("Opcode failure was requested")
278

    
279
    return True
280

    
281

    
282
class LUTestAllocator(NoHooksLU):
283
  """Run allocator tests.
284

285
  This LU runs the allocator tests
286

287
  """
288
  def CheckPrereq(self):
289
    """Check prerequisites.
290

291
    This checks the opcode parameters depending on the director and mode test.
292

293
    """
294
    if self.op.mode in (constants.IALLOCATOR_MODE_ALLOC,
295
                        constants.IALLOCATOR_MODE_MULTI_ALLOC):
296
      (self.inst_uuid, iname) = self.cfg.ExpandInstanceName(self.op.name)
297
      if iname is not None:
298
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
299
                                   iname, errors.ECODE_EXISTS)
300
      for row in self.op.disks:
301
        if (not isinstance(row, dict) or
302
            constants.IDISK_SIZE not in row or
303
            not isinstance(row[constants.IDISK_SIZE], int) or
304
            constants.IDISK_MODE not in row or
305
            row[constants.IDISK_MODE] not in constants.DISK_ACCESS_SET):
306
          raise errors.OpPrereqError("Invalid contents of the 'disks'"
307
                                     " parameter", errors.ECODE_INVAL)
308
      if self.op.hypervisor is None:
309
        self.op.hypervisor = self.cfg.GetHypervisorType()
310
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
311
      (self.inst_uuid, self.op.name) = ExpandInstanceUuidAndName(self.cfg, None,
312
                                                                 self.op.name)
313
      self.relocate_from_node_uuids = \
314
          list(self.cfg.GetInstanceInfo(self.inst_uuid).secondary_nodes)
315
    elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP,
316
                          constants.IALLOCATOR_MODE_NODE_EVAC):
317
      if not self.op.instances:
318
        raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL)
319
      (_, self.op.instances) = GetWantedInstances(self, self.op.instances)
320
    else:
321
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
322
                                 self.op.mode, errors.ECODE_INVAL)
323

    
324
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
325
      if self.op.iallocator is None:
326
        raise errors.OpPrereqError("Missing allocator name",
327
                                   errors.ECODE_INVAL)
328

    
329
  def Exec(self, feedback_fn):
330
    """Run the allocator test.
331

332
    """
333
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
334
      req = iallocator.IAReqInstanceAlloc(name=self.op.name,
335
                                          memory=self.op.memory,
336
                                          disks=self.op.disks,
337
                                          disk_template=self.op.disk_template,
338
                                          os=self.op.os,
339
                                          tags=self.op.tags,
340
                                          nics=self.op.nics,
341
                                          vcpus=self.op.vcpus,
342
                                          spindle_use=self.op.spindle_use,
343
                                          hypervisor=self.op.hypervisor,
344
                                          node_whitelist=None)
345
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
346
      req = iallocator.IAReqRelocate(
347
            inst_uuid=self.inst_uuid,
348
            relocate_from_node_uuids=list(self.relocate_from_node_uuids))
349
    elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
350
      req = iallocator.IAReqGroupChange(instances=self.op.instances,
351
                                        target_groups=self.op.target_groups)
352
    elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
353
      req = iallocator.IAReqNodeEvac(instances=self.op.instances,
354
                                     evac_mode=self.op.evac_mode)
355
    elif self.op.mode == constants.IALLOCATOR_MODE_MULTI_ALLOC:
356
      disk_template = self.op.disk_template
357
      insts = [iallocator.IAReqInstanceAlloc(name="%s%s" % (self.op.name, idx),
358
                                             memory=self.op.memory,
359
                                             disks=self.op.disks,
360
                                             disk_template=disk_template,
361
                                             os=self.op.os,
362
                                             tags=self.op.tags,
363
                                             nics=self.op.nics,
364
                                             vcpus=self.op.vcpus,
365
                                             spindle_use=self.op.spindle_use,
366
                                             hypervisor=self.op.hypervisor,
367
                                             node_whitelist=None)
368
               for idx in range(self.op.count)]
369
      req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
370
    else:
371
      raise errors.ProgrammerError("Uncatched mode %s in"
372
                                   " LUTestAllocator.Exec", self.op.mode)
373

    
374
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
375
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
376
      result = ial.in_text
377
    else:
378
      ial.Run(self.op.iallocator, validate=False)
379
      result = ial.out_text
380
    return result