root / lib / cmdlib.py @ 1862d460

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


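# Illustration only: a minimal LU written against the contract documented in
# LogicalUnit above (the class and opcode field names here are invented for
# the example). CheckPrereq canonicalises the opcode fields, Exec does the
# real work, and deriving from NoHooksLU provides the empty hooks env:
#
#   class LUExampleListNodes(NoHooksLU):
#     _OP_REQP = ["nodes"]
#
#     def CheckPrereq(self):
#       self.wanted = _GetWantedNodes(self, self.op.nodes)  # helper below
#
#     def Exec(self, feedback_fn):
#       return self.wanted
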
def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


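# Typical use of the two helpers above, as seen in the query LUs later in
# this module (e.g. LUQueryNodes.CheckPrereq): pass the LU itself plus the
# raw opcode field and get back canonical, NiceSort-ed names; an empty list
# selects all nodes (or instances).
#
#   self.wanted = _GetWantedNodes(self, self.op.names)
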
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


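# The query LUs below call this from CheckPrereq so that unknown field names
# are rejected before any RPC work is done; e.g. from LUQueryNodes:
#
#   _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
#                              "pinst_list", "sinst_list",
#                              "pip", "sip"],
#                      dynamic=self.dynamic_fields,
#                      selected=self.op.output_fields)
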
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


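# For illustration, a single-NIC instance run through the function above
# yields an environment along these lines (all values here are made up):
#
#   {
#     "OP_TARGET": "inst1", "INSTANCE_NAME": "inst1",
#     "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#     "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC_COUNT": 1,
#   }
#
# The hooks runner later prefixes every key with 'GANETI_' (see the
# LogicalUnit.BuildHooksEnv docstring).
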
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  for rawline in f:
    logger.Debug('read %s' % (repr(rawline),))

    parts = rawline.rstrip('\r\n').split()

    # Ignore unwanted lines
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      fields = parts[0].split(',')
      key = parts[2]

      haveall = True
      havesome = False
      for spec in [ ip, fullnode ]:
        if spec not in fields:
          haveall = False
        if spec in fields:
          havesome = True

      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      if haveall and key == pubkey:
        inthere = True
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
        continue

      if havesome and (not haveall or key != pubkey):
        removed = True
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
        continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


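# The entries managed above use the standard one-line known_hosts layout,
# "<fqdn>,<ip> ssh-rsa <cluster public key>"; a purely illustrative example
# (hostname, address and key material are invented):
#
#   node1.example.com,192.0.2.10 ssh-rsa AAAAB3NzaC1yc2E...
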
def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


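# Callers treat the return value of _HasValidVG() as "error message or None",
# e.g. later in this module (LUInitCluster.CheckPrereq):
#
#   vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
#   if vgstatus:
#     raise errors.OpPrereqError("Error: %s" % vgstatus)
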
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or /etc/hosts." %
                                 (hostname.ip,))

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
                           constants.DEFAULT_NODED_PORT))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if type(volumeinfo) != dict:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result =  self._VerifyInstance(instance, node_volume, node_instance,
                                     feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_instances = [], []

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        if not lv_online:
          inst = nv_dict.get((node, lv_name), None)
          if inst is not None and inst.name not in res_instances:
            res_instances.append(inst.name)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


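# As unpacked above, each entry returned by rpc.call_blockdev_getmirrorstatus
# is a (perc_done, est_time, is_degraded, ldisk) tuple: perc_done is None once
# a device is fully synced, and the trailing local-disk status is deliberately
# ignored here (see _CheckDiskConsistency below for the ldisk-based check).
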
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." %
                                 self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


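# Exec() above returns one row per queried node, with the values in the same
# order as op.output_fields; e.g. for output_fields ["name", "pinst_cnt",
# "mfree"] a (made-up) result could look like:
#
#   [["node1", 2, 1024],
#    ["node2", 0, 3072]]
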
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(utils.HostInfo().name,
                         primary_ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(myself.secondary_ip,
                           secondary_ip,
                           constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


1613
class LUMasterFailover(LogicalUnit):
1614
  """Failover the master node to the current node.
1615

1616
  This is a special LU in that it must run on a non-master node.
1617

1618
  """
1619
  HPATH = "master-failover"
1620
  HTYPE = constants.HTYPE_CLUSTER
1621
  REQ_MASTER = False
1622
  _OP_REQP = []
1623

    
1624
  def BuildHooksEnv(self):
1625
    """Build hooks env.
1626

1627
    This will run on the new master only in the pre phase, and on all
1628
    the nodes in the post phase.
1629

1630
    """
1631
    env = {
1632
      "OP_TARGET": self.new_master,
1633
      "NEW_MASTER": self.new_master,
1634
      "OLD_MASTER": self.old_master,
1635
      }
1636
    return env, [self.new_master], self.cfg.GetNodeList()
1637

    
1638
  def CheckPrereq(self):
1639
    """Check prerequisites.
1640

1641
    This checks that we are not already the master.
1642

1643
    """
1644
    self.new_master = utils.HostInfo().name
1645
    self.old_master = self.sstore.GetMasterNode()
1646

    
1647
    if self.old_master == self.new_master:
1648
      raise errors.OpPrereqError("This commands must be run on the node"
1649
                                 " where you want the new master to be."
1650
                                 " %s is already the master" %
1651
                                 self.old_master)
1652

    
1653
  def Exec(self, feedback_fn):
1654
    """Failover the master node.
1655

1656
    This command, when run on a non-master node, will cause the current
1657
    master to cease being master, and the non-master to become new
1658
    master.
1659

1660
    """
1661
    #TODO: do not rely on gethostname returning the FQDN
1662
    logger.Info("setting master to %s, old master: %s" %
1663
                (self.new_master, self.old_master))
1664

    
1665
    if not rpc.call_node_stop_master(self.old_master):
1666
      logger.Error("could disable the master role on the old master"
1667
                   " %s, please disable manually" % self.old_master)
1668

    
1669
    ss = self.sstore
1670
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1671
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1672
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1673
      logger.Error("could not distribute the new simple store master file"
1674
                   " to the other nodes, please check.")
1675

    
1676
    if not rpc.call_node_start_master(self.new_master):
1677
      logger.Error("could not start the master role on the new master"
1678
                   " %s, please check" % self.new_master)
1679
      feedback_fn("Error in activating the master IP on the new master,"
1680
                  " please fix manually.")
1681

    
1682

    
1683

    
1684
class LUQueryClusterInfo(NoHooksLU):
1685
  """Query cluster configuration.
1686

1687
  """
1688
  _OP_REQP = []
1689
  REQ_MASTER = False
1690

    
1691
  def CheckPrereq(self):
1692
    """No prerequsites needed for this LU.
1693

1694
    """
1695
    pass
1696

    
1697
  def Exec(self, feedback_fn):
1698
    """Return cluster config.
1699

1700
    """
1701
    result = {
1702
      "name": self.sstore.GetClusterName(),
1703
      "software_version": constants.RELEASE_VERSION,
1704
      "protocol_version": constants.PROTOCOL_VERSION,
1705
      "config_version": constants.CONFIG_VERSION,
1706
      "os_api_version": constants.OS_API_VERSION,
1707
      "export_version": constants.EXPORT_VERSION,
1708
      "master": self.sstore.GetMasterNode(),
1709
      "architecture": (platform.architecture()[0], platform.machine()),
1710
      }
1711

    
1712
    return result
1713

    
1714

    
1715
class LUClusterCopyFile(NoHooksLU):
1716
  """Copy file to cluster.
1717

1718
  """
1719
  _OP_REQP = ["nodes", "filename"]
1720

    
1721
  def CheckPrereq(self):
1722
    """Check prerequisites.
1723

1724
    It should check that the named file exists and that the given list
1725
    of nodes is valid.
1726

1727
    """
1728
    if not os.path.exists(self.op.filename):
1729
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1730

    
1731
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1732

    
1733
  def Exec(self, feedback_fn):
1734
    """Copy a file from master to some nodes.
1735

1736
    Args:
1737
      opts - class with options as members
1738
      args - list containing a single element, the file name
1739
    Opts used:
1740
      nodes - list containing the name of target nodes; if empty, all nodes
1741

1742
    """
1743
    filename = self.op.filename
1744

    
1745
    myname = utils.HostInfo().name
1746

    
1747
    for node in self.nodes:
1748
      if node == myname:
1749
        continue
1750
      if not ssh.CopyFileToNode(node, filename):
1751
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1752

    
1753

    
1754
class LUDumpClusterConfig(NoHooksLU):
1755
  """Return a text-representation of the cluster-config.
1756

1757
  """
1758
  _OP_REQP = []
1759

    
1760
  def CheckPrereq(self):
1761
    """No prerequisites.
1762

1763
    """
1764
    pass
1765

    
1766
  def Exec(self, feedback_fn):
1767
    """Dump a representation of the cluster config to the standard output.
1768

1769
    """
1770
    return self.cfg.DumpConfig()
1771

    
1772

    
1773
class LURunClusterCommand(NoHooksLU):
1774
  """Run a command on some nodes.
1775

1776
  """
1777
  _OP_REQP = ["command", "nodes"]
1778

    
1779
  def CheckPrereq(self):
1780
    """Check prerequisites.
1781

1782
    It checks that the given list of nodes is valid.
1783

1784
    """
1785
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1786

    
1787
  def Exec(self, feedback_fn):
1788
    """Run a command on some nodes.
1789

1790
    """
1791
    data = []
1792
    for node in self.nodes:
1793
      result = ssh.SSHCall(node, "root", self.op.command)
1794
      data.append((node, result.output, result.exit_code))
1795

    
1796
    return data
1797

    
1798

    
1799
class LUActivateInstanceDisks(NoHooksLU):
1800
  """Bring up an instance's disks.
1801

1802
  """
1803
  _OP_REQP = ["instance_name"]
1804

    
1805
  def CheckPrereq(self):
1806
    """Check prerequisites.
1807

1808
    This checks that the instance is in the cluster.
1809

1810
    """
1811
    instance = self.cfg.GetInstanceInfo(
1812
      self.cfg.ExpandInstanceName(self.op.instance_name))
1813
    if instance is None:
1814
      raise errors.OpPrereqError("Instance '%s' not known" %
1815
                                 self.op.instance_name)
1816
    self.instance = instance
1817

    
1818

    
1819
  def Exec(self, feedback_fn):
1820
    """Activate the disks.
1821

1822
    """
1823
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1824
    if not disks_ok:
1825
      raise errors.OpExecError("Cannot activate block devices")
1826

    
1827
    return disks_info
1828

    
1829

    
1830
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1831
  """Prepare the block devices for an instance.
1832

1833
  This sets up the block devices on all nodes.
1834

1835
  Args:
1836
    instance: a ganeti.objects.Instance object
1837
    ignore_secondaries: if true, errors on secondary nodes won't result
1838
                        in an error return from the function
1839

1840
  Returns:
1841
    a tuple (disks_ok, device_info): disks_ok is False if the operation
    failed, and device_info is the list of
    (host, instance_visible_name, node_visible_name) tuples with the
    mapping from node devices to instance devices if it succeeded
1844
  """
1845
  device_info = []
1846
  disks_ok = True
1847
  for inst_disk in instance.disks:
1848
    master_result = None
1849
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1850
      cfg.SetDiskID(node_disk, node)
1851
      is_primary = node == instance.primary_node
1852
      result = rpc.call_blockdev_assemble(node, node_disk,
1853
                                          instance.name, is_primary)
1854
      if not result:
1855
        logger.Error("could not prepare block device %s on node %s"
1856
                     " (is_primary=%s)" %
1857
                     (inst_disk.iv_name, node, is_primary))
1858
        if is_primary or not ignore_secondaries:
1859
          disks_ok = False
1860
      if is_primary:
1861
        master_result = result
1862
    device_info.append((instance.primary_node, inst_disk.iv_name,
1863
                        master_result))
1864

    
1865
  # leave the disks configured for the primary node
1866
  # this is a workaround that would be fixed better by
1867
  # improving the logical/physical id handling
1868
  for disk in instance.disks:
1869
    cfg.SetDiskID(disk, instance.primary_node)
1870

    
1871
  return disks_ok, device_info
1872

    
1873

    
1874
def _StartInstanceDisks(cfg, instance, force):
1875
  """Start the disks of an instance.
1876

1877
  """
1878
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1879
                                           ignore_secondaries=force)
1880
  if not disks_ok:
1881
    _ShutdownInstanceDisks(instance, cfg)
1882
    if force is not None and not force:
1883
      logger.Error("If the message above refers to a secondary node,"
1884
                   " you can retry the operation using '--force'.")
1885
    raise errors.OpExecError("Disk consistency error")
1886

    
1887

    
1888
class LUDeactivateInstanceDisks(NoHooksLU):
1889
  """Shutdown an instance's disks.
1890

1891
  """
1892
  _OP_REQP = ["instance_name"]
1893

    
1894
  def CheckPrereq(self):
1895
    """Check prerequisites.
1896

1897
    This checks that the instance is in the cluster.
1898

1899
    """
1900
    instance = self.cfg.GetInstanceInfo(
1901
      self.cfg.ExpandInstanceName(self.op.instance_name))
1902
    if instance is None:
1903
      raise errors.OpPrereqError("Instance '%s' not known" %
1904
                                 self.op.instance_name)
1905
    self.instance = instance
1906

    
1907
  def Exec(self, feedback_fn):
1908
    """Deactivate the disks
1909

1910
    """
1911
    instance = self.instance
1912
    ins_l = rpc.call_instance_list([instance.primary_node])
1913
    ins_l = ins_l[instance.primary_node]
1914
    if not isinstance(ins_l, list):
1915
      raise errors.OpExecError("Can't contact node '%s'" %
1916
                               instance.primary_node)
1917

    
1918
    if self.instance.name in ins_l:
1919
      raise errors.OpExecError("Instance is running, can't shutdown"
1920
                               " block devices.")
1921

    
1922
    _ShutdownInstanceDisks(instance, self.cfg)
1923

    
1924

    
1925
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1926
  """Shutdown block devices of an instance.
1927

1928
  This does the shutdown on all nodes of the instance.
1929

1930
  If ignore_primary is true, errors on the primary node are ignored;
  errors on any other node always make the function return False.
1932

1933
  """
1934
  result = True
1935
  for disk in instance.disks:
1936
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1937
      cfg.SetDiskID(top_disk, node)
1938
      if not rpc.call_blockdev_shutdown(node, top_disk):
1939
        logger.Error("could not shutdown block device %s on node %s" %
1940
                     (disk.iv_name, node))
1941
        if not ignore_primary or node != instance.primary_node:
1942
          result = False
1943
  return result
1944

    
1945

    
1946
class LUStartupInstance(LogicalUnit):
1947
  """Starts an instance.
1948

1949
  """
1950
  HPATH = "instance-start"
1951
  HTYPE = constants.HTYPE_INSTANCE
1952
  _OP_REQP = ["instance_name", "force"]
1953

    
1954
  def BuildHooksEnv(self):
1955
    """Build hooks env.
1956

1957
    This runs on master, primary and secondary nodes of the instance.
1958

1959
    """
1960
    env = {
1961
      "FORCE": self.op.force,
1962
      }
1963
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1964
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1965
          list(self.instance.secondary_nodes))
1966
    return env, nl, nl
1967

    
1968
  def CheckPrereq(self):
1969
    """Check prerequisites.
1970

1971
    This checks that the instance is in the cluster.
1972

1973
    """
1974
    instance = self.cfg.GetInstanceInfo(
1975
      self.cfg.ExpandInstanceName(self.op.instance_name))
1976
    if instance is None:
1977
      raise errors.OpPrereqError("Instance '%s' not known" %
1978
                                 self.op.instance_name)
1979

    
1980
    # check bridges existance
1981
    _CheckInstanceBridgesExist(instance)
1982

    
1983
    self.instance = instance
1984
    self.op.instance_name = instance.name
1985

    
1986
  def Exec(self, feedback_fn):
1987
    """Start the instance.
1988

1989
    """
1990
    instance = self.instance
1991
    force = self.op.force
1992
    extra_args = getattr(self.op, "extra_args", "")
1993

    
1994
    node_current = instance.primary_node
1995

    
1996
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1997
    if not nodeinfo:
1998
      raise errors.OpExecError("Could not contact node %s for infos" %
1999
                               (node_current))
2000

    
2001
    freememory = nodeinfo[node_current]['memory_free']
2002
    memory = instance.memory
2003
    if memory > freememory:
2004
      raise errors.OpExecError("Not enough memory to start instance"
2005
                               " %s on node %s"
2006
                               " needed %s MiB, available %s MiB" %
2007
                               (instance.name, node_current, memory,
2008
                                freememory))
2009

    
2010
    _StartInstanceDisks(self.cfg, instance, force)
2011

    
2012
    if not rpc.call_instance_start(node_current, instance, extra_args):
2013
      _ShutdownInstanceDisks(instance, self.cfg)
2014
      raise errors.OpExecError("Could not start instance")
2015

    
2016
    self.cfg.MarkInstanceUp(instance.name)
2017

    
2018

    
2019
class LURebootInstance(LogicalUnit):
2020
  """Reboot an instance.
2021

2022
  """
2023
  HPATH = "instance-reboot"
2024
  HTYPE = constants.HTYPE_INSTANCE
2025
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2026

    
2027
  def BuildHooksEnv(self):
2028
    """Build hooks env.
2029

2030
    This runs on master, primary and secondary nodes of the instance.
2031

2032
    """
2033
    env = {
2034
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2035
      }
2036
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2037
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2038
          list(self.instance.secondary_nodes))
2039
    return env, nl, nl
2040

    
2041
  def CheckPrereq(self):
2042
    """Check prerequisites.
2043

2044
    This checks that the instance is in the cluster.
2045

2046
    """
2047
    instance = self.cfg.GetInstanceInfo(
2048
      self.cfg.ExpandInstanceName(self.op.instance_name))
2049
    if instance is None:
2050
      raise errors.OpPrereqError("Instance '%s' not known" %
2051
                                 self.op.instance_name)
2052

    
2053
    # check bridges existance
2054
    _CheckInstanceBridgesExist(instance)
2055

    
2056
    self.instance = instance
2057
    self.op.instance_name = instance.name
2058

    
2059
  def Exec(self, feedback_fn):
2060
    """Reboot the instance.
2061

2062
    """
2063
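    # Soft and hard reboots are delegated to the hypervisor via a single
    # reboot RPC; a full reboot is emulated below by shutting the instance
    # down, cycling its disks, and starting it again.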
    instance = self.instance
2064
    ignore_secondaries = self.op.ignore_secondaries
2065
    reboot_type = self.op.reboot_type
2066
    extra_args = getattr(self.op, "extra_args", "")
2067

    
2068
    node_current = instance.primary_node
2069

    
2070
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2071
                           constants.INSTANCE_REBOOT_HARD,
2072
                           constants.INSTANCE_REBOOT_FULL]:
2073
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2074
                                  (constants.INSTANCE_REBOOT_SOFT,
2075
                                   constants.INSTANCE_REBOOT_HARD,
2076
                                   constants.INSTANCE_REBOOT_FULL))
2077

    
2078
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2079
                       constants.INSTANCE_REBOOT_HARD]:
2080
      if not rpc.call_instance_reboot(node_current, instance,
2081
                                      reboot_type, extra_args):
2082
        raise errors.OpExecError("Could not reboot instance")
2083
    else:
2084
      if not rpc.call_instance_shutdown(node_current, instance):
2085
        raise errors.OpExecError("could not shutdown instance for full reboot")
2086
      _ShutdownInstanceDisks(instance, self.cfg)
2087
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2088
      if not rpc.call_instance_start(node_current, instance, extra_args):
2089
        _ShutdownInstanceDisks(instance, self.cfg)
2090
        raise errors.OpExecError("Could not start instance for full reboot")
2091

    
2092
    self.cfg.MarkInstanceUp(instance.name)
2093

    
2094

    
2095
class LUShutdownInstance(LogicalUnit):
2096
  """Shutdown an instance.
2097

2098
  """
2099
  HPATH = "instance-stop"
2100
  HTYPE = constants.HTYPE_INSTANCE
2101
  _OP_REQP = ["instance_name"]
2102

    
2103
  def BuildHooksEnv(self):
2104
    """Build hooks env.
2105

2106
    This runs on master, primary and secondary nodes of the instance.
2107

2108
    """
2109
    env = _BuildInstanceHookEnvByObject(self.instance)
2110
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2111
          list(self.instance.secondary_nodes))
2112
    return env, nl, nl
2113

    
2114
  def CheckPrereq(self):
2115
    """Check prerequisites.
2116

2117
    This checks that the instance is in the cluster.
2118

2119
    """
2120
    instance = self.cfg.GetInstanceInfo(
2121
      self.cfg.ExpandInstanceName(self.op.instance_name))
2122
    if instance is None:
2123
      raise errors.OpPrereqError("Instance '%s' not known" %
2124
                                 self.op.instance_name)
2125
    self.instance = instance
2126

    
2127
  def Exec(self, feedback_fn):
2128
    """Shutdown the instance.
2129

2130
    """
2131
    instance = self.instance
2132
    node_current = instance.primary_node
2133
    if not rpc.call_instance_shutdown(node_current, instance):
2134
      logger.Error("could not shutdown instance")
2135

    
2136
    self.cfg.MarkInstanceDown(instance.name)
2137
    _ShutdownInstanceDisks(instance, self.cfg)
2138

    
2139

    
2140
class LUReinstallInstance(LogicalUnit):
2141
  """Reinstall an instance.
2142

2143
  """
2144
  HPATH = "instance-reinstall"
2145
  HTYPE = constants.HTYPE_INSTANCE
2146
  _OP_REQP = ["instance_name"]
2147

    
2148
  def BuildHooksEnv(self):
2149
    """Build hooks env.
2150

2151
    This runs on master, primary and secondary nodes of the instance.
2152

2153
    """
2154
    env = _BuildInstanceHookEnvByObject(self.instance)
2155
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2156
          list(self.instance.secondary_nodes))
2157
    return env, nl, nl
2158

    
2159
  def CheckPrereq(self):
2160
    """Check prerequisites.
2161

2162
    This checks that the instance is in the cluster and is not running.
2163

2164
    """
2165
    instance = self.cfg.GetInstanceInfo(
2166
      self.cfg.ExpandInstanceName(self.op.instance_name))
2167
    if instance is None:
2168
      raise errors.OpPrereqError("Instance '%s' not known" %
2169
                                 self.op.instance_name)
2170
    if instance.disk_template == constants.DT_DISKLESS:
2171
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2172
                                 self.op.instance_name)
2173
    if instance.status != "down":
2174
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2175
                                 self.op.instance_name)
2176
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2177
    if remote_info:
2178
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2179
                                 (self.op.instance_name,
2180
                                  instance.primary_node))
2181

    
2182
    self.op.os_type = getattr(self.op, "os_type", None)
2183
    if self.op.os_type is not None:
2184
      # OS verification
2185
      pnode = self.cfg.GetNodeInfo(
2186
        self.cfg.ExpandNodeName(instance.primary_node))
2187
      if pnode is None:
2188
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2189
                                   instance.primary_node)
2190
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2191
      if not os_obj:
2192
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2193
                                   " primary node"  % self.op.os_type)
2194

    
2195
    self.instance = instance
2196

    
2197
  def Exec(self, feedback_fn):
2198
    """Reinstall the instance.
2199

2200
    """
2201
    inst = self.instance
2202

    
2203
    if self.op.os_type is not None:
2204
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2205
      inst.os = self.op.os_type
2206
      self.cfg.AddInstance(inst)
2207

    
2208
    _StartInstanceDisks(self.cfg, inst, None)
2209
    try:
2210
      feedback_fn("Running the instance OS create scripts...")
2211
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2212
        raise errors.OpExecError("Could not install OS for instance %s"
2213
                                 " on node %s" %
2214
                                 (inst.name, inst.primary_node))
2215
    finally:
2216
      _ShutdownInstanceDisks(inst, self.cfg)
2217

    
2218

    
2219
class LURenameInstance(LogicalUnit):
2220
  """Rename an instance.
2221

2222
  """
2223
  HPATH = "instance-rename"
2224
  HTYPE = constants.HTYPE_INSTANCE
2225
  _OP_REQP = ["instance_name", "new_name"]
2226

    
2227
  def BuildHooksEnv(self):
2228
    """Build hooks env.
2229

2230
    This runs on master, primary and secondary nodes of the instance.
2231

2232
    """
2233
    env = _BuildInstanceHookEnvByObject(self.instance)
2234
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2235
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2236
          list(self.instance.secondary_nodes))
2237
    return env, nl, nl
2238

    
2239
  def CheckPrereq(self):
2240
    """Check prerequisites.
2241

2242
    This checks that the instance is in the cluster and is not running.
2243

2244
    """
2245
    instance = self.cfg.GetInstanceInfo(
2246
      self.cfg.ExpandInstanceName(self.op.instance_name))
2247
    if instance is None:
2248
      raise errors.OpPrereqError("Instance '%s' not known" %
2249
                                 self.op.instance_name)
2250
    if instance.status != "down":
2251
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2252
                                 self.op.instance_name)
2253
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2254
    if remote_info:
2255
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2256
                                 (self.op.instance_name,
2257
                                  instance.primary_node))
2258
    self.instance = instance
2259

    
2260
    # new name verification
2261
    name_info = utils.HostInfo(self.op.new_name)
2262

    
2263
    self.op.new_name = new_name = name_info.name
2264
    if not getattr(self.op, "ignore_ip", False):
2265
      command = ["fping", "-q", name_info.ip]
2266
      result = utils.RunCmd(command)
2267
      if not result.failed:
2268
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2269
                                   (name_info.ip, new_name))
2270

    
2271

    
2272
  def Exec(self, feedback_fn):
2273
    """Reinstall the instance.
2274

2275
    """
2276
    inst = self.instance
2277
    old_name = inst.name
2278

    
2279
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2280

    
2281
    # re-read the instance from the configuration after rename
2282
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2283

    
2284
    _StartInstanceDisks(self.cfg, inst, None)
2285
    try:
2286
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2287
                                          "sda", "sdb"):
2288
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2289
               " instance has been renamed in Ganeti)" %
2290
               (inst.name, inst.primary_node))
2291
        logger.Error(msg)
2292
    finally:
2293
      _ShutdownInstanceDisks(inst, self.cfg)
2294

    
2295

    
2296
class LURemoveInstance(LogicalUnit):
2297
  """Remove an instance.
2298

2299
  """
2300
  HPATH = "instance-remove"
2301
  HTYPE = constants.HTYPE_INSTANCE
2302
  _OP_REQP = ["instance_name"]
2303

    
2304
  def BuildHooksEnv(self):
2305
    """Build hooks env.
2306

2307
    This runs on master, primary and secondary nodes of the instance.
2308

2309
    """
2310
    env = _BuildInstanceHookEnvByObject(self.instance)
2311
    nl = [self.sstore.GetMasterNode()]
2312
    return env, nl, nl
2313

    
2314
  def CheckPrereq(self):
2315
    """Check prerequisites.
2316

2317
    This checks that the instance is in the cluster.
2318

2319
    """
2320
    instance = self.cfg.GetInstanceInfo(
2321
      self.cfg.ExpandInstanceName(self.op.instance_name))
2322
    if instance is None:
2323
      raise errors.OpPrereqError("Instance '%s' not known" %
2324
                                 self.op.instance_name)
2325
    self.instance = instance
2326

    
2327
  def Exec(self, feedback_fn):
2328
    """Remove the instance.
2329

2330
    """
2331
    instance = self.instance
2332
    logger.Info("shutting down instance %s on node %s" %
2333
                (instance.name, instance.primary_node))
2334

    
2335
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2336
      if self.op.ignore_failures:
2337
        feedback_fn("Warning: can't shutdown instance")
2338
      else:
2339
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2340
                                 (instance.name, instance.primary_node))
2341

    
2342
    logger.Info("removing block devices for instance %s" % instance.name)
2343

    
2344
    if not _RemoveDisks(instance, self.cfg):
2345
      if self.op.ignore_failures:
2346
        feedback_fn("Warning: can't remove instance's disks")
2347
      else:
2348
        raise errors.OpExecError("Can't remove instance's disks")
2349

    
2350
    logger.Info("removing instance %s out of cluster config" % instance.name)
2351

    
2352
    self.cfg.RemoveInstance(instance.name)
2353

    
2354

    
2355
class LUQueryInstances(NoHooksLU):
2356
  """Logical unit for querying instances.
2357

2358
  """
2359
  _OP_REQP = ["output_fields", "names"]
2360

    
2361
  def CheckPrereq(self):
2362
    """Check prerequisites.
2363

2364
    This checks that the fields required are valid output fields.
2365

2366
    """
2367
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
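    # Dynamic fields need a live RPC to the primary nodes at query time;
    # everything listed as static below is served from the configuration.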
2368
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2369
                               "admin_state", "admin_ram",
2370
                               "disk_template", "ip", "mac", "bridge",
2371
                               "sda_size", "sdb_size"],
2372
                       dynamic=self.dynamic_fields,
2373
                       selected=self.op.output_fields)
2374

    
2375
    self.wanted = _GetWantedInstances(self, self.op.names)
2376

    
2377
  def Exec(self, feedback_fn):
2378
    """Computes the list of nodes and their attributes.
2379

2380
    """
2381
    instance_names = self.wanted
2382
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2383
                     in instance_names]
2384

    
2385
    # begin data gathering
2386

    
2387
    nodes = frozenset([inst.primary_node for inst in instance_list])
2388

    
2389
    bad_nodes = []
2390
    if self.dynamic_fields.intersection(self.op.output_fields):
2391
      live_data = {}
2392
      node_data = rpc.call_all_instances_info(nodes)
2393
      for name in nodes:
2394
        result = node_data[name]
2395
        if result:
2396
          live_data.update(result)
2397
        elif result == False:
2398
          bad_nodes.append(name)
2399
        # else no instance is alive
2400
    else:
2401
      live_data = dict([(name, {}) for name in instance_names])
2402

    
2403
    # end data gathering
2404

    
2405
    output = []
2406
    for instance in instance_list:
2407
      iout = []
2408
      for field in self.op.output_fields:
2409
        if field == "name":
2410
          val = instance.name
2411
        elif field == "os":
2412
          val = instance.os
2413
        elif field == "pnode":
2414
          val = instance.primary_node
2415
        elif field == "snodes":
2416
          val = list(instance.secondary_nodes)
2417
        elif field == "admin_state":
2418
          val = (instance.status != "down")
2419
        elif field == "oper_state":
2420
          if instance.primary_node in bad_nodes:
2421
            val = None
2422
          else:
2423
            val = bool(live_data.get(instance.name))
2424
        elif field == "admin_ram":
2425
          val = instance.memory
2426
        elif field == "oper_ram":
2427
          if instance.primary_node in bad_nodes:
2428
            val = None
2429
          elif instance.name in live_data:
2430
            val = live_data[instance.name].get("memory", "?")
2431
          else:
2432
            val = "-"
2433
        elif field == "disk_template":
2434
          val = instance.disk_template
2435
        elif field == "ip":
2436
          val = instance.nics[0].ip
2437
        elif field == "bridge":
2438
          val = instance.nics[0].bridge
2439
        elif field == "mac":
2440
          val = instance.nics[0].mac
2441
        elif field == "sda_size" or field == "sdb_size":
2442
          disk = instance.FindDisk(field[:3])
2443
          if disk is None:
2444
            val = None
2445
          else:
2446
            val = disk.size
2447
        else:
2448
          raise errors.ParameterError(field)
2449
        iout.append(val)
2450
      output.append(iout)
2451

    
2452
    return output
2453

    
2454

    
2455
class LUFailoverInstance(LogicalUnit):
2456
  """Failover an instance.
2457

2458
  """
2459
  HPATH = "instance-failover"
2460
  HTYPE = constants.HTYPE_INSTANCE
2461
  _OP_REQP = ["instance_name", "ignore_consistency"]
2462

    
2463
  def BuildHooksEnv(self):
2464
    """Build hooks env.
2465

2466
    This runs on master, primary and secondary nodes of the instance.
2467

2468
    """
2469
    env = {
2470
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2471
      }
2472
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2473
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2474
    return env, nl, nl
2475

    
2476
  def CheckPrereq(self):
2477
    """Check prerequisites.
2478

2479
    This checks that the instance is in the cluster.
2480

2481
    """
2482
    instance = self.cfg.GetInstanceInfo(
2483
      self.cfg.ExpandInstanceName(self.op.instance_name))
2484
    if instance is None:
2485
      raise errors.OpPrereqError("Instance '%s' not known" %
2486
                                 self.op.instance_name)
2487

    
2488
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2489
      raise errors.OpPrereqError("Instance's disk layout is not"
2490
                                 " network mirrored, cannot failover.")
2491

    
2492
    secondary_nodes = instance.secondary_nodes
2493
    if not secondary_nodes:
2494
      raise errors.ProgrammerError("no secondary node but using "
2495
                                   "DT_REMOTE_RAID1 template")
2496

    
2497
    # check memory requirements on the secondary node
2498
    target_node = secondary_nodes[0]
2499
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2500
    info = nodeinfo.get(target_node, None)
2501
    if not info:
2502
      raise errors.OpPrereqError("Cannot get current information"
2503
                                 " from node '%s'" % nodeinfo)
2504
    if instance.memory > info['memory_free']:
2505
      raise errors.OpPrereqError("Not enough memory on target node %s."
2506
                                 " %d MB available, %d MB required" %
2507
                                 (target_node, info['memory_free'],
2508
                                  instance.memory))
2509

    
2510
    # check bridge existance
2511
    brlist = [nic.bridge for nic in instance.nics]
2512
    if not rpc.call_bridges_exist(target_node, brlist):
2513
      raise errors.OpPrereqError("One or more target bridges %s does not"
2514
                                 " exist on destination node '%s'" %
2515
                                 (brlist, target_node))
2516

    
2517
    self.instance = instance
2518

    
2519
  def Exec(self, feedback_fn):
2520
    """Failover an instance.
2521

2522
    The failover is done by shutting it down on its present node and
2523
    starting it on the secondary.
2524

2525
    """
2526
    instance = self.instance
2527

    
2528
    source_node = instance.primary_node
2529
    target_node = instance.secondary_nodes[0]
2530

    
2531
    feedback_fn("* checking disk consistency between source and target")
2532
    for dev in instance.disks:
2533
      # for remote_raid1, these are md over drbd
2534
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2535
        if not self.op.ignore_consistency:
2536
          raise errors.OpExecError("Disk %s is degraded on target node,"
2537
                                   " aborting failover." % dev.iv_name)
2538

    
2539
    feedback_fn("* checking target node resource availability")
2540
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2541

    
2542
    if not nodeinfo:
2543
      raise errors.OpExecError("Could not contact target node %s." %
2544
                               target_node)
2545

    
2546
    free_memory = int(nodeinfo[target_node]['memory_free'])
2547
    memory = instance.memory
2548
    if memory > free_memory:
2549
      raise errors.OpExecError("Not enough memory to create instance %s on"
2550
                               " node %s. needed %s MiB, available %s MiB" %
2551
                               (instance.name, target_node, memory,
2552
                                free_memory))
2553

    
2554
    feedback_fn("* shutting down instance on source node")
2555
    logger.Info("Shutting down instance %s on node %s" %
2556
                (instance.name, source_node))
2557

    
2558
    if not rpc.call_instance_shutdown(source_node, instance):
2559
      if self.op.ignore_consistency:
2560
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2561
                     " anyway. Please make sure node %s is down"  %
2562
                     (instance.name, source_node, source_node))
2563
      else:
2564
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2565
                                 (instance.name, source_node))
2566

    
2567
    feedback_fn("* deactivating the instance's disks on source node")
2568
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2569
      raise errors.OpExecError("Can't shut down the instance's disks.")
2570

    
2571
    instance.primary_node = target_node
2572
    # distribute new instance config to the other nodes
2573
    self.cfg.AddInstance(instance)
2574

    
2575
    feedback_fn("* activating the instance's disks on target node")
2576
    logger.Info("Starting instance %s on node %s" %
2577
                (instance.name, target_node))
2578

    
2579
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2580
                                             ignore_secondaries=True)
2581
    if not disks_ok:
2582
      _ShutdownInstanceDisks(instance, self.cfg)
2583
      raise errors.OpExecError("Can't activate the instance's disks")
2584

    
2585
    feedback_fn("* starting the instance on the target node")
2586
    if not rpc.call_instance_start(target_node, instance, None):
2587
      _ShutdownInstanceDisks(instance, self.cfg)
2588
      raise errors.OpExecError("Could not start instance %s on node %s." %
2589
                               (instance.name, target_node))
2590

    
2591

    
2592
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2593
  """Create a tree of block devices on the primary node.
2594

2595
  This always creates all devices.
2596

2597
  """
2598
  if device.children:
2599
    for child in device.children:
2600
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2601
        return False
2602

    
2603
  cfg.SetDiskID(device, node)
2604
  new_id = rpc.call_blockdev_create(node, device, device.size,
2605
                                    instance.name, True, info)
2606
  if not new_id:
2607
    return False
2608
  if device.physical_id is None:
2609
    device.physical_id = new_id
2610
  return True
2611

    
2612

    
2613
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2614
  """Create a tree of block devices on a secondary node.
2615

2616
  If this device type has to be created on secondaries, create it and
2617
  all its children.
2618

2619
  If not, just recurse to children keeping the same 'force' value.
2620

2621
  """
2622
  if device.CreateOnSecondary():
2623
    force = True
2624
  if device.children:
2625
    for child in device.children:
2626
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2627
                                        child, force, info):
2628
        return False
2629

    
2630
  if not force:
2631
    return True
2632
  cfg.SetDiskID(device, node)
2633
  new_id = rpc.call_blockdev_create(node, device, device.size,
2634
                                    instance.name, False, info)
2635
  if not new_id:
2636
    return False
2637
  if device.physical_id is None:
2638
    device.physical_id = new_id
2639
  return True
2640

    
2641

    
2642
def _GenerateUniqueNames(cfg, exts):
2643
  """Generate a suitable LV name.
2644

2645
  This will generate a logical volume name for the given instance.
2646

2647
  """
2648
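  # Example: exts=[".sda", ".sdb"] yields two names of the form
  # "<unique-id>.sda" and "<unique-id>.sdb", each ext getting its own
  # freshly generated ID.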
  results = []
2649
  for val in exts:
2650
    new_id = cfg.GenerateUniqueID()
2651
    results.append("%s%s" % (new_id, val))
2652
  return results
2653

    
2654

    
2655
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2656
  """Generate a drbd device complete with its children.
2657

2658
  """
2659
  port = cfg.AllocatePort()
2660
  vgname = cfg.GetVGName()
2661
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2662
                          logical_id=(vgname, names[0]))
2663
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2664
                          logical_id=(vgname, names[1]))
2665
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2666
                          logical_id = (primary, secondary, port),
2667
                          children = [dev_data, dev_meta])
2668
  return drbd_dev
2669

    
2670

    
2671
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2672
  """Generate a drbd8 device complete with its children.
2673

2674
  """
2675
  port = cfg.AllocatePort()
2676
  vgname = cfg.GetVGName()
2677
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2678
                          logical_id=(vgname, names[0]))
2679
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2680
                          logical_id=(vgname, names[1]))
2681
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2682
                          logical_id = (primary, secondary, port),
2683
                          children = [dev_data, dev_meta],
2684
                          iv_name=iv_name)
2685
  return drbd_dev
2686

    
2687
def _GenerateDiskTemplate(cfg, template_name,
2688
                          instance_name, primary_node,
2689
                          secondary_nodes, disk_sz, swap_sz):
2690
  """Generate the entire disk layout for a given template type.
2691

2692
  """
2693
  #TODO: compute space requirements
2694

    
2695
  vgname = cfg.GetVGName()
2696
  if template_name == "diskless":
2697
    disks = []
2698
  elif template_name == "plain":
2699
    if len(secondary_nodes) != 0:
2700
      raise errors.ProgrammerError("Wrong template configuration")
2701

    
2702
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2703
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2704
                           logical_id=(vgname, names[0]),
2705
                           iv_name = "sda")
2706
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2707
                           logical_id=(vgname, names[1]),
2708
                           iv_name = "sdb")
2709
    disks = [sda_dev, sdb_dev]
2710
  elif template_name == "local_raid1":
2711
    if len(secondary_nodes) != 0:
2712
      raise errors.ProgrammerError("Wrong template configuration")
2713

    
2714

    
2715
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2716
                                       ".sdb_m1", ".sdb_m2"])
2717
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2718
                              logical_id=(vgname, names[0]))
2719
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2720
                              logical_id=(vgname, names[1]))
2721
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2722
                              size=disk_sz,
2723
                              children = [sda_dev_m1, sda_dev_m2])
2724
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2725
                              logical_id=(vgname, names[2]))
2726
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2727
                              logical_id=(vgname, names[3]))
2728
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2729
                              size=swap_sz,
2730
                              children = [sdb_dev_m1, sdb_dev_m2])
2731
    disks = [md_sda_dev, md_sdb_dev]
2732
  elif template_name == constants.DT_REMOTE_RAID1:
2733
    if len(secondary_nodes) != 1:
2734
      raise errors.ProgrammerError("Wrong template configuration")
2735
    remote_node = secondary_nodes[0]
2736
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2737
                                       ".sdb_data", ".sdb_meta"])
2738
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2739
                                         disk_sz, names[0:2])
2740
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2741
                              children = [drbd_sda_dev], size=disk_sz)
2742
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2743
                                         swap_sz, names[2:4])
2744
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2745
                              children = [drbd_sdb_dev], size=swap_sz)
2746
    disks = [md_sda_dev, md_sdb_dev]
2747
  elif template_name == constants.DT_DRBD8:
2748
    if len(secondary_nodes) != 1:
2749
      raise errors.ProgrammerError("Wrong template configuration")
2750
    remote_node = secondary_nodes[0]
2751
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2752
                                       ".sdb_data", ".sdb_meta"])
2753
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2754
                                         disk_sz, names[0:2], "sda")
2755
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2756
                                         swap_sz, names[2:4], "sdb")
2757
    disks = [drbd_sda_dev, drbd_sdb_dev]
2758
  else:
2759
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2760
  return disks
2761

    
2762

    
2763
def _GetInstanceInfoText(instance):
2764
  """Compute that text that should be added to the disk's metadata.
2765

2766
  """
2767
  return "originstname+%s" % instance.name
2768

    
2769

    
2770
def _CreateDisks(cfg, instance):
2771
  """Create all disks for an instance.
2772

2773
  This abstracts away some work from AddInstance.
2774

2775
  Args:
2776
    instance: the instance object
2777

2778
  Returns:
2779
    True or False showing the success of the creation process
2780

2781
  """
2782
  info = _GetInstanceInfoText(instance)
2783

    
2784
  for device in instance.disks:
2785
    logger.Info("creating volume %s for instance %s" %
2786
              (device.iv_name, instance.name))
2787
    #HARDCODE
2788
    for secondary_node in instance.secondary_nodes:
2789
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2790
                                        device, False, info):
2791
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2792
                     (device.iv_name, device, secondary_node))
2793
        return False
2794
    #HARDCODE
2795
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2796
                                    instance, device, info):
2797
      logger.Error("failed to create volume %s on primary!" %
2798
                   device.iv_name)
2799
      return False
2800
  return True
2801

    
2802

    
2803
def _RemoveDisks(instance, cfg):
2804
  """Remove all disks for an instance.
2805

2806
  This abstracts away some work from `AddInstance()` and
2807
  `RemoveInstance()`. Note that in case some of the devices couldn't
2808
  be removed, the removal will continue with the other ones (compare
2809
  with `_CreateDisks()`).
2810

2811
  Args:
2812
    instance: the instance object
2813

2814
  Returns:
2815
    True or False showing the success of the removal process
2816

2817
  """
2818
  logger.Info("removing block devices for instance %s" % instance.name)
2819

    
2820
  result = True
2821
  for device in instance.disks:
2822
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2823
      cfg.SetDiskID(disk, node)
2824
      if not rpc.call_blockdev_remove(node, disk):
2825
        logger.Error("could not remove block device %s on node %s,"
2826
                     " continuing anyway" %
2827
                     (device.iv_name, node))
2828
        result = False
2829
  return result
2830

    
2831

    
2832
class LUCreateInstance(LogicalUnit):
2833
  """Create an instance.
2834

2835
  """
2836
  HPATH = "instance-add"
2837
  HTYPE = constants.HTYPE_INSTANCE
2838
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2839
              "disk_template", "swap_size", "mode", "start", "vcpus",
2840
              "wait_for_sync", "ip_check", "mac"]
2841

    
2842
  def BuildHooksEnv(self):
2843
    """Build hooks env.
2844

2845
    This runs on master, primary and secondary nodes of the instance.
2846

2847
    """
2848
    env = {
2849
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2850
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2851
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2852
      "INSTANCE_ADD_MODE": self.op.mode,
2853
      }
2854
    if self.op.mode == constants.INSTANCE_IMPORT:
2855
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2856
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2857
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2858

    
2859
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2860
      primary_node=self.op.pnode,
2861
      secondary_nodes=self.secondaries,
2862
      status=self.instance_status,
2863
      os_type=self.op.os_type,
2864
      memory=self.op.mem_size,
2865
      vcpus=self.op.vcpus,
2866
      nics=[(self.inst_ip, self.op.bridge)],
2867
    ))
2868

    
2869
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2870
          self.secondaries)
2871
    return env, nl, nl
2872

    
2873

    
2874
  def CheckPrereq(self):
2875
    """Check prerequisites.
2876

2877
    """
2878
    if self.op.mode not in (constants.INSTANCE_CREATE,
2879
                            constants.INSTANCE_IMPORT):
2880
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2881
                                 self.op.mode)
2882

    
2883
    if self.op.mode == constants.INSTANCE_IMPORT:
2884
      src_node = getattr(self.op, "src_node", None)
2885
      src_path = getattr(self.op, "src_path", None)
2886
      if src_node is None or src_path is None:
2887
        raise errors.OpPrereqError("Importing an instance requires source"
2888
                                   " node and path options")
2889
      src_node_full = self.cfg.ExpandNodeName(src_node)
2890
      if src_node_full is None:
2891
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2892
      self.op.src_node = src_node = src_node_full
2893

    
2894
      if not os.path.isabs(src_path):
2895
        raise errors.OpPrereqError("The source path must be absolute")
2896

    
2897
      export_info = rpc.call_export_info(src_node, src_path)
2898

    
2899
      if not export_info:
2900
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2901

    
2902
      if not export_info.has_section(constants.INISECT_EXP):
2903
        raise errors.ProgrammerError("Corrupted export config")
2904

    
2905
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2906
      if (int(ei_version) != constants.EXPORT_VERSION):
2907
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2908
                                   (ei_version, constants.EXPORT_VERSION))
2909

    
2910
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2911
        raise errors.OpPrereqError("Can't import instance with more than"
2912
                                   " one data disk")
2913

    
2914
      # FIXME: are the old os-es, disk sizes, etc. useful?
2915
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2916
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2917
                                                         'disk0_dump'))
2918
      self.src_image = diskimage
2919
    else: # INSTANCE_CREATE
2920
      if getattr(self.op, "os_type", None) is None:
2921
        raise errors.OpPrereqError("No guest OS specified")
2922

    
2923
    # check primary node
2924
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2925
    if pnode is None:
2926
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2927
                                 self.op.pnode)
2928
    self.op.pnode = pnode.name
2929
    self.pnode = pnode
2930
    self.secondaries = []
2931
    # disk template and mirror node verification
2932
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2933
      raise errors.OpPrereqError("Invalid disk template name")
2934

    
2935
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2936
      if getattr(self.op, "snode", None) is None:
2937
        raise errors.OpPrereqError("The networked disk templates need"
2938
                                   " a mirror node")
2939

    
2940
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2941
      if snode_name is None:
2942
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2943
                                   self.op.snode)
2944
      elif snode_name == pnode.name:
2945
        raise errors.OpPrereqError("The secondary node cannot be"
2946
                                   " the primary node.")
2947
      self.secondaries.append(snode_name)
2948

    
2949
    # Check lv size requirements
2950
    nodenames = [pnode.name] + self.secondaries
2951
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2952

    
2953
    # Required free disk space as a function of disk and swap space
2954
    req_size_dict = {
2955
      constants.DT_DISKLESS: 0,
2956
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2957
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2958
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2959
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2960
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2961
    }
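    # Worked example (numbers are illustrative): disk_size=10240 and
    # swap_size=4096 require 14336 MB for plain, 28672 MB for local_raid1
    # and 14592 MB per node for the DRBD-based templates.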
2962

    
2963
    if self.op.disk_template not in req_size_dict:
2964
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2965
                                   " is unknown" %  self.op.disk_template)
2966

    
2967
    req_size = req_size_dict[self.op.disk_template]
2968

    
2969
    for node in nodenames:
2970
      info = nodeinfo.get(node, None)
2971
      if not info:
2972
        raise errors.OpPrereqError("Cannot get current information"
2973
                                   " from node '%s'" % nodeinfo)
2974
      if req_size > info['vg_free']:
2975
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2976
                                   " %d MB available, %d MB required" %
2977
                                   (node, info['vg_free'], req_size))
2978

    
2979
    # os verification
2980
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2981
    if not os_obj:
2982
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2983
                                 " primary node"  % self.op.os_type)
2984

    
2985
    # instance verification
2986
    hostname1 = utils.HostInfo(self.op.instance_name)
2987

    
2988
    self.op.instance_name = instance_name = hostname1.name
2989
    instance_list = self.cfg.GetInstanceList()
2990
    if instance_name in instance_list:
2991
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2992
                                 instance_name)
2993

    
2994
    ip = getattr(self.op, "ip", None)
2995
    if ip is None or ip.lower() == "none":
2996
      inst_ip = None
2997
    elif ip.lower() == "auto":
2998
      inst_ip = hostname1.ip
2999
    else:
3000
      if not utils.IsValidIP(ip):
3001
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3002
                                   " like a valid IP" % ip)
3003
      inst_ip = ip
3004
    self.inst_ip = inst_ip
3005

    
3006
    if self.op.start and not self.op.ip_check:
3007
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3008
                                 " adding an instance in start mode")
3009

    
3010
    if self.op.ip_check:
3011
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3012
                       constants.DEFAULT_NODED_PORT):
3013
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3014
                                   (hostname1.ip, instance_name))
3015

    
3016
    # MAC address verification
3017
    if self.op.mac != "auto":
3018
      if not utils.IsValidMac(self.op.mac.lower()):
3019
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3020
                                   self.op.mac)
3021

    
3022
    # bridge verification
3023
    bridge = getattr(self.op, "bridge", None)
3024
    if bridge is None:
3025
      self.op.bridge = self.cfg.GetDefBridge()
3026
    else:
3027
      self.op.bridge = bridge
3028

    
3029
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3030
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3031
                                 " destination node '%s'" %
3032
                                 (self.op.bridge, pnode.name))
3033

    
3034
    if self.op.start:
3035
      self.instance_status = 'up'
3036
    else:
3037
      self.instance_status = 'down'
3038

    
3039
  def Exec(self, feedback_fn):
3040
    """Create and add the instance to the cluster.
3041

3042
    """
3043
    instance = self.op.instance_name
3044
    pnode_name = self.pnode.name
3045

    
3046
    if self.op.mac == "auto":
3047
      mac_address=self.cfg.GenerateMAC()
3048
    else:
3049
      mac_address=self.op.mac
3050

    
3051
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3052
    if self.inst_ip is not None:
3053
      nic.ip = self.inst_ip
3054

    
3055
    ht_kind = self.sstore.GetHypervisorType()
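    # hypervisors listed in HTS_REQ_PORT need a cluster-wide unique TCP port
    # reserved for the instance (typically for an HVM VNC console)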
3056
    if ht_kind in constants.HTS_REQ_PORT:
3057
      network_port = self.cfg.AllocatePort()
3058
    else:
3059
      network_port = None
3060

    
3061
    disks = _GenerateDiskTemplate(self.cfg,
3062
                                  self.op.disk_template,
3063
                                  instance, pnode_name,
3064
                                  self.secondaries, self.op.disk_size,
3065
                                  self.op.swap_size)
3066

    
3067
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3068
                            primary_node=pnode_name,
3069
                            memory=self.op.mem_size,
3070
                            vcpus=self.op.vcpus,
3071
                            nics=[nic], disks=disks,
3072
                            disk_template=self.op.disk_template,
3073
                            status=self.instance_status,
3074
                            network_port=network_port,
3075
                            )
3076

    
3077
    feedback_fn("* creating instance disks...")
3078
    if not _CreateDisks(self.cfg, iobj):
3079
      _RemoveDisks(iobj, self.cfg)
3080
      raise errors.OpExecError("Device creation failed, reverting...")
3081

    
3082
    feedback_fn("adding instance %s to cluster config" % instance)
3083

    
3084
    self.cfg.AddInstance(iobj)
3085

    
3086
    if self.op.wait_for_sync:
3087
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3088
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3089
      # make sure the disks are not degraded (still sync-ing is ok)
3090
      time.sleep(15)
3091
      feedback_fn("* checking mirrors status")
3092
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3093
    else:
3094
      disk_abort = False
3095

    
3096
    if disk_abort:
3097
      _RemoveDisks(iobj, self.cfg)
3098
      self.cfg.RemoveInstance(iobj.name)
3099
      raise errors.OpExecError("There are some degraded disks for"
3100
                               " this instance")
3101

    
3102
    feedback_fn("creating os for instance %s on node %s" %
3103
                (instance, pnode_name))
3104

    
3105
    if iobj.disk_template != constants.DT_DISKLESS:
3106
      if self.op.mode == constants.INSTANCE_CREATE:
3107
        feedback_fn("* running the instance OS create scripts...")
3108
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3109
          raise errors.OpExecError("could not add os for instance %s"
3110
                                   " on node %s" %
3111
                                   (instance, pnode_name))
3112

    
3113
      elif self.op.mode == constants.INSTANCE_IMPORT:
3114
        feedback_fn("* running the instance OS import scripts...")
3115
        src_node = self.op.src_node
3116
        src_image = self.src_image
3117
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3118
                                                src_node, src_image):
3119
          raise errors.OpExecError("Could not import os for instance"
3120
                                   " %s on node %s" %
3121
                                   (instance, pnode_name))
3122
      else:
3123
        # also checked in the prereq part
3124
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3125
                                     % self.op.mode)
3126

    
3127
    if self.op.start:
3128
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3129
      feedback_fn("* starting instance...")
3130
      if not rpc.call_instance_start(pnode_name, iobj, None):
3131
        raise errors.OpExecError("Could not start instance")
3132

    
3133

    
3134
class LUConnectConsole(NoHooksLU):
3135
  """Connect to an instance's console.
3136

3137
  This is somewhat special in that it returns the command line that
3138
  you need to run on the master node in order to connect to the
3139
  console.
3140

3141
  """
3142
  _OP_REQP = ["instance_name"]
3143

    
3144
  def CheckPrereq(self):
3145
    """Check prerequisites.
3146

3147
    This checks that the instance is in the cluster.
3148

3149
    """
3150
    instance = self.cfg.GetInstanceInfo(
3151
      self.cfg.ExpandInstanceName(self.op.instance_name))
3152
    if instance is None:
3153
      raise errors.OpPrereqError("Instance '%s' not known" %
3154
                                 self.op.instance_name)
3155
    self.instance = instance
3156

    
3157
  def Exec(self, feedback_fn):
3158
    """Connect to the console of an instance
3159

3160
    """
3161
    instance = self.instance
3162
    node = instance.primary_node
3163

    
3164
    node_insts = rpc.call_instance_list([node])[node]
3165
    if node_insts is False:
3166
      raise errors.OpExecError("Can't connect to node %s." % node)
3167

    
3168
    if instance.name not in node_insts:
3169
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3170

    
3171
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3172

    
3173
    hyper = hypervisor.GetHypervisor()
3174
    console_cmd = hyper.GetShellCommandForConsole(instance)
3175
    # build ssh cmdline
3176
    argv = ["ssh", "-q", "-t"]
3177
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
3178
    argv.extend(ssh.BATCH_MODE_OPTS)
3179
    argv.append(node)
3180
    argv.append(console_cmd)
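    # hypothetical example of the resulting command line for a Xen instance:
    #   ssh -q -t <known-hosts/batch-mode opts> node1.example.com \
    #     "xm console instance1.example.com"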
3181
    return "ssh", argv
3182

    
3183

    
3184
class LUAddMDDRBDComponent(LogicalUnit):
3185
  """Add a new mirror member to an instance's disk.
3186

3187
  """
3188
  HPATH = "mirror-add"
3189
  HTYPE = constants.HTYPE_INSTANCE
3190
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3191

    
3192
  def BuildHooksEnv(self):
3193
    """Build hooks env.
3194

3195
    This runs on the master, the primary and all the secondaries.
3196

3197
    """
3198
    env = {
3199
      "NEW_SECONDARY": self.op.remote_node,
3200
      "DISK_NAME": self.op.disk_name,
3201
      }
3202
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3203
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3204
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3205
    return env, nl, nl
3206

    
3207
  def CheckPrereq(self):
3208
    """Check prerequisites.
3209

3210
    This checks that the instance is in the cluster.
3211

3212
    """
3213
    instance = self.cfg.GetInstanceInfo(
3214
      self.cfg.ExpandInstanceName(self.op.instance_name))
3215
    if instance is None:
3216
      raise errors.OpPrereqError("Instance '%s' not known" %
3217
                                 self.op.instance_name)
3218
    self.instance = instance
3219

    
3220
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3221
    if remote_node is None:
3222
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3223
    self.remote_node = remote_node
3224

    
3225
    if remote_node == instance.primary_node:
3226
      raise errors.OpPrereqError("The specified node is the primary node of"
3227
                                 " the instance.")
3228

    
3229
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3230
      raise errors.OpPrereqError("Instance's disk layout is not"
3231
                                 " remote_raid1.")
3232
    for disk in instance.disks:
3233
      if disk.iv_name == self.op.disk_name:
3234
        break
3235
    else:
3236
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3237
                                 " instance." % self.op.disk_name)
3238
    if len(disk.children) > 1:
3239
      raise errors.OpPrereqError("The device already has two slave devices."
3240
                                 " This would create a 3-disk raid1 which we"
3241
                                 " don't allow.")
3242
    self.disk = disk
3243

    
3244
  def Exec(self, feedback_fn):
3245
    """Add the mirror component
3246

3247
    """
3248
    disk = self.disk
3249
    instance = self.instance
3250

    
3251
    remote_node = self.remote_node
3252
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3253
    names = _GenerateUniqueNames(self.cfg, lv_names)
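    # lv_names are only prefixes (e.g. ".sda_data"/".sda_meta");
    # _GenerateUniqueNames derives cluster-unique volume names from them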
3254
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3255
                                     remote_node, disk.size, names)
3256

    
3257
    logger.Info("adding new mirror component on secondary")
3258
    #HARDCODE
3259
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3260
                                      new_drbd, False,
3261
                                      _GetInstanceInfoText(instance)):
3262
      raise errors.OpExecError("Failed to create new component on secondary"
3263
                               " node %s" % remote_node)
3264

    
3265
    logger.Info("adding new mirror component on primary")
3266
    #HARDCODE
3267
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3268
                                    instance, new_drbd,
3269
                                    _GetInstanceInfoText(instance)):
3270
      # remove secondary dev
3271
      self.cfg.SetDiskID(new_drbd, remote_node)
3272
      rpc.call_blockdev_remove(remote_node, new_drbd)
3273
      raise errors.OpExecError("Failed to create volume on primary")
3274

    
3275
    # the device exists now
3276
    # call the primary node to add the mirror to md
3277
    logger.Info("adding new mirror component to md")
3278
    if not rpc.call_blockdev_addchildren(instance.primary_node,
3279
                                         disk, [new_drbd]):
3280
      logger.Error("Can't add mirror component to md!")
3281
      self.cfg.SetDiskID(new_drbd, remote_node)
3282
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
3283
        logger.Error("Can't rollback on secondary")
3284
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
3285
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3286
        logger.Error("Can't rollback on primary")
3287
      raise errors.OpExecError("Can't add mirror component to md array")
3288

    
3289
    disk.children.append(new_drbd)
3290

    
3291
    self.cfg.AddInstance(instance)
3292

    
3293
    _WaitForSync(self.cfg, instance, self.proc)
3294

    
3295
    return 0
3296

    
3297

    
3298
class LURemoveMDDRBDComponent(LogicalUnit):
3299
  """Remove a component from a remote_raid1 disk.
3300

3301
  """
3302
  HPATH = "mirror-remove"
3303
  HTYPE = constants.HTYPE_INSTANCE
3304
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3305

    
3306
  def BuildHooksEnv(self):
3307
    """Build hooks env.
3308

3309
    This runs on the master, the primary and all the secondaries.
3310

3311
    """
3312
    env = {
3313
      "DISK_NAME": self.op.disk_name,
3314
      "DISK_ID": self.op.disk_id,
3315
      "OLD_SECONDARY": self.old_secondary,
3316
      }
3317
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3318
    nl = [self.sstore.GetMasterNode(),
3319
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3320
    return env, nl, nl
3321

    
3322
  def CheckPrereq(self):
3323
    """Check prerequisites.
3324

3325
    This checks that the instance is in the cluster.
3326

3327
    """
3328
    instance = self.cfg.GetInstanceInfo(
3329
      self.cfg.ExpandInstanceName(self.op.instance_name))
3330
    if instance is None:
3331
      raise errors.OpPrereqError("Instance '%s' not known" %
3332
                                 self.op.instance_name)
3333
    self.instance = instance
3334

    
3335
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3336
      raise errors.OpPrereqError("Instance's disk layout is not"
3337
                                 " remote_raid1.")
3338
    for disk in instance.disks:
3339
      if disk.iv_name == self.op.disk_name:
3340
        break
3341
    else:
3342
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3343
                                 " instance." % self.op.disk_name)
3344
    for child in disk.children:
3345
      if (child.dev_type == constants.LD_DRBD7 and
3346
          child.logical_id[2] == self.op.disk_id):
3347
        break
3348
    else:
3349
      raise errors.OpPrereqError("Can't find the device with this port.")
3350

    
3351
    if len(disk.children) < 2:
3352
      raise errors.OpPrereqError("Cannot remove the last component from"
3353
                                 " a mirror.")
3354
    self.disk = disk
3355
    self.child = child
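    # the drbd child's logical_id lists the two peer nodes (plus the port
    # matched above); whichever end is not the primary node is recorded as
    # the old secondary for the hooks environment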
3356
    if self.child.logical_id[0] == instance.primary_node:
3357
      oid = 1
3358
    else:
3359
      oid = 0
3360
    self.old_secondary = self.child.logical_id[oid]
3361

    
3362
  def Exec(self, feedback_fn):
3363
    """Remove the mirror component
3364

3365
    """
3366
    instance = self.instance
3367
    disk = self.disk
3368
    child = self.child
3369
    logger.Info("remove mirror component")
3370
    self.cfg.SetDiskID(disk, instance.primary_node)
3371
    if not rpc.call_blockdev_removechildren(instance.primary_node,
3372
                                            disk, [child]):
3373
      raise errors.OpExecError("Can't remove child from mirror.")
3374

    
3375
    for node in child.logical_id[:2]:
3376
      self.cfg.SetDiskID(child, node)
3377
      if not rpc.call_blockdev_remove(node, child):
3378
        logger.Error("Warning: failed to remove device from node %s,"
3379
                     " continuing operation." % node)
3380

    
3381
    disk.children.remove(child)
3382
    self.cfg.AddInstance(instance)
3383

    
3384

    
3385
class LUReplaceDisks(LogicalUnit):
3386
  """Replace the disks of an instance.
3387

3388
  """
3389
  HPATH = "mirrors-replace"
3390
  HTYPE = constants.HTYPE_INSTANCE
3391
  _OP_REQP = ["instance_name", "mode", "disks"]
3392

    
3393
  def BuildHooksEnv(self):
3394
    """Build hooks env.
3395

3396
    This runs on the master, the primary and all the secondaries.
3397

3398
    """
3399
    env = {
3400
      "MODE": self.op.mode,
3401
      "NEW_SECONDARY": self.op.remote_node,
3402
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3403
      }
3404
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3405
    nl = [
3406
      self.sstore.GetMasterNode(),
3407
      self.instance.primary_node,
3408
      ]
3409
    if self.op.remote_node is not None:
3410
      nl.append(self.op.remote_node)
3411
    return env, nl, nl
3412

    
3413
  def CheckPrereq(self):
3414
    """Check prerequisites.
3415

3416
    This checks that the instance is in the cluster.
3417

3418
    """
3419
    instance = self.cfg.GetInstanceInfo(
3420
      self.cfg.ExpandInstanceName(self.op.instance_name))
3421
    if instance is None:
3422
      raise errors.OpPrereqError("Instance '%s' not known" %
3423
                                 self.op.instance_name)
3424
    self.instance = instance
3425
    self.op.instance_name = instance.name
3426

    
3427
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3428
      raise errors.OpPrereqError("Instance's disk layout is not"
3429
                                 " network mirrored.")
3430

    
3431
    if len(instance.secondary_nodes) != 1:
3432
      raise errors.OpPrereqError("The instance has a strange layout,"
3433
                                 " expected one secondary but found %d" %
3434
                                 len(instance.secondary_nodes))
3435

    
3436
    self.sec_node = instance.secondary_nodes[0]
3437

    
3438
    remote_node = getattr(self.op, "remote_node", None)
3439
    if remote_node is not None:
3440
      remote_node = self.cfg.ExpandNodeName(remote_node)
3441
      if remote_node is None:
3442
        raise errors.OpPrereqError("Node '%s' not known" %
3443
                                   self.op.remote_node)
3444
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3445
    else:
3446
      self.remote_node_info = None
3447
    if remote_node == instance.primary_node:
3448
      raise errors.OpPrereqError("The specified node is the primary node of"
3449
                                 " the instance.")
3450
    elif remote_node == self.sec_node:
3451
      if self.op.mode == constants.REPLACE_DISK_SEC:
3452
        # this is for DRBD8, where we can't execute the same mode of
3453
        # replacement as for drbd7 (no different port allocated)
3454
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3455
                                   " replacement")
3456
      # the user gave the current secondary, switch to
3457
      # 'no-replace-secondary' mode for drbd7
3458
      remote_node = None
3459
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3460
        self.op.mode != constants.REPLACE_DISK_ALL):
3461
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3462
                                 " disks replacement, not individual ones")
3463
    if instance.disk_template == constants.DT_DRBD8:
3464
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3465
          remote_node is not None):
3466
        # switch to replace secondary mode
3467
        self.op.mode = constants.REPLACE_DISK_SEC
3468

    
3469
      if self.op.mode == constants.REPLACE_DISK_ALL:
3470
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3471
                                   " secondary disk replacement, not"
3472
                                   " both at once")
3473
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3474
        if remote_node is not None:
3475
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3476
                                     " the secondary while doing a primary"
3477
                                     " node disk replacement")
3478
        self.tgt_node = instance.primary_node
3479
        self.oth_node = instance.secondary_nodes[0]
3480
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3481
        self.new_node = remote_node # this can be None, in which case
3482
                                    # we don't change the secondary
3483
        self.tgt_node = instance.secondary_nodes[0]
3484
        self.oth_node = instance.primary_node
3485
      else:
3486
        raise errors.ProgrammerError("Unhandled disk replace mode")
3487

    
3488
    for name in self.op.disks:
3489
      if instance.FindDisk(name) is None:
3490
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3491
                                   (name, instance.name))
3492
    self.op.remote_node = remote_node
3493

    
3494
  def _ExecRR1(self, feedback_fn):
3495
    """Replace the disks of an instance.
3496

3497
    """
3498
    instance = self.instance
3499
    iv_names = {}
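    # maps each disk's iv_name to (md device, old drbd child, new drbd branch)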
3500
    # start of work
3501
    if self.op.remote_node is None:
3502
      remote_node = self.sec_node
3503
    else:
3504
      remote_node = self.op.remote_node
3505
    cfg = self.cfg
3506
    for dev in instance.disks:
3507
      size = dev.size
3508
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3509
      names = _GenerateUniqueNames(cfg, lv_names)
3510
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3511
                                       remote_node, size, names)
3512
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3513
      logger.Info("adding new mirror component on secondary for %s" %
3514
                  dev.iv_name)
3515
      #HARDCODE
3516
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3517
                                        new_drbd, False,
3518
                                        _GetInstanceInfoText(instance)):
3519
        raise errors.OpExecError("Failed to create new component on secondary"
3520
                                 " node %s. Full abort, cleanup manually!" %
3521
                                 remote_node)
3522

    
3523
      logger.Info("adding new mirror component on primary")
3524
      #HARDCODE
3525
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3526
                                      instance, new_drbd,
3527
                                      _GetInstanceInfoText(instance)):
3528
        # remove secondary dev
3529
        cfg.SetDiskID(new_drbd, remote_node)
3530
        rpc.call_blockdev_remove(remote_node, new_drbd)
3531
        raise errors.OpExecError("Failed to create volume on primary!"
3532
                                 " Full abort, cleanup manually!!")
3533

    
3534
      # the device exists now
3535
      # call the primary node to add the mirror to md
3536
      logger.Info("adding new mirror component to md")
3537
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3538
                                           [new_drbd]):
3539
        logger.Error("Can't add mirror component to md!")
3540
        cfg.SetDiskID(new_drbd, remote_node)
3541
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3542
          logger.Error("Can't rollback on secondary")
3543
        cfg.SetDiskID(new_drbd, instance.primary_node)
3544
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3545
          logger.Error("Can't rollback on primary")
3546
        raise errors.OpExecError("Full abort, cleanup manually!!")
3547

    
3548
      dev.children.append(new_drbd)
3549
      cfg.AddInstance(instance)
3550

    
3551
    # this can fail as the old devices are degraded and _WaitForSync
3552
    # does a combined result over all disks, so we don't check its
3553
    # return value
3554
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3555

    
3556
    # so check manually all the devices
3557
    for name in iv_names:
3558
      dev, child, new_drbd = iv_names[name]
3559
      cfg.SetDiskID(dev, instance.primary_node)
3560
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3561
      if is_degr:
3562
        raise errors.OpExecError("MD device %s is degraded!" % name)
3563
      cfg.SetDiskID(new_drbd, instance.primary_node)
3564
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3565
      if is_degr:
3566
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3567

    
3568
    for name in iv_names:
3569
      dev, child, new_drbd = iv_names[name]
3570
      logger.Info("remove mirror %s component" % name)
3571
      cfg.SetDiskID(dev, instance.primary_node)
3572
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3573
                                              dev, [child]):
3574
        logger.Error("Can't remove child from mirror, aborting"
3575
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3576
        continue
3577

    
3578
      for node in child.logical_id[:2]:
3579
        logger.Info("remove child device on %s" % node)
3580
        cfg.SetDiskID(child, node)
3581
        if not rpc.call_blockdev_remove(node, child):
3582
          logger.Error("Warning: failed to remove device from node %s,"
3583
                       " continuing operation." % node)
3584

    
3585
      dev.children.remove(child)
3586

    
3587
      cfg.AddInstance(instance)
3588

    
3589
  def _ExecD8DiskOnly(self, feedback_fn):
3590
    """Replace a disk on the primary or secondary for drbd8.
3591

3592
    The algorithm for replace is quite complicated:
3593
      - for each disk to be replaced:
3594
        - create new LVs on the target node with unique names
3595
        - detach old LVs from the drbd device
3596
        - rename old LVs to name_replaced.<time_t>
3597
        - rename new LVs to old LVs
3598
        - attach the new LVs (with the old names now) to the drbd device
3599
      - wait for sync across all devices
3600
      - for each modified disk:
3601
        - remove old LVs (which have the name name_replaced.<time_t>)
3602

3603
    Failures are not very well handled.
3604

3605
    """
3606
    steps_total = 6
3607
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3608
    instance = self.instance
3609
    iv_names = {}
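    # maps each disk's iv_name to (drbd device, old LVs, new LVs)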
3610
    vgname = self.cfg.GetVGName()
3611
    # start of work
3612
    cfg = self.cfg
3613
    tgt_node = self.tgt_node
3614
    oth_node = self.oth_node
3615

    
3616
    # Step: check device activation
3617
    self.proc.LogStep(1, steps_total, "check device existence")
3618
    info("checking volume groups")
3619
    my_vg = cfg.GetVGName()
3620
    results = rpc.call_vg_list([oth_node, tgt_node])
3621
    if not results:
3622
      raise errors.OpExecError("Can't list volume groups on the nodes")
3623
    for node in oth_node, tgt_node:
3624
      res = results.get(node, False)
3625
      if not res or my_vg not in res:
3626
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3627
                                 (my_vg, node))
3628
    for dev in instance.disks:
3629
      if not dev.iv_name in self.op.disks:
3630
        continue
3631
      for node in tgt_node, oth_node:
3632
        info("checking %s on %s" % (dev.iv_name, node))
3633
        cfg.SetDiskID(dev, node)
3634
        if not rpc.call_blockdev_find(node, dev):
3635
          raise errors.OpExecError("Can't find device %s on node %s" %
3636
                                   (dev.iv_name, node))
3637

    
3638
    # Step: check other node consistency
3639
    self.proc.LogStep(2, steps_total, "check peer consistency")
3640
    for dev in instance.disks:
3641
      if not dev.iv_name in self.op.disks:
3642
        continue
3643
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3644
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3645
                                   oth_node==instance.primary_node):
3646
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3647
                                 " to replace disks on this node (%s)" %
3648
                                 (oth_node, tgt_node))
3649

    
3650
    # Step: create new storage
3651
    self.proc.LogStep(3, steps_total, "allocate new storage")
3652
    for dev in instance.disks:
3653
      if not dev.iv_name in self.op.disks:
3654
        continue
3655
      size = dev.size
3656
      cfg.SetDiskID(dev, tgt_node)
3657
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3658
      names = _GenerateUniqueNames(cfg, lv_names)
3659
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3660
                             logical_id=(vgname, names[0]))
3661
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3662
                             logical_id=(vgname, names[1]))
3663
      new_lvs = [lv_data, lv_meta]
3664
      old_lvs = dev.children
3665
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3666
      info("creating new local storage on %s for %s" %
3667
           (tgt_node, dev.iv_name))
3668
      # since we *always* want to create this LV, we use the
3669
      # _Create...OnPrimary (which forces the creation), even if we
3670
      # are talking about the secondary node
3671
      for new_lv in new_lvs:
3672
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3673
                                        _GetInstanceInfoText(instance)):
3674
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3675
                                   " node '%s'" %
3676
                                   (new_lv.logical_id[1], tgt_node))
3677

    
3678
    # Step: for each lv, detach+rename*2+attach
3679
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3680
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3681
      info("detaching %s drbd from local storage" % dev.iv_name)
3682
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3683
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3684
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3685
      #dev.children = []
3686
      #cfg.Update(instance)
3687

    
3688
      # ok, we created the new LVs, so now we know we have the needed
3689
      # storage; as such, we proceed on the target node to rename
3690
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3691
      # using the assumption than logical_id == physical_id (which in
3692
      # turn is the unique_id on that node)
3693

    
3694
      # FIXME(iustin): use a better name for the replaced LVs
3695
      temp_suffix = int(time.time())
3696
      ren_fn = lambda d, suff: (d.physical_id[0],
3697
                                d.physical_id[1] + "_replaced-%s" % suff)
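      # hypothetical example: an old LV called "xyz.sda_data" would be renamed
      # to "xyz.sda_data_replaced-1200000000", freeing its name for the new LV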
3698
      # build the rename list based on what LVs exist on the node
3699
      rlist = []
3700
      for to_ren in old_lvs:
3701
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3702
        if find_res is not None: # device exists
3703
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3704

    
3705
      info("renaming the old LVs on the target node")
3706
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3707
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3708
      # now we rename the new LVs to the old LVs
3709
      info("renaming the new LVs on the target node")
3710
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3711
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3712
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3713

    
3714
      for old, new in zip(old_lvs, new_lvs):
3715
        new.logical_id = old.logical_id
3716
        cfg.SetDiskID(new, tgt_node)
3717

    
3718
      for disk in old_lvs:
3719
        disk.logical_id = ren_fn(disk, temp_suffix)
3720
        cfg.SetDiskID(disk, tgt_node)
3721

    
3722
      # now that the new lvs have the old name, we can add them to the device
3723
      info("adding new mirror component on %s" % tgt_node)
3724
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3725
        for new_lv in new_lvs:
3726
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3727
            warning("Can't rollback device %s" % new_lv.logical_id[1],
3728
                    hint="manually clean up unused logical volumes")
3729
        raise errors.OpExecError("Can't add local storage to drbd")
3730

    
3731
      dev.children = new_lvs
3732
      cfg.Update(instance)
3733

    
3734
    # Step: wait for sync
3735

    
3736
    # this can fail as the old devices are degraded and _WaitForSync
3737
    # does a combined result over all disks, so we don't check its
3738
    # return value
3739
    self.proc.LogStep(5, steps_total, "sync devices")
3740
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3741

    
3742
    # so check manually all the devices
3743
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3744
      cfg.SetDiskID(dev, instance.primary_node)
3745
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3746
      if is_degr:
3747
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3748

    
3749
    # Step: remove old storage
3750
    self.proc.LogStep(6, steps_total, "removing old storage")
3751
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3752
      info("remove logical volumes for %s" % name)
3753
      for lv in old_lvs:
3754
        cfg.SetDiskID(lv, tgt_node)
3755
        if not rpc.call_blockdev_remove(tgt_node, lv):
3756
          warning("Can't remove old LV", hint="manually remove unused LVs")
3757
          continue
3758

    
3759
  def _ExecD8Secondary(self, feedback_fn):
3760
    """Replace the secondary node for drbd8.
3761

3762
    The algorithm for replace is quite complicated:
3763
      - for all disks of the instance:
3764
        - create new LVs on the new node with same names
3765
        - shutdown the drbd device on the old secondary
3766
        - disconnect the drbd network on the primary
3767
        - create the drbd device on the new secondary
3768
        - network attach the drbd on the primary, using an artifice:
3769
          the drbd code for Attach() will connect to the network if it
3770
          finds a device which is connected to the good local disks but
3771
          not network enabled
3772
      - wait for sync across all devices
3773
      - remove all disks from the old secondary
3774

3775
    Failures are not very well handled.
3776

3777
    """
3778
    steps_total = 6
3779
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3780
    instance = self.instance
3781
    iv_names = {}
3782
    vgname = self.cfg.GetVGName()
3783
    # start of work
3784
    cfg = self.cfg
3785
    old_node = self.tgt_node
3786
    new_node = self.new_node
3787
    pri_node = instance.primary_node
3788

    
3789
    # Step: check device activation
3790
    self.proc.LogStep(1, steps_total, "check device existence")
3791
    info("checking volume groups")
3792
    my_vg = cfg.GetVGName()
3793
    results = rpc.call_vg_list([pri_node, new_node])
3794
    if not results:
3795
      raise errors.OpExecError("Can't list volume groups on the nodes")
3796
    for node in pri_node, new_node:
3797
      res = results.get(node, False)
3798
      if not res or my_vg not in res:
3799
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3800
                                 (my_vg, node))
3801
    for dev in instance.disks:
3802
      if not dev.iv_name in self.op.disks:
3803
        continue
3804
      info("checking %s on %s" % (dev.iv_name, pri_node))
3805
      cfg.SetDiskID(dev, pri_node)
3806
      if not rpc.call_blockdev_find(pri_node, dev):
3807
        raise errors.OpExecError("Can't find device %s on node %s" %
3808
                                 (dev.iv_name, pri_node))
3809

    
3810
    # Step: check other node consistency
3811
    self.proc.LogStep(2, steps_total, "check peer consistency")
3812
    for dev in instance.disks:
3813
      if not dev.iv_name in self.op.disks:
3814
        continue
3815
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3816
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3817
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3818
                                 " unsafe to replace the secondary" %
3819
                                 pri_node)
3820

    
3821
    # Step: create new storage
3822
    self.proc.LogStep(3, steps_total, "allocate new storage")
3823
    for dev in instance.disks:
3824
      size = dev.size
3825
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3826
      # since we *always* want to create this LV, we use the
3827
      # _Create...OnPrimary (which forces the creation), even if we
3828
      # are talking about the secondary node
3829
      for new_lv in dev.children:
3830
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3831
                                        _GetInstanceInfoText(instance)):
3832
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3833
                                   " node '%s'" %
3834
                                   (new_lv.logical_id[1], new_node))
3835

    
3836
      iv_names[dev.iv_name] = (dev, dev.children)
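      # remember each drbd device and its LV children (recreated with the
      # same names on the new node above) for the check/cleanup steps below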
3837

    
3838
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3839
    for dev in instance.disks:
3840
      size = dev.size
3841
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3842
      # create new devices on new_node
3843
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3844
                              logical_id=(pri_node, new_node,
3845
                                          dev.logical_id[2]),
3846
                              children=dev.children)
3847
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3848
                                        new_drbd, False,
3849
                                      _GetInstanceInfoText(instance)):
3850
        raise errors.OpExecError("Failed to create new DRBD on"
3851
                                 " node '%s'" % new_node)
3852

    
3853
    for dev in instance.disks:
3854
      # we have new devices, shutdown the drbd on the old secondary
3855
      info("shutting down drbd for %s on old node" % dev.iv_name)
3856
      cfg.SetDiskID(dev, old_node)
3857
      if not rpc.call_blockdev_shutdown(old_node, dev):
3858
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3859
                hint="Please cleanup this device manually as soon as possible")
3860

    
3861
    info("detaching primary drbds from the network (=> standalone)")
3862
    done = 0
3863
    for dev in instance.disks:
3864
      cfg.SetDiskID(dev, pri_node)
3865
      # set the physical (unique in bdev terms) id to None, meaning
3866
      # detach from network
3867
      dev.physical_id = (None,) * len(dev.physical_id)
3868
      # and 'find' the device, which will 'fix' it to match the
3869
      # standalone state
3870
      if rpc.call_blockdev_find(pri_node, dev):
3871
        done += 1
3872
      else:
3873
        warning("Failed to detach drbd %s from network, unusual case" %
3874
                dev.iv_name)
3875

    
3876
    if not done:
3877
      # no detaches succeeded (very unlikely)
3878
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3879

    
3880
    # if we managed to detach at least one, we update all the disks of
3881
    # the instance to point to the new secondary
3882
    info("updating instance configuration")
3883
    for dev in instance.disks:
3884
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3885
      cfg.SetDiskID(dev, pri_node)
3886
    cfg.Update(instance)
3887

    
3888
    # and now perform the drbd attach
3889
    info("attaching primary drbds to new secondary (standalone => connected)")
3890
    failures = []
3891
    for dev in instance.disks:
3892
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3893
      # since the attach is smart, it's enough to 'find' the device,
3894
      # it will automatically activate the network, if the physical_id
3895
      # is correct
3896
      cfg.SetDiskID(dev, pri_node)
3897
      if not rpc.call_blockdev_find(pri_node, dev):
3898
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3899
                hint="please do a gnt-instance info to see the status of disks")
3900

    
3901
    # this can fail as the old devices are degraded and _WaitForSync
3902
    # does a combined result over all disks, so we don't check its
3903
    # return value
3904
    self.proc.LogStep(5, steps_total, "sync devices")
3905
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3906

    
3907
    # so check manually all the devices
3908
    for name, (dev, old_lvs) in iv_names.iteritems():
3909
      cfg.SetDiskID(dev, pri_node)
3910
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3911
      if is_degr:
3912
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3913

    
3914
    self.proc.LogStep(6, steps_total, "removing old storage")
3915
    for name, (dev, old_lvs) in iv_names.iteritems():
3916
      info("remove logical volumes for %s" % name)
3917
      for lv in old_lvs:
3918
        cfg.SetDiskID(lv, old_node)
3919
        if not rpc.call_blockdev_remove(old_node, lv):
3920
          warning("Can't remove LV on old secondary",
3921
                  hint="Cleanup stale volumes by hand")
3922

    
3923
  def Exec(self, feedback_fn):
3924
    """Execute disk replacement.
3925

3926
    This dispatches the disk replacement to the appropriate handler.
3927

3928
    """
3929
    instance = self.instance
3930
    if instance.disk_template == constants.DT_REMOTE_RAID1:
3931
      fn = self._ExecRR1
3932
    elif instance.disk_template == constants.DT_DRBD8:
3933
      if self.op.remote_node is None:
3934
        fn = self._ExecD8DiskOnly
3935
      else:
3936
        fn = self._ExecD8Secondary
3937
    else:
3938
      raise errors.ProgrammerError("Unhandled disk replacement case")
3939
    return fn(feedback_fn)
3940

    
3941

    
3942
class LUQueryInstanceData(NoHooksLU):
3943
  """Query runtime instance data.
3944

3945
  """
3946
  _OP_REQP = ["instances"]
3947

    
3948
  def CheckPrereq(self):
3949
    """Check prerequisites.
3950

3951
    This only checks the optional instance list against the existing names.
3952

3953
    """
3954
    if not isinstance(self.op.instances, list):
3955
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3956
    if self.op.instances:
3957
      self.wanted_instances = []
3958
      names = self.op.instances
3959
      for name in names:
3960
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3961
        if instance is None:
3962
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3963
        self.wanted_instances.append(instance)
3964
    else:
3965
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3966
                               in self.cfg.GetInstanceList()]
3967
    return
3968

    
3969

    
3970
  def _ComputeDiskStatus(self, instance, snode, dev):
3971
    """Compute block device status.
3972

3973
    """
3974
    self.cfg.SetDiskID(dev, instance.primary_node)
3975
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3976
    if dev.dev_type in constants.LDS_DRBD:
3977
      # we change the snode then (otherwise we use the one passed in)
3978
      if dev.logical_id[0] == instance.primary_node:
3979
        snode = dev.logical_id[1]
3980
      else:
3981
        snode = dev.logical_id[0]
3982

    
3983
    if snode:
3984
      self.cfg.SetDiskID(dev, snode)
3985
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3986
    else:
3987
      dev_sstatus = None
3988

    
3989
    if dev.children:
3990
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3991
                      for child in dev.children]
3992
    else:
3993
      dev_children = []
3994

    
3995
    data = {
3996
      "iv_name": dev.iv_name,
3997
      "dev_type": dev.dev_type,
3998
      "logical_id": dev.logical_id,
3999
      "physical_id": dev.physical_id,
4000
      "pstatus": dev_pstatus,
4001
      "sstatus": dev_sstatus,
4002
      "children": dev_children,
4003
      }
4004

    
4005
    return data
4006

    
4007
  def Exec(self, feedback_fn):
4008
    """Gather and return data"""
4009
    result = {}
4010
    for instance in self.wanted_instances:
4011
      remote_info = rpc.call_instance_info(instance.primary_node,
4012
                                                instance.name)
4013
      if remote_info and "state" in remote_info:
4014
        remote_state = "up"
4015
      else:
4016
        remote_state = "down"
4017
      if instance.status == "down":
4018
        config_state = "down"
4019
      else:
4020
        config_state = "up"
4021

    
4022
      disks = [self._ComputeDiskStatus(instance, None, device)
4023
               for device in instance.disks]
4024

    
4025
      idict = {
4026
        "name": instance.name,
4027
        "config_state": config_state,
4028
        "run_state": remote_state,
4029
        "pnode": instance.primary_node,
4030
        "snodes": instance.secondary_nodes,
4031
        "os": instance.os,
4032
        "memory": instance.memory,
4033
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4034
        "disks": disks,
4035
        "network_port": instance.network_port,
4036
        "vcpus": instance.vcpus,
4037
        }
4038

    
4039
      result[instance.name] = idict
4040

    
4041
    return result
4042

    
4043

    
4044
class LUSetInstanceParms(LogicalUnit):
4045
  """Modifies an instance's parameters.
4046

4047
  """
4048
  HPATH = "instance-modify"
4049
  HTYPE = constants.HTYPE_INSTANCE
4050
  _OP_REQP = ["instance_name"]
4051

    
4052
  def BuildHooksEnv(self):
4053
    """Build hooks env.
4054

4055
    This runs on the master, primary and secondaries.
4056

4057
    """
4058
    args = dict()
4059
    if self.mem:
4060
      args['memory'] = self.mem
4061
    if self.vcpus:
4062
      args['vcpus'] = self.vcpus
4063
    if self.do_ip or self.do_bridge:
4064
      if self.do_ip:
4065
        ip = self.ip
4066
      else:
4067
        ip = self.instance.nics[0].ip
4068
      if self.bridge:
4069
        bridge = self.bridge
4070
      else:
4071
        bridge = self.instance.nics[0].bridge
4072
      args['nics'] = [(ip, bridge)]
4073
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4074
    nl = [self.sstore.GetMasterNode(),
4075
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4076
    return env, nl, nl
4077

    
4078
  def CheckPrereq(self):
4079
    """Check prerequisites.
4080

4081
    This only checks the instance list against the existing names.
4082

4083
    """
4084
    self.mem = getattr(self.op, "mem", None)
4085
    self.vcpus = getattr(self.op, "vcpus", None)
4086
    self.ip = getattr(self.op, "ip", None)
4087
    self.mac = getattr(self.op, "mac", None)
4088
    self.bridge = getattr(self.op, "bridge", None)
4089
    if [self.mem, self.vcpus, self.ip, self.bridge, self.mac].count(None) == 5:
4090
      raise errors.OpPrereqError("No changes submitted")
4091
    if self.mem is not None:
4092
      try:
4093
        self.mem = int(self.mem)
4094
      except ValueError, err:
4095
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4096
    if self.vcpus is not None:
4097
      try:
4098
        self.vcpus = int(self.vcpus)
4099
      except ValueError, err:
4100
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4101
    if self.ip is not None:
4102
      self.do_ip = True
4103
      if self.ip.lower() == "none":
4104
        self.ip = None
4105
      else:
4106
        if not utils.IsValidIP(self.ip):
4107
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4108
    else:
4109
      self.do_ip = False
4110
    self.do_bridge = (self.bridge is not None)
4111
    if self.mac is not None:
4112
      if self.cfg.IsMacInUse(self.mac):
4113
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4114
                                   self.mac)
4115
      if not utils.IsValidMac(self.mac):
4116
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4117

    
4118
    instance = self.cfg.GetInstanceInfo(
4119
      self.cfg.ExpandInstanceName(self.op.instance_name))
4120
    if instance is None:
4121
      raise errors.OpPrereqError("No such instance name '%s'" %
4122
                                 self.op.instance_name)
4123
    self.op.instance_name = instance.name
4124
    self.instance = instance
4125
    return
4126

    
4127
  def Exec(self, feedback_fn):
4128
    """Modifies an instance.
4129

4130
    All parameters take effect only at the next restart of the instance.
4131
    """
4132
    result = []
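    # collects (parameter, new value) pairs that are returned to the caller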
4133
    instance = self.instance
4134
    if self.mem:
4135
      instance.memory = self.mem
4136
      result.append(("mem", self.mem))
4137
    if self.vcpus:
4138
      instance.vcpus = self.vcpus
4139
      result.append(("vcpus",  self.vcpus))
4140
    if self.do_ip:
4141
      instance.nics[0].ip = self.ip
4142
      result.append(("ip", self.ip))
4143
    if self.bridge:
4144
      instance.nics[0].bridge = self.bridge
4145
      result.append(("bridge", self.bridge))
4146
    if self.mac:
4147
      instance.nics[0].mac = self.mac
4148
      result.append(("mac", self.mac))
4149

    
4150
    self.cfg.AddInstance(instance)
4151

    
4152
    return result
4153

    
4154

    
4155
class LUQueryExports(NoHooksLU):
4156
  """Query the exports list
4157

4158
  """
4159
  _OP_REQP = []
4160

    
4161
  def CheckPrereq(self):
4162
    """Check that the nodelist contains only existing nodes.
4163

4164
    """
4165
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4166

    
4167
  def Exec(self, feedback_fn):
4168
    """Compute the list of all the exported system images.
4169

4170
    Returns:
4171
      a dictionary with the structure node->(export-list)
4172
      where export-list is a list of the instances exported on
4173
      that node.
4174

4175
    """
4176
    return rpc.call_export_list(self.nodes)
4177

    
4178

    
4179
class LUExportInstance(LogicalUnit):
4180
  """Export an instance to an image in the cluster.
4181

4182
  """
4183
  HPATH = "instance-export"
4184
  HTYPE = constants.HTYPE_INSTANCE
4185
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4186

    
4187
  def BuildHooksEnv(self):
4188
    """Build hooks env.
4189

4190
    This will run on the master, primary node and target node.
4191

4192
    """
4193
    env = {
4194
      "EXPORT_NODE": self.op.target_node,
4195
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4196
      }
4197
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4198
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4199
          self.op.target_node]
4200
    return env, nl, nl
4201

    
4202
  def CheckPrereq(self):
4203
    """Check prerequisites.
4204

4205
    This checks that the instance name is a valid one.
4206

4207
    """
4208
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4209
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4210
    if self.instance is None:
4211
      raise errors.OpPrereqError("Instance '%s' not found" %
4212
                                 self.op.instance_name)
4213

    
4214
    # node verification
4215
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4216
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4217

    
4218
    if self.dst_node is None:
4219
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
4220
                                 self.op.target_node)
4221
    self.op.target_node = self.dst_node.name
4222

    
4223
  def Exec(self, feedback_fn):
4224
    """Export an instance to an image in the cluster.
4225

4226
    """
4227
    instance = self.instance
4228
    dst_node = self.dst_node
4229
    src_node = instance.primary_node
4230
    # shutdown the instance, unless requested not to do so
4231
    if self.op.shutdown:
4232
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
4233
      self.proc.ChainOpCode(op)
4234

    
4235
    vgname = self.cfg.GetVGName()
4236

    
4237
    snap_disks = []
4238

    
4239
    try:
4240
      for disk in instance.disks:
4241
        if disk.iv_name == "sda":
4242
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4243
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4244

    
4245
          if not new_dev_name:
4246
            logger.Error("could not snapshot block device %s on node %s" %
4247
                         (disk.logical_id[1], src_node))
4248
          else:
4249
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4250
                                      logical_id=(vgname, new_dev_name),
4251
                                      physical_id=(vgname, new_dev_name),
4252
                                      iv_name=disk.iv_name)
4253
            snap_disks.append(new_dev)
4254

    
4255
    finally:
4256
      if self.op.shutdown:
4257
        op = opcodes.OpStartupInstance(instance_name=instance.name,
4258
                                       force=False)
4259
        self.proc.ChainOpCode(op)
4260

    
4261
    # TODO: check for size
4262

    
4263
    for dev in snap_disks:
4264
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4265
                                           instance):
4266
        logger.Error("could not export block device %s from node"
4267
                     " %s to node %s" %
4268
                     (dev.logical_id[1], src_node, dst_node.name))
4269
      if not rpc.call_blockdev_remove(src_node, dev):
4270
        logger.Error("could not remove snapshot block device %s from"
4271
                     " node %s" % (dev.logical_id[1], src_node))
4272

    
4273
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4274
      logger.Error("could not finalize export for instance %s on node %s" %
4275
                   (instance.name, dst_node.name))
4276

    
4277
    nodelist = self.cfg.GetNodeList()
4278
    nodelist.remove(dst_node.name)
4279

    
4280
    # on one-node clusters nodelist will be empty after the removal
4281
    # if we proceed the backup would be removed because OpQueryExports
4282
    # substitutes an empty list with the full cluster node list.
4283
    if nodelist:
4284
      op = opcodes.OpQueryExports(nodes=nodelist)
4285
      exportlist = self.proc.ChainOpCode(op)
4286
      for node in exportlist:
4287
        if instance.name in exportlist[node]:
4288
          if not rpc.call_export_remove(node, instance.name):
4289
            logger.Error("could not remove older export for instance %s"
4290
                         " on node %s" % (instance.name, node))
4291

    
4292

    
4293
class TagsLU(NoHooksLU):
4294
  """Generic tags LU.
4295

4296
  This is an abstract class which is the parent of all the other tags LUs.
4297

4298
  """
4299
  def CheckPrereq(self):
4300
    """Check prerequisites.
4301

4302
    """
4303
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs matching the search pattern.

    """
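    # collect the tags of the cluster, of all instances and of all nodes,
    # and return the matches as (path, tag) pairs, e.g. a hypothetical
    # ("/instances/instance1.example.com", "web") entry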
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Add a list of tags to a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
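    # saving the modified target can race with other configuration
    # changes; in that case the caller is asked to retry the operation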
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that all the given tags are present on the target object.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
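    # refuse the whole operation if any of the requested tags is missing,
    # naming the missing ones in the error message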
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")