#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable=W0201,C0302

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

# C0302: since we have waaaay too many lines in this module

import time
import logging
import OpenSSL

from ganeti import utils
from ganeti import errors
from ganeti import locking
from ganeti import constants
from ganeti import compat
from ganeti import masterd
from ganeti import query
from ganeti import qlang

from ganeti.cmdlib.base import ResultWithJobs, LogicalUnit, NoHooksLU, \
  Tasklet, _QueryBase
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_ONLINE, \
  INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, \
  _ExpandInstanceName, _ExpandItemName, \
  _ExpandNodeName, _ShareAll, _CheckNodeGroupInstances, _GetWantedNodes, \
  _GetWantedInstances, _RunPostHook, _RedistributeAncillaryFiles, \
  _MergeAndVerifyHvState, _MergeAndVerifyDiskState, _GetUpdatedIPolicy, \
  _ComputeNewInstanceViolations, _GetUpdatedParams, _CheckOSParams, \
  _CheckHVParams, _AdjustCandidatePool, _CheckNodePVs, \
  _ComputeIPolicyInstanceViolation, _AnnotateDiskParams, _SupportsOob, \
  _ComputeIPolicySpecViolation, _GetDefaultIAllocator, \
  _CheckInstancesNodeGroups, _LoadNodeEvacResult, _MapInstanceDisksToNodes, \
  _CheckInstanceNodeGroups, _CheckParamsNotGlobal, \
  _IsExclusiveStorageEnabledNode, _CheckInstanceState, \
  _CheckIAllocatorOrNode, _FindFaultyInstanceDisks, _CheckNodeOnline
from ganeti.cmdlib.instance_utils import _AssembleInstanceDisks, \
  _BuildInstanceHookEnvByObject, _GetClusterDomainSecret, \
  _CheckNodeNotDrained, _RemoveDisks, _ShutdownInstanceDisks, \
  _StartInstanceDisks, _RemoveInstance

from ganeti.cmdlib.cluster import LUClusterActivateMasterIp, \
  LUClusterDeactivateMasterIp, LUClusterConfigQuery, LUClusterDestroy, \
  LUClusterPostInit, _ClusterQuery, LUClusterQuery, LUClusterRedistConf, \
  LUClusterRename, LUClusterRepairDiskSizes, LUClusterSetParams, \
  LUClusterVerify, LUClusterVerifyConfig, LUClusterVerifyGroup, \
  LUClusterVerifyDisks
from ganeti.cmdlib.group import LUGroupAdd, LUGroupAssignNodes, \
  _GroupQuery, LUGroupQuery, LUGroupSetParams, LUGroupRemove, \
  LUGroupRename, LUGroupEvacuate, LUGroupVerifyDisks
from ganeti.cmdlib.node import LUNodeAdd, LUNodeSetParams, \
  LUNodePowercycle, LUNodeEvacuate, LUNodeMigrate, LUNodeModifyStorage, \
  _NodeQuery, LUNodeQuery, LUNodeQueryvols, LUNodeQueryStorage, \
  LUNodeRemove, LURepairNodeStorage
from ganeti.cmdlib.instance import LUInstanceCreate, LUInstanceRename, \
  LUInstanceRemove, LUInstanceMove, _InstanceQuery, LUInstanceQuery, \
  LUInstanceQueryData, LUInstanceRecreateDisks, LUInstanceGrowDisk, \
  LUInstanceReplaceDisks, LUInstanceActivateDisks, \
  LUInstanceDeactivateDisks, LUInstanceStartup, LUInstanceShutdown, \
  LUInstanceReinstall, LUInstanceReboot, LUInstanceConsole, \
  LUInstanceFailover, LUInstanceMigrate, LUInstanceMultiAlloc, \
  LUInstanceSetParams, LUInstanceChangeGroup
from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel
from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \
  LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \
  LUNetworkDisconnect
from ganeti.cmdlib.test import LUTestDelay, LUTestJqueue, LUTestAllocator


class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1

    if not self.op.node_names:
      # Acquire node allocation lock only if all nodes are affected
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

    if self.op.node_names:
      if (self.op.command in self._SKIP_MASTER and
          self.master_node in self.op.node_names):
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)

        if master_oob_handler:
          additional_text = ("run '%s %s %s' if you want to operate on the"
                             " master regardless") % (master_oob_handler,
                                                      self.op.command,
                                                      self.master_node)
        else:
          additional_text = "it does not support out-of-band operations"

        raise errors.OpPrereqError(("Operating on the master node %s is not"
                                    " allowed for %s; %s") %
                                   (self.master_node, self.op.command,
                                    additional_text), errors.ECODE_INVAL)
    else:
      self.op.node_names = self.cfg.GetNodeList()
      if self.op.command in self._SKIP_MASTER:
        self.op.node_names.remove(self.master_node)

    if self.op.command in self._SKIP_MASTER:
      assert self.master_node not in self.op.node_names

    for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
      if node is None:
        raise errors.OpPrereqError("Node %s not found" % node_name,
                                   errors.ECODE_NOENT)
      else:
        self.nodes.append(node)

      if (not self.op.ignore_status and
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
                                    " not marked offline") % node_name,
                                   errors.ECODE_STATE)

  def Exec(self, feedback_fn):
    """Execute OOB and return result if we expect any.

    """
    master_node = self.master_node
    ret = []
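
    # One result row per node; each row is a list of (status, data) tuples.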
    for idx, node in enumerate(utils.NiceSort(self.nodes,
                                              key=lambda node: node.name)):
      node_entry = [(constants.RS_NORMAL, node.name)]
      ret.append(node_entry)

      oob_program = _SupportsOob(self.cfg, node)

      if not oob_program:
        node_entry.append((constants.RS_UNAVAIL, None))
        continue

      logging.info("Executing out-of-band command '%s' using '%s' on %s",
                   self.op.command, oob_program, node.name)
      result = self.rpc.call_run_oob(master_node, oob_program,
                                     self.op.command, node.name,
                                     self.op.timeout)

      if result.fail_msg:
        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
                        node.name, result.fail_msg)
        node_entry.append((constants.RS_NODATA, None))
      else:
        try:
          self._CheckPayload(result)
        except errors.OpExecError, err:
          self.LogWarning("Payload returned by node '%s' is not valid: %s",
                          node.name, err)
          node_entry.append((constants.RS_NODATA, None))
        else:
          if self.op.command == constants.OOB_HEALTH:
            # For health we should log important events
            for item, status in result.payload:
              if status in [constants.OOB_STATUS_WARNING,
                            constants.OOB_STATUS_CRITICAL]:
                self.LogWarning("Item '%s' on node '%s' has status '%s'",
                                item, node.name, status)

          if self.op.command == constants.OOB_POWER_ON:
            node.powered = True
          elif self.op.command == constants.OOB_POWER_OFF:
            node.powered = False
          elif self.op.command == constants.OOB_POWER_STATUS:
            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
            if powered != node.powered:
              logging.warning(("Recorded power state (%s) of node '%s' does not"
                               " match actual power state (%s)"), node.powered,
                              node.name, powered)

          # For configuration changing commands we should update the node
          if self.op.command in (constants.OOB_POWER_ON,
                                 constants.OOB_POWER_OFF):
            self.cfg.Update(node, feedback_fn)

          node_entry.append((constants.RS_NORMAL, result.payload))

          if (self.op.command == constants.OOB_POWER_ON and
              idx < len(self.nodes) - 1):
            time.sleep(self.op.power_delay)

    return ret

  def _CheckPayload(self, result):
    """Checks if the payload is valid.

    @param result: RPC result
    @raises errors.OpExecError: If payload is not valid

    """
    errs = []
    if self.op.command == constants.OOB_HEALTH:
      if not isinstance(result.payload, list):
        errs.append("command 'health' is expected to return a list but got %s" %
                    type(result.payload))
      else:
        for item, status in result.payload:
          if status not in constants.OOB_STATUSES:
            errs.append("health item '%s' has invalid status '%s'" %
                        (item, status))

    if self.op.command == constants.OOB_POWER_STATUS:
      if not isinstance(result.payload, dict):
        errs.append("power-status is expected to return a dict but got %s" %
                    type(result.payload))

    if self.op.command in [
      constants.OOB_POWER_ON,
      constants.OOB_POWER_OFF,
      constants.OOB_POWER_CYCLE,
      ]:
      if result.payload is not None:
        errs.append("%s is expected to not return payload but got '%s'" %
                    (self.op.command, result.payload))

    if errs:
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
                               utils.CommaJoin(errs))


class _OsQuery(_QueryBase):
  FIELDS = query.OS_FIELDS

  def ExpandNames(self, lu):
    # Lock all nodes in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    lu.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      self.wanted = self.names
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

  def DeclareLocks(self, lu, level):
    pass

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def _GetQueryData(self, lu):
    """Computes the list of OSes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    valid_nodes = [node.name
                   for node in lu.cfg.GetAllNodesInfo().values()
                   if not node.offline and node.vm_capable]
    pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
    cluster = lu.cfg.GetClusterInfo()

    data = {}

    for (os_name, os_data) in pol.items():
      info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
                          hidden=(os_name in cluster.hidden_os),
                          blacklisted=(os_name in cluster.blacklisted_os))

      variants = set()
      parameters = set()
      api_versions = set()
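
      # An OS is valid only if its first entry on every reporting node is
      # valid; variants, parameters and API versions are reduced to the
      # values on which all nodes agree.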
      for idx, osl in enumerate(os_data.values()):
        info.valid = bool(info.valid and osl and osl[0][1])
        if not info.valid:
          break

        (node_variants, node_params, node_api) = osl[0][3:6]
        if idx == 0:
          # First entry
          variants.update(node_variants)
          parameters.update(node_params)
          api_versions.update(node_api)
        else:
          # Filter out inconsistent values
          variants.intersection_update(node_variants)
          parameters.intersection_update(node_params)
          api_versions.intersection_update(node_api)

      info.variants = list(variants)
      info.parameters = list(parameters)
      info.api_versions = list(api_versions)

      data[os_name] = info

    # Prepare data in requested order
    return [data[name] for name in self._GetNames(lu, pol.keys(), None)
            if name in data]


class LUOsDiagnose(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  REQ_BGL = False

  @staticmethod
  def _BuildFilter(fields, names):
    """Builds a filter for querying OSes.

    """
    name_filter = qlang.MakeSimpleFilter("name", names)

    # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
    # respective field is not requested
    status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
                     for fname in ["hidden", "blacklisted"]
                     if fname not in fields]
    if "valid" not in fields:
      status_filter.append([qlang.OP_TRUE, "valid"])

    if status_filter:
      status_filter.insert(0, qlang.OP_AND)
    else:
      status_filter = None

    if name_filter and status_filter:
      return [qlang.OP_AND, name_filter, status_filter]
    elif name_filter:
      return name_filter
    else:
      return status_filter

  def CheckArguments(self):
    self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
                       self.op.output_fields, False)

  def ExpandNames(self):
    self.oq.ExpandNames(self)

  def Exec(self, feedback_fn):
    return self.oq.OldStyleQuery(self)


class _ExtStorageQuery(_QueryBase):
  FIELDS = query.EXTSTORAGE_FIELDS

  def ExpandNames(self, lu):
    # Lock all nodes in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    lu.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      self.wanted = self.names
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

  def DeclareLocks(self, lu, level):
    pass

  @staticmethod
  def _DiagnoseByProvider(rlist):
    """Remaps a per-node return list into a per-provider per-node dictionary

    @param rlist: a map with node names as keys and ExtStorage objects as values

    @rtype: dict
    @return: a dictionary with extstorage providers as keys and as
        value another map, with nodes as keys and tuples of
        (path, status, diagnose, parameters) as values, eg::

          {"provider1": {"node1": [(/usr/lib/..., True, "", [])]
                         "node2": [(/srv/..., False, "missing file")]
                         "node3": [(/srv/..., True, "", [])]
          }

    """
    all_es = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all providers invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, params) in nr.payload:
        if name not in all_es:
          # build a list of nodes for this provider containing empty lists
          # for each node in node_list
          all_es[name] = {}
          for nname in good_nodes:
            all_es[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_es[name][node_name].append((path, status, diagnose, params))
    return all_es

  def _GetQueryData(self, lu):
    """Computes the list of ExtStorage providers and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    valid_nodes = [node.name
                   for node in lu.cfg.GetAllNodesInfo().values()
                   if not node.offline and node.vm_capable]
    pol = self._DiagnoseByProvider(lu.rpc.call_extstorage_diagnose(valid_nodes))

    data = {}

    nodegroup_list = lu.cfg.GetNodeGroupList()

    for (es_name, es_data) in pol.items():
      # For every provider compute the nodegroup validity.
      # To do this we need to check the validity of each node in es_data
      # and then construct the corresponding nodegroup dict:
      #      { nodegroup1: status
      #        nodegroup2: status
      #      }
      ndgrp_data = {}
      for nodegroup in nodegroup_list:
        ndgrp = lu.cfg.GetNodeGroup(nodegroup)

        nodegroup_nodes = ndgrp.members
        nodegroup_name = ndgrp.name
        node_statuses = []

        for node in nodegroup_nodes:
          if node in valid_nodes:
            if es_data[node] != []:
              node_status = es_data[node][0][1]
              node_statuses.append(node_status)
            else:
              node_statuses.append(False)

    
542
        if False in node_statuses:
543
          ndgrp_data[nodegroup_name] = False
544
        else:
545
          ndgrp_data[nodegroup_name] = True
546

    
547
      # Compute the provider's parameters
548
      parameters = set()
549
      for idx, esl in enumerate(es_data.values()):
550
        valid = bool(esl and esl[0][1])
551
        if not valid:
552
          break
553

    
554
        node_params = esl[0][3]
555
        if idx == 0:
556
          # First entry
557
          parameters.update(node_params)
558
        else:
559
          # Filter out inconsistent values
560
          parameters.intersection_update(node_params)
561

    
562
      params = list(parameters)
563

    
564
      # Now fill all the info for this provider
565
      info = query.ExtStorageInfo(name=es_name, node_status=es_data,
566
                                  nodegroup_status=ndgrp_data,
567
                                  parameters=params)
568

    
569
      data[es_name] = info
570

    
571
    # Prepare data in requested order
572
    return [data[name] for name in self._GetNames(lu, pol.keys(), None)
573
            if name in data]
574

    
575

    
576
class LUExtStorageDiagnose(NoHooksLU):
577
  """Logical unit for ExtStorage diagnose/query.
578

579
  """
580
  REQ_BGL = False
581

    
582
  def CheckArguments(self):
583
    self.eq = _ExtStorageQuery(qlang.MakeSimpleFilter("name", self.op.names),
584
                               self.op.output_fields, False)
585

    
586
  def ExpandNames(self):
587
    self.eq.ExpandNames(self)
588

    
589
  def Exec(self, feedback_fn):
590
    return self.eq.OldStyleQuery(self)
591

    
592

    
593
class LUQuery(NoHooksLU):
594
  """Query for resources/items of a certain kind.
595

596
  """
597
  # pylint: disable=W0142
598
  REQ_BGL = False
599

    
600
  def CheckArguments(self):
601
    qcls = _GetQueryImplementation(self.op.what)
602

    
603
    self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)
604

    
605
  def ExpandNames(self):
606
    self.impl.ExpandNames(self)
607

    
608
  def DeclareLocks(self, level):
609
    self.impl.DeclareLocks(self, level)
610

    
611
  def Exec(self, feedback_fn):
612
    return self.impl.NewStyleQuery(self)
613

    
614

    
615
class LUQueryFields(NoHooksLU):
616
  """Query for resources/items of a certain kind.
617

618
  """
619
  # pylint: disable=W0142
620
  REQ_BGL = False
621

    
622
  def CheckArguments(self):
623
    self.qcls = _GetQueryImplementation(self.op.what)
624

    
625
  def ExpandNames(self):
626
    self.needed_locks = {}
627

    
628
  def Exec(self, feedback_fn):
629
    return query.QueryFields(self.qcls.FIELDS, self.op.fields)
630

    
631

    
632
class LUBackupQuery(NoHooksLU):
633
  """Query the exports list
634

635
  """
636
  REQ_BGL = False
637

    
638
  def CheckArguments(self):
639
    self.expq = _ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
640
                             ["node", "export"], self.op.use_locking)
641

    
642
  def ExpandNames(self):
643
    self.expq.ExpandNames(self)
644

    
645
  def DeclareLocks(self, level):
646
    self.expq.DeclareLocks(self, level)
647

    
648
  def Exec(self, feedback_fn):
649
    result = {}
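
    # Nodes whose export list could not be fetched map to False; all other
    # reported nodes map to the list of export names found on them.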
    for (node, expname) in self.expq.OldStyleQuery(self):
      if expname is None:
        result[node] = False
      else:
        result.setdefault(node, []).append(expname)

    return result


class _ExportQuery(_QueryBase):
  FIELDS = query.EXPORT_FIELDS

  #: The node name is not a unique key for this query
  SORT_FIELD = "node"

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      self.wanted = _GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

    if self.do_locking:
      lu.share_locks = _ShareAll()
      lu.needed_locks = {
        locking.LEVEL_NODE: self.wanted,
        }

      if not self.names:
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of exports per node.

    """
    # Locking is not used
    # TODO
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)

    result = []
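
    # Nodes that failed the export-list RPC are reported as (node, None).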
    for (node, nres) in lu.rpc.call_export_list(nodes).items():
      if nres.fail_msg:
        result.append((node, None))
      else:
        result.extend((node, expname) for expname in nres.payload)

    return result


class LUBackupPrepare(NoHooksLU):
  """Prepares an instance for an export and returns useful information.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    instance_name = self.op.instance_name

    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self._cds = _GetClusterDomainSecret()

  def Exec(self, feedback_fn):
    """Prepares an instance for an export.

    """
    instance = self.instance

    if self.op.mode == constants.EXPORT_MODE_REMOTE:
      salt = utils.GenerateSecret(8)

      feedback_fn("Generating X509 certificate on %s" % instance.primary_node)
      result = self.rpc.call_x509_cert_create(instance.primary_node,
                                              constants.RIE_CERT_VALIDITY)
      result.Raise("Can't create X509 key and certificate on %s" % result.node)

      (name, cert_pem) = result.payload

      cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                             cert_pem)

      return {
        "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
        "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
                          salt),
        "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
        }

    return None


class LUBackupExport(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.x509_key_name = self.op.x509_key_name
    self.dest_x509_ca_pem = self.op.destination_x509_ca

    if self.op.mode == constants.EXPORT_MODE_REMOTE:
      if not self.x509_key_name:
        raise errors.OpPrereqError("Missing X509 key name for encryption",
                                   errors.ECODE_INVAL)

      if not self.dest_x509_ca_pem:
        raise errors.OpPrereqError("Missing destination X509 CA",
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    # Lock all nodes for local exports
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      # FIXME: lock only instance primary and destination node
      #
      # Sad but true, for now we have to lock all nodes, as we don't know where
      # the previous export might be, and in this LU we search for it and
      # remove it from its current node. In the future we could fix this by:
      #  - making a tasklet to search (share-lock all), then create the
      #    new one, then one to remove, after
      #  - removing the removal operation altogether
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

      # Allocations should be stopped while this LU runs with node locks, but
      # it doesn't have to be exclusive
      self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_MODE": self.op.mode,
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      # TODO: Generic function for boolean env variables
      "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
      }

    env.update(_BuildInstanceHookEnvByObject(self, self.instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      nl.append(self.op.target_node)

    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name

    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    if (self.op.remove_instance and
        self.instance.admin_state == constants.ADMINST_UP and
        not self.op.shutdown):
      raise errors.OpPrereqError("Can not remove instance without shutting it"
                                 " down before", errors.ECODE_STATE)

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
      assert self.dst_node is not None

      _CheckNodeOnline(self, self.dst_node.name)
      _CheckNodeNotDrained(self, self.dst_node.name)

      self._cds = None
      self.dest_disk_info = None
      self.dest_x509_ca = None

    elif self.op.mode == constants.EXPORT_MODE_REMOTE:
      self.dst_node = None

      if len(self.op.target_node) != len(self.instance.disks):
        raise errors.OpPrereqError(("Received destination information for %s"
                                    " disks, but instance %s has %s disks") %
                                   (len(self.op.target_node), instance_name,
                                    len(self.instance.disks)),
                                   errors.ECODE_INVAL)

      cds = _GetClusterDomainSecret()

      # Check X509 key name
      try:
        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
      except (TypeError, ValueError), err:
        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
                                   errors.ECODE_INVAL)

      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
                                   errors.ECODE_INVAL)

      # Load and verify CA
      try:
        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
      except OpenSSL.crypto.Error, err:
        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
                                   (err, ), errors.ECODE_INVAL)

      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
      if errcode is not None:
        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
                                   (msg, ), errors.ECODE_INVAL)

      self.dest_x509_ca = cert

      # Verify target information
      disk_info = []
      for idx, disk_data in enumerate(self.op.target_node):
        try:
          (host, port, magic) = \
            masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
        except errors.GenericError, err:
          raise errors.OpPrereqError("Target info for disk %s: %s" %
                                     (idx, err), errors.ECODE_INVAL)

        disk_info.append((host, port, magic))

      assert len(disk_info) == len(self.op.target_node)
      self.dest_disk_info = disk_info

    else:
      raise errors.ProgrammerError("Unhandled export mode %r" %
                                   self.op.mode)

    # instance disk type verification
    # TODO: Implement export support for file-based disks
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks", errors.ECODE_INVAL)

  def _CleanupExports(self, feedback_fn):
    """Removes exports of current instance from all other nodes.

    If an instance in a cluster with nodes A..D was exported to node C, its
    exports will be removed from the nodes A, B and D.

    """
    assert self.op.mode != constants.EXPORT_MODE_REMOTE

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(self.dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpBackupQuery
    # substitutes an empty list with the full cluster node list.
    iname = self.instance.name
    if nodelist:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    assert self.op.mode in constants.EXPORT_MODES

    instance = self.instance
    src_node = instance.primary_node

    if self.op.shutdown:
      # shutdown the instance, but not the disks
      feedback_fn("Shutting down instance %s" % instance.name)
      result = self.rpc.call_instance_shutdown(src_node, instance,
                                               self.op.shutdown_timeout,
                                               self.op.reason)
      # TODO: Maybe ignore failures if ignore_remove_failures is set
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (instance.name, src_node))

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)
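
    # Disks of a stopped instance have to be activated for the export and are
    # deactivated again in the "finally" block below.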
    activate_disks = (instance.admin_state != constants.ADMINST_UP)

    if activate_disks:
      # Activate the instance disks if we're exporting a stopped instance
      feedback_fn("Activating disks for %s" % instance.name)
      _StartInstanceDisks(self, instance, None)

    try:
      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
                                                     instance)

      helper.CreateSnapshots()
      try:
        if (self.op.shutdown and
            instance.admin_state == constants.ADMINST_UP and
            not self.op.remove_instance):
          assert not activate_disks
          feedback_fn("Starting instance %s" % instance.name)
          result = self.rpc.call_instance_start(src_node,
                                                (instance, None, None), False,
                                                self.op.reason)
          msg = result.fail_msg
          if msg:
            feedback_fn("Failed to start instance: %s" % msg)
            _ShutdownInstanceDisks(self, instance)
            raise errors.OpExecError("Could not start instance: %s" % msg)

        if self.op.mode == constants.EXPORT_MODE_LOCAL:
          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
        elif self.op.mode == constants.EXPORT_MODE_REMOTE:
          connect_timeout = constants.RIE_CONNECT_TIMEOUT
          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)

          (key_name, _, _) = self.x509_key_name

          dest_ca_pem = \
            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                            self.dest_x509_ca)

          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
                                                     key_name, dest_ca_pem,
                                                     timeouts)
      finally:
        helper.Cleanup()

      # Check for backwards compatibility
      assert len(dresults) == len(instance.disks)
      assert compat.all(isinstance(i, bool) for i in dresults), \
             "Not all results are boolean: %r" % dresults

    finally:
      if activate_disks:
        feedback_fn("Deactivating disks for %s" % instance.name)
        _ShutdownInstanceDisks(self, instance)

    if not (compat.all(dresults) and fin_resu):
      failures = []
      if not fin_resu:
        failures.append("export finalization")
      if not compat.all(dresults):
        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
                               if not dsk)
        failures.append("disk export: disk(s) %s" % fdsk)

      raise errors.OpExecError("Export failed, errors in %s" %
                               utils.CommaJoin(failures))

    # At this point, the export was successful, we can cleanup/finish

    # Remove instance if requested
    if self.op.remove_instance:
      feedback_fn("Removing instance %s" % instance.name)
      _RemoveInstance(self, feedback_fn, instance,
                      self.op.ignore_remove_failures)

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      self._CleanupExports(feedback_fn)

    return fin_resu, dresults


class LUBackupRemove(NoHooksLU):
  """Remove exports related to the named instance.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      # We need all nodes to be locked in order for RemoveExport to work, but
      # we don't need to lock the instance itself, as nothing will happen to it
      # (and we can remove exports also for a removed instance)
      locking.LEVEL_NODE: locking.ALL_SET,

      # Removing backups is quick, so blocking allocations is justified
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    # Allocations should be stopped while this LU runs with node locks, but it
    # doesn't have to be exclusive
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.owned_locks(locking.LEVEL_NODE)
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")

class LURestrictedCommand(NoHooksLU):
  """Logical unit for executing restricted commands.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.nodes:
      self.op.nodes = _GetWantedNodes(self, self.op.nodes)

    self.needed_locks = {
      locking.LEVEL_NODE: self.op.nodes,
      }
    self.share_locks = {
      locking.LEVEL_NODE: not self.op.use_locking,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Execute restricted command and return output.

    """
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))

    # Check if correct locks are held
    assert set(self.op.nodes).issubset(owned_nodes)

    rpcres = self.rpc.call_restricted_command(self.op.nodes, self.op.command)

    result = []

    for node_name in self.op.nodes:
      nres = rpcres[node_name]
      if nres.fail_msg:
        msg = ("Command '%s' on node '%s' failed: %s" %
               (self.op.command, node_name, nres.fail_msg))
        result.append((False, msg))
      else:
        result.append((True, nres.payload))

    return result


#: Query type implementations
_QUERY_IMPL = {
  constants.QR_CLUSTER: _ClusterQuery,
  constants.QR_INSTANCE: _InstanceQuery,
  constants.QR_NODE: _NodeQuery,
  constants.QR_GROUP: _GroupQuery,
  constants.QR_NETWORK: _NetworkQuery,
  constants.QR_OS: _OsQuery,
  constants.QR_EXTSTORAGE: _ExtStorageQuery,
  constants.QR_EXPORT: _ExportQuery,
  }
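
# Every query resource that is reachable via an opcode must have an
# implementation registered above.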
assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP


def _GetQueryImplementation(name):
  """Returns the implementation for a query type.

  @param name: Query type, must be one of L{constants.QR_VIA_OP}

  """
  try:
    return _QUERY_IMPL[name]
  except KeyError:
    raise errors.OpPrereqError("Unknown query resource '%s'" % name,
                               errors.ECODE_INVAL)