1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46
class LogicalUnit(object):
47
  """Logical Unit base class.
48

49
  Subclasses must follow these rules:
50
    - implement CheckPrereq which also fills in the opcode instance
51
      with all the fields (even if as None)
52
    - implement Exec
53
    - implement BuildHooksEnv
54
    - redefine HPATH and HTYPE
55
    - optionally redefine their run requirements (REQ_CLUSTER,
56
      REQ_MASTER); note that all commands require root permissions
57

58
  """
59
  HPATH = None
60
  HTYPE = None
61
  _OP_REQP = []
62
  REQ_CLUSTER = True
63
  REQ_MASTER = True
64

    
65
  def __init__(self, processor, op, cfg, sstore):
66
    """Constructor for LogicalUnit.
67

68
    This needs to be overridden in derived classes in order to check op
69
    validity.
70

71
    """
72
    self.proc = processor
73
    self.op = op
74
    self.cfg = cfg
75
    self.sstore = sstore
76
    for attr_name in self._OP_REQP:
77
      attr_val = getattr(op, attr_name, None)
78
      if attr_val is None:
79
        raise errors.OpPrereqError("Required parameter '%s' missing" %
80
                                   attr_name)
81
    if self.REQ_CLUSTER:
82
      if not cfg.IsCluster():
83
        raise errors.OpPrereqError("Cluster not initialized yet,"
84
                                   " use 'gnt-cluster init' first.")
85
      if self.REQ_MASTER:
86
        master = sstore.GetMasterNode()
87
        if master != utils.HostInfo().name:
88
          raise errors.OpPrereqError("Commands must be run on the master"
89
                                     " node %s" % master)
90

    
91
  def CheckPrereq(self):
92
    """Check prerequisites for this LU.
93

94
    This method should check that the prerequisites for the execution
95
    of this LU are fulfilled. It can do internode communication, but
96
    it should be idempotent - no cluster or system changes are
97
    allowed.
98

99
    The method should raise errors.OpPrereqError in case something is
100
    not fulfilled. Its return value is ignored.
101

102
    This method should also update all the parameters of the opcode to
103
    their canonical form; e.g. a short node name must be fully
104
    expanded after this method has successfully completed (so that
105
    hooks, logging, etc. work correctly).
106

107
    """
108
    raise NotImplementedError
109

    
110
  def Exec(self, feedback_fn):
111
    """Execute the LU.
112

113
    This method should implement the actual work. It should raise
114
    errors.OpExecError for failures that are somewhat dealt with in
115
    code, or expected.
116

117
    """
118
    raise NotImplementedError
119

    
120
  def BuildHooksEnv(self):
121
    """Build hooks environment for this LU.
122

123
    This method should return a three-element tuple consisting of: a dict
124
    containing the environment that will be used for running the
125
    specific hook for this LU, a list of node names on which the hook
126
    should run before the execution, and a list of node names on which
127
    the hook should run after the execution.
128

129
    The keys of the dict must not be prefixed with 'GANETI_', as that
    prefix will be added by the hooks runner. Note that the hooks runner
    will also add keys of its own. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.
133

134
    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). If there
    are no nodes, an empty list (and not None) should be returned.
139

140
    Note that if the HPATH for a LU class is None, this function will
141
    not be called.
142

143
    """
144
    raise NotImplementedError
145

    
146

    
147
class NoHooksLU(LogicalUnit):
148
  """Simple LU which runs no hooks.
149

150
  This LU is intended as a parent for other LogicalUnits which will
151
  run no hooks, in order to reduce duplicate code.
152

153
  """
154
  HPATH = None
155
  HTYPE = None
156

    
157
  def BuildHooksEnv(self):
158
    """Build hooks env.
159

160
    This is a no-op, since we don't run hooks.
161

162
    """
163
    return {}, [], []
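
# Illustrative sketch (not part of the original module): a minimal
# LogicalUnit subclass showing how the pieces above fit together.  The
# opcode field "node_name" and the hook path "node-noop" are assumptions
# made purely for this example.
class LUExampleNoop(LogicalUnit):
  """Example logical unit that does nothing (illustration only)."""
  HPATH = "node-noop"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Return the env dict, the pre-hook nodes and the post-hook nodes."""
    env = {"OP_TARGET": self.op.node_name}
    return env, [self.op.node_name], [self.op.node_name]

  def CheckPrereq(self):
    """Canonicalise the node name or fail early."""
    node = self.cfg.ExpandNodeName(self.op.node_name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % self.op.node_name)
    self.op.node_name = node

  def Exec(self, feedback_fn):
    """Do nothing, successfully."""
    feedback_fn("* no-op on node %s" % self.op.node_name)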
164

    
165

    
166
def _AddHostToEtcHosts(hostname):
167
  """Wrapper around utils.SetEtcHostsEntry.
168

169
  """
170
  hi = utils.HostInfo(name=hostname)
171
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172

    
173

    
174
def _RemoveHostFromEtcHosts(hostname):
175
  """Wrapper around utils.RemoveEtcHostsEntry.
176

177
  """
178
  hi = utils.HostInfo(name=hostname)
179
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181

    
182

    
183
def _GetWantedNodes(lu, nodes):
184
  """Returns list of checked and expanded node names.
185

186
  Args:
187
    nodes: List of nodes (strings) or None for all
188

189
  """
190
  if not isinstance(nodes, list):
191
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
192

    
193
  if nodes:
194
    wanted = []
195

    
196
    for name in nodes:
197
      node = lu.cfg.ExpandNodeName(name)
198
      if node is None:
199
        raise errors.OpPrereqError("No such node name '%s'" % name)
200
      wanted.append(node)
201

    
202
  else:
203
    wanted = lu.cfg.GetNodeList()
204
  return utils.NiceSort(wanted)
205

    
206

    
207
def _GetWantedInstances(lu, instances):
208
  """Returns list of checked and expanded instance names.
209

210
  Args:
211
    instances: List of instances (strings) or None for all
212

213
  """
214
  if not isinstance(instances, list):
215
    raise errors.OpPrereqError("Invalid argument type 'instances'")
216

    
217
  if instances:
218
    wanted = []
219

    
220
    for name in instances:
221
      instance = lu.cfg.ExpandInstanceName(name)
222
      if instance is None:
223
        raise errors.OpPrereqError("No such instance name '%s'" % name)
224
      wanted.append(instance)
225

    
226
  else:
227
    wanted = lu.cfg.GetInstanceList()
228
  return utils.NiceSort(wanted)
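
# Usage sketch: a query-type LU typically calls the two helpers above from
# its CheckPrereq, e.g. (mirroring what LUQueryNodes does further down):
#
#   self.wanted = _GetWantedNodes(self, self.op.names)
#
# Passing None is rejected with OpPrereqError, while an empty list means
# "all nodes" / "all instances".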
229

    
230

    
231
def _CheckOutputFields(static, dynamic, selected):
232
  """Checks whether all selected fields are valid.
233

234
  Args:
235
    static: Static fields
236
    dynamic: Dynamic fields
    selected: Output fields selected by the caller
237

238
  """
239
  static_fields = frozenset(static)
240
  dynamic_fields = frozenset(dynamic)
241

    
242
  all_fields = static_fields | dynamic_fields
243

    
244
  if not all_fields.issuperset(selected):
245
    raise errors.OpPrereqError("Unknown output fields selected: %s"
246
                               % ",".join(frozenset(selected).
247
                                          difference(all_fields)))
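
# Usage sketch (hypothetical field names): any selected field outside the
# union of static and dynamic fields aborts the opcode, e.g.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "bogus"])
#
# raises OpPrereqError("Unknown output fields selected: bogus").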
248

    
249

    
250
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251
                          memory, vcpus, nics):
252
  """Builds instance related env variables for hooks from single variables.
253

254
  Args:
255
    secondary_nodes: List of secondary nodes as strings
256
  """
257
  env = {
258
    "OP_TARGET": name,
259
    "INSTANCE_NAME": name,
260
    "INSTANCE_PRIMARY": primary_node,
261
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262
    "INSTANCE_OS_TYPE": os_type,
263
    "INSTANCE_STATUS": status,
264
    "INSTANCE_MEMORY": memory,
265
    "INSTANCE_VCPUS": vcpus,
266
  }
267

    
268
  if nics:
269
    nic_count = len(nics)
270
    for idx, (ip, bridge) in enumerate(nics):
271
      if ip is None:
272
        ip = ""
273
      env["INSTANCE_NIC%d_IP" % idx] = ip
274
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275
  else:
276
    nic_count = 0
277

    
278
  env["INSTANCE_NIC_COUNT"] = nic_count
279

    
280
  return env
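
# Example of the resulting environment (hypothetical values):
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"], "debian",
#                         "up", 128, 1, [("192.0.2.10", "xen-br0")])
#
# yields, among others, INSTANCE_NAME=inst1.example.com,
# INSTANCE_PRIMARY=node1, INSTANCE_SECONDARIES="node2",
# INSTANCE_NIC_COUNT=1, INSTANCE_NIC0_IP=192.0.2.10 and
# INSTANCE_NIC0_BRIDGE=xen-br0 (all keys still unprefixed here; the hooks
# runner adds the GANETI_ prefix).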
281

    
282

    
283
def _BuildInstanceHookEnvByObject(instance, override=None):
284
  """Builds instance related env variables for hooks from an object.
285

286
  Args:
287
    instance: objects.Instance object of instance
288
    override: dict of values to override
289
  """
290
  args = {
291
    'name': instance.name,
292
    'primary_node': instance.primary_node,
293
    'secondary_nodes': instance.secondary_nodes,
294
    'os_type': instance.os,
295
    'status': instance.status,
296
    'memory': instance.memory,
297
    'vcpus': instance.vcpus,
298
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
299
  }
300
  if override:
301
    args.update(override)
302
  return _BuildInstanceHookEnv(**args)
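
# Usage sketch (assumed instance object): callers can override individual
# fields without rebuilding the whole dict, e.g.
#
#   env = _BuildInstanceHookEnvByObject(instance, {"status": "down"})
#
# which keeps all values taken from the instance except the status.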
303

    
304

    
305
def _UpdateKnownHosts(fullnode, ip, pubkey):
306
  """Ensure a node has a correct known_hosts entry.
307

308
  Args:
309
    fullnode - Fully qualified domain name of host. (str)
310
    ip       - IPv4 address of host (str)
311
    pubkey   - the public key of the cluster
312

313
  """
314
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
316
  else:
317
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
318

    
319
  inthere = False
320

    
321
  save_lines = []
322
  add_lines = []
323
  removed = False
324

    
325
  for rawline in f:
326
    logger.Debug('read %s' % (repr(rawline),))
327

    
328
    parts = rawline.rstrip('\r\n').split()
329

    
330
    # Ignore unwanted lines
331
    if len(parts) >= 3 and rawline.lstrip()[0] != '#':
332
      fields = parts[0].split(',')
333
      key = parts[2]
334

    
335
      haveall = True
336
      havesome = False
337
      for spec in [ ip, fullnode ]:
338
        if spec not in fields:
339
          haveall = False
340
        if spec in fields:
341
          havesome = True
342

    
343
      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344
      if haveall and key == pubkey:
345
        inthere = True
346
        save_lines.append(rawline)
347
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
348
        continue
349

    
350
      if havesome and (not haveall or key != pubkey):
351
        removed = True
352
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
353
        continue
354

    
355
    save_lines.append(rawline)
356

    
357
  if not inthere:
358
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
360

    
361
  if removed:
362
    save_lines = save_lines + add_lines
363

    
364
    # Write a new file and replace old.
365
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
366
                                   constants.DATA_DIR)
367
    newfile = os.fdopen(fd, 'w')
368
    try:
369
      newfile.write(''.join(save_lines))
370
    finally:
371
      newfile.close()
372
    logger.Debug("Wrote new known_hosts.")
373
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
374

    
375
  elif add_lines:
376
    # Simply appending a new line will do the trick.
377
    f.seek(0, 2)
378
    for add in add_lines:
379
      f.write(add)
380

    
381
  f.close()
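
# The entries handled above use the standard single-line known_hosts
# format "<name>,<ip> ssh-rsa <key>"; with assumed values the line added
# for a new node would look like:
#
#   node1.example.com,192.0.2.1 ssh-rsa AAAAB3NzaC1yc2E...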
382

    
383

    
384
def _HasValidVG(vglist, vgname):
385
  """Checks if the volume group list is valid.
386

387
  A non-None return value means there's an error, and the return value
388
  is the error message.
389

390
  """
391
  vgsize = vglist.get(vgname, None)
392
  if vgsize is None:
393
    return "volume group '%s' missing" % vgname
394
  elif vgsize < 20480:
395
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
396
            (vgname, vgsize))
397
  return None
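
# Usage sketch (assumed sizes in MiB): _HasValidVG({"xenvg": 102400},
# "xenvg") returns None (the group is present and large enough), while
# _HasValidVG({}, "xenvg") returns "volume group 'xenvg' missing".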
398

    
399

    
400
def _InitSSHSetup(node):
401
  """Setup the SSH configuration for the cluster.
402

403

404
  This generates a dsa keypair for root, adds the pub key to the
405
  permitted hosts and adds the hostkey to its own known hosts.
406

407
  Args:
408
    node: the name of this host as a fqdn
409

410
  """
411
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
412

    
413
  for name in priv_key, pub_key:
414
    if os.path.exists(name):
415
      utils.CreateBackup(name)
416
    utils.RemoveFile(name)
417

    
418
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
419
                         "-f", priv_key,
420
                         "-q", "-N", ""])
421
  if result.failed:
422
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
423
                             result.output)
424

    
425
  f = open(pub_key, 'r')
426
  try:
427
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
428
  finally:
429
    f.close()
430

    
431

    
432
def _InitGanetiServerSetup(ss):
433
  """Setup the necessary configuration for the initial node daemon.
434

435
  This creates the nodepass file containing the shared password for
436
  the cluster and also generates the SSL certificate.
437

438
  """
439
  # Create pseudo random password
440
  randpass = sha.new(os.urandom(64)).hexdigest()
441
  # and write it into sstore
442
  ss.SetKey(ss.SS_NODED_PASS, randpass)
443

    
444
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445
                         "-days", str(365*5), "-nodes", "-x509",
446
                         "-keyout", constants.SSL_CERT_FILE,
447
                         "-out", constants.SSL_CERT_FILE, "-batch"])
448
  if result.failed:
449
    raise errors.OpExecError("could not generate server ssl cert, command"
450
                             " %s had exitcode %s and error message %s" %
451
                             (result.cmd, result.exit_code, result.output))
452

    
453
  os.chmod(constants.SSL_CERT_FILE, 0400)
454

    
455
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
456

    
457
  if result.failed:
458
    raise errors.OpExecError("Could not start the node daemon, command %s"
459
                             " had exitcode %s and error %s" %
460
                             (result.cmd, result.exit_code, result.output))
461

    
462

    
463
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.
465

466
  """
467
  # check bridges existence
468
  brlist = [nic.bridge for nic in instance.nics]
469
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
470
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
472
                               (brlist, instance.primary_node))
473

    
474

    
475
class LUInitCluster(LogicalUnit):
476
  """Initialise the cluster.
477

478
  """
479
  HPATH = "cluster-init"
480
  HTYPE = constants.HTYPE_CLUSTER
481
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482
              "def_bridge", "master_netdev"]
483
  REQ_CLUSTER = False
484

    
485
  def BuildHooksEnv(self):
486
    """Build hooks env.
487

488
    Notes: Since we don't require a cluster, we must manually add
489
    ourselves in the post-run node list.
490

491
    """
492
    env = {"OP_TARGET": self.op.cluster_name}
493
    return env, [], [self.hostname.name]
494

    
495
  def CheckPrereq(self):
496
    """Verify that the passed name is a valid one.
497

498
    """
499
    if config.ConfigWriter.IsCluster():
500
      raise errors.OpPrereqError("Cluster is already initialised")
501

    
502
    self.hostname = hostname = utils.HostInfo()
503

    
504
    if hostname.ip.startswith("127."):
505
      raise errors.OpPrereqError("This host's IP resolves to the private"
506
                                 " range (%s). Please fix DNS or /etc/hosts." %
507
                                 (hostname.ip,))
508

    
509
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
510

    
511
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
512
                         constants.DEFAULT_NODED_PORT):
513
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
514
                                 " to %s,\nbut this ip address does not"
515
                                 " belong to this host."
516
                                 " Aborting." % hostname.ip)
517

    
518
    secondary_ip = getattr(self.op, "secondary_ip", None)
519
    if secondary_ip and not utils.IsValidIP(secondary_ip):
520
      raise errors.OpPrereqError("Invalid secondary ip given")
521
    if (secondary_ip and
522
        secondary_ip != hostname.ip and
523
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
524
                           constants.DEFAULT_NODED_PORT))):
525
      raise errors.OpPrereqError("You gave %s as secondary IP,"
526
                                 " but it does not belong to this host." %
527
                                 secondary_ip)
528
    self.secondary_ip = secondary_ip
529

    
530
    # checks presence of the volume group given
531
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
532

    
533
    if vgstatus:
534
      raise errors.OpPrereqError("Error: %s" % vgstatus)
535

    
536
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
537
                    self.op.mac_prefix):
538
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
539
                                 self.op.mac_prefix)
540

    
541
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
542
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
543
                                 self.op.hypervisor_type)
544

    
545
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
546
    if result.failed:
547
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
548
                                 (self.op.master_netdev,
549
                                  result.output.strip()))
550

    
551
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
552
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
553
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
554
                                 " executable." % constants.NODE_INITD_SCRIPT)
555

    
556
  def Exec(self, feedback_fn):
557
    """Initialize the cluster.
558

559
    """
560
    clustername = self.clustername
561
    hostname = self.hostname
562

    
563
    # set up the simple store
564
    self.sstore = ss = ssconf.SimpleStore()
565
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
566
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
567
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
568
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
569
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
570

    
571
    # set up the inter-node password and certificate
572
    _InitGanetiServerSetup(ss)
573

    
574
    # start the master ip
575
    rpc.call_node_start_master(hostname.name)
576

    
577
    # set up ssh config and /etc/hosts
578
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
579
    try:
580
      sshline = f.read()
581
    finally:
582
      f.close()
583
    sshkey = sshline.split(" ")[1]
584

    
585
    _AddHostToEtcHosts(hostname.name)
586

    
587
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
588

    
589
    _InitSSHSetup(hostname.name)
590

    
591
    # init of cluster config file
592
    self.cfg = cfgw = config.ConfigWriter()
593
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
594
                    sshkey, self.op.mac_prefix,
595
                    self.op.vg_name, self.op.def_bridge)
596

    
597

    
598
class LUDestroyCluster(NoHooksLU):
599
  """Logical unit for destroying the cluster.
600

601
  """
602
  _OP_REQP = []
603

    
604
  def CheckPrereq(self):
605
    """Check prerequisites.
606

607
    This checks whether the cluster is empty.
608

609
    Any errors are signalled by raising errors.OpPrereqError.
610

611
    """
612
    master = self.sstore.GetMasterNode()
613

    
614
    nodelist = self.cfg.GetNodeList()
615
    if len(nodelist) != 1 or nodelist[0] != master:
616
      raise errors.OpPrereqError("There are still %d node(s) in"
617
                                 " this cluster." % (len(nodelist) - 1))
618
    instancelist = self.cfg.GetInstanceList()
619
    if instancelist:
620
      raise errors.OpPrereqError("There are still %d instance(s) in"
621
                                 " this cluster." % len(instancelist))
622

    
623
  def Exec(self, feedback_fn):
624
    """Destroys the cluster.
625

626
    """
627
    master = self.sstore.GetMasterNode()
628
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
629
    utils.CreateBackup(priv_key)
630
    utils.CreateBackup(pub_key)
631
    rpc.call_node_leave_cluster(master)
632

    
633

    
634
class LUVerifyCluster(NoHooksLU):
635
  """Verifies the cluster status.
636

637
  """
638
  _OP_REQP = []
639

    
640
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
641
                  remote_version, feedback_fn):
642
    """Run multiple tests against a node.
643

644
    Test list:
645
      - compares ganeti version
646
      - checks vg existence and size > 20G
647
      - checks config file checksum
648
      - checks ssh to other nodes
649

650
    Args:
651
      node: name of the node to check
652
      file_list: required list of files
653
      local_cksum: dictionary of local files and their checksums
654

655
    """
656
    # compares ganeti version
657
    local_version = constants.PROTOCOL_VERSION
658
    if not remote_version:
659
      feedback_fn(" - ERROR: connection to %s failed" % (node))
660
      return True
661

    
662
    if local_version != remote_version:
663
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
664
                      (local_version, node, remote_version))
665
      return True
666

    
667
    # checks vg existence and size > 20G
668

    
669
    bad = False
670
    if not vglist:
671
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
672
                      (node,))
673
      bad = True
674
    else:
675
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
676
      if vgstatus:
677
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
678
        bad = True
679

    
680
    # checks config file checksum
681
    # checks ssh to any
682

    
683
    if 'filelist' not in node_result:
684
      bad = True
685
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
686
    else:
687
      remote_cksum = node_result['filelist']
688
      for file_name in file_list:
689
        if file_name not in remote_cksum:
690
          bad = True
691
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
692
        elif remote_cksum[file_name] != local_cksum[file_name]:
693
          bad = True
694
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
695

    
696
    if 'nodelist' not in node_result:
697
      bad = True
698
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
699
    else:
700
      if node_result['nodelist']:
701
        bad = True
702
        for node in node_result['nodelist']:
703
          feedback_fn("  - ERROR: communication with node '%s': %s" %
704
                          (node, node_result['nodelist'][node]))
705
    hyp_result = node_result.get('hypervisor', None)
706
    if hyp_result is not None:
707
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
708
    return bad
709

    
710
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
711
    """Verify an instance.
712

713
    This function checks to see if the required block devices are
714
    available on the instance's node.
715

716
    """
717
    bad = False
718

    
719
    instancelist = self.cfg.GetInstanceList()
720
    if instance not in instancelist:
721
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
722
                      (instance, instancelist))
723
      bad = True
724

    
725
    instanceconfig = self.cfg.GetInstanceInfo(instance)
726
    node_current = instanceconfig.primary_node
727

    
728
    node_vol_should = {}
729
    instanceconfig.MapLVsByNode(node_vol_should)
730

    
731
    for node in node_vol_should:
732
      for volume in node_vol_should[node]:
733
        if node not in node_vol_is or volume not in node_vol_is[node]:
734
          feedback_fn("  - ERROR: volume %s missing on node %s" %
735
                          (volume, node))
736
          bad = True
737

    
738
    if instanceconfig.status != 'down':
      if instance not in node_instance[node_current]:
740
        feedback_fn("  - ERROR: instance %s not running on node %s" %
741
                        (instance, node_current))
742
        bad = True
743

    
744
    for node in node_instance:
745
      if node != node_current:
746
        if instance in node_instance[node]:
747
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
748
                          (instance, node))
749
          bad = True
750

    
751
    return bad
752

    
753
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
754
    """Verify if there are any unknown volumes in the cluster.
755

756
    The .os, .swap and backup volumes are ignored. All other volumes are
757
    reported as unknown.
758

759
    """
760
    bad = False
761

    
762
    for node in node_vol_is:
763
      for volume in node_vol_is[node]:
764
        if node not in node_vol_should or volume not in node_vol_should[node]:
765
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
766
                      (volume, node))
767
          bad = True
768
    return bad
769

    
770
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
771
    """Verify the list of running instances.
772

773
    This checks what instances are running but unknown to the cluster.
774

775
    """
776
    bad = False
777
    for node in node_instance:
778
      for runninginstance in node_instance[node]:
779
        if runninginstance not in instancelist:
780
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
781
                          (runninginstance, node))
782
          bad = True
783
    return bad
784

    
785
  def CheckPrereq(self):
786
    """Check prerequisites.
787

788
    This has no prerequisites.
789

790
    """
791
    pass
792

    
793
  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.
795

796
    """
797
    bad = False
798
    feedback_fn("* Verifying global settings")
799
    for msg in self.cfg.VerifyConfig():
800
      feedback_fn("  - ERROR: %s" % msg)
801

    
802
    vg_name = self.cfg.GetVGName()
803
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
804
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
805
    node_volume = {}
806
    node_instance = {}
807

    
808
    # FIXME: verify OS list
809
    # do local checksums
810
    file_names = list(self.sstore.GetFileList())
811
    file_names.append(constants.SSL_CERT_FILE)
812
    file_names.append(constants.CLUSTER_CONF_FILE)
813
    local_checksums = utils.FingerprintFiles(file_names)
814

    
815
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
816
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
817
    all_instanceinfo = rpc.call_instance_list(nodelist)
818
    all_vglist = rpc.call_vg_list(nodelist)
819
    node_verify_param = {
820
      'filelist': file_names,
821
      'nodelist': nodelist,
822
      'hypervisor': None,
823
      }
824
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
825
    all_rversion = rpc.call_version(nodelist)
826

    
827
    for node in nodelist:
828
      feedback_fn("* Verifying node %s" % node)
829
      result = self._VerifyNode(node, file_names, local_checksums,
830
                                all_vglist[node], all_nvinfo[node],
831
                                all_rversion[node], feedback_fn)
832
      bad = bad or result
833

    
834
      # node_volume
835
      volumeinfo = all_volumeinfo[node]
836

    
837
      if type(volumeinfo) != dict:
838
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
839
        bad = True
840
        continue
841

    
842
      node_volume[node] = volumeinfo
843

    
844
      # node_instance
845
      nodeinstance = all_instanceinfo[node]
846
      if type(nodeinstance) != list:
847
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
848
        bad = True
849
        continue
850

    
851
      node_instance[node] = nodeinstance
852

    
853
    node_vol_should = {}
854

    
855
    for instance in instancelist:
856
      feedback_fn("* Verifying instance %s" % instance)
857
      result =  self._VerifyInstance(instance, node_volume, node_instance,
858
                                     feedback_fn)
859
      bad = bad or result
860

    
861
      inst_config = self.cfg.GetInstanceInfo(instance)
862

    
863
      inst_config.MapLVsByNode(node_vol_should)
864

    
865
    feedback_fn("* Verifying orphan volumes")
866
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
867
                                       feedback_fn)
868
    bad = bad or result
869

    
870
    feedback_fn("* Verifying remaining instances")
871
    result = self._VerifyOrphanInstances(instancelist, node_instance,
872
                                         feedback_fn)
873
    bad = bad or result
874

    
875
    return int(bad)
876

    
877

    
878
class LUVerifyDisks(NoHooksLU):
879
  """Verifies the cluster disks status.
880

881
  """
882
  _OP_REQP = []
883

    
884
  def CheckPrereq(self):
885
    """Check prerequisites.
886

887
    This has no prerequisites.
888

889
    """
890
    pass
891

    
892
  def Exec(self, feedback_fn):
893
    """Verify integrity of cluster disks.
894

895
    """
896
    result = res_nodes, res_instances = [], []
897

    
898
    vg_name = self.cfg.GetVGName()
899
    nodes = utils.NiceSort(self.cfg.GetNodeList())
900
    instances = [self.cfg.GetInstanceInfo(name)
901
                 for name in self.cfg.GetInstanceList()]
902

    
903
    nv_dict = {}
904
    for inst in instances:
905
      inst_lvs = {}
906
      if (inst.status != "up" or
907
          inst.disk_template not in constants.DTS_NET_MIRROR):
908
        continue
909
      inst.MapLVsByNode(inst_lvs)
910
      # transform { iname: {node: [vol,],},} to {(node, vol): inst}
911
      for node, vol_list in inst_lvs.iteritems():
912
        for vol in vol_list:
913
          nv_dict[(node, vol)] = inst
914

    
915
    if not nv_dict:
916
      return result
917

    
918
    node_lvs = rpc.call_volume_list(nodes, vg_name)
919

    
920
    to_act = set()
921
    for node in nodes:
922
      # node_volume
923
      lvs = node_lvs[node]
924

    
925
      if not isinstance(lvs, dict):
926
        logger.Info("connection to node %s failed or invalid data returned" %
927
                    (node,))
928
        res_nodes.append(node)
929
        continue
930

    
931
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
932
        if not lv_online:
933
          inst = nv_dict.get((node, lv_name), None)
934
          if inst is not None and inst.name not in res_instances:
935
            res_instances.append(inst.name)
936

    
937
    return result
938

    
939

    
940
class LURenameCluster(LogicalUnit):
941
  """Rename the cluster.
942

943
  """
944
  HPATH = "cluster-rename"
945
  HTYPE = constants.HTYPE_CLUSTER
946
  _OP_REQP = ["name"]
947

    
948
  def BuildHooksEnv(self):
949
    """Build hooks env.
950

951
    """
952
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
954
      "NEW_NAME": self.op.name,
955
      }
956
    mn = self.sstore.GetMasterNode()
957
    return env, [mn], [mn]
958

    
959
  def CheckPrereq(self):
960
    """Verify that the passed name is a valid one.
961

962
    """
963
    hostname = utils.HostInfo(self.op.name)
964

    
965
    new_name = hostname.name
966
    self.ip = new_ip = hostname.ip
967
    old_name = self.sstore.GetClusterName()
968
    old_ip = self.sstore.GetMasterIP()
969
    if new_name == old_name and new_ip == old_ip:
970
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
971
                                 " cluster has changed")
972
    if new_ip != old_ip:
973
      result = utils.RunCmd(["fping", "-q", new_ip])
974
      if not result.failed:
975
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
976
                                   " reachable on the network. Aborting." %
977
                                   new_ip)
978

    
979
    self.op.name = new_name
980

    
981
  def Exec(self, feedback_fn):
982
    """Rename the cluster.
983

984
    """
985
    clustername = self.op.name
986
    ip = self.ip
987
    ss = self.sstore
988

    
989
    # shutdown the master IP
990
    master = ss.GetMasterNode()
991
    if not rpc.call_node_stop_master(master):
992
      raise errors.OpExecError("Could not disable the master role")
993

    
994
    try:
995
      # modify the sstore
996
      ss.SetKey(ss.SS_MASTER_IP, ip)
997
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
998

    
999
      # Distribute updated ss config to all nodes
1000
      myself = self.cfg.GetNodeInfo(master)
1001
      dist_nodes = self.cfg.GetNodeList()
1002
      if myself.name in dist_nodes:
1003
        dist_nodes.remove(myself.name)
1004

    
1005
      logger.Debug("Copying updated ssconf data to all nodes")
1006
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1007
        fname = ss.KeyToFilename(keyname)
1008
        result = rpc.call_upload_file(dist_nodes, fname)
1009
        for to_node in dist_nodes:
1010
          if not result[to_node]:
1011
            logger.Error("copy of file %s to node %s failed" %
1012
                         (fname, to_node))
1013
    finally:
1014
      if not rpc.call_node_start_master(master):
1015
        logger.Error("Could not re-enable the master role on the master,"
1016
                     " please restart manually.")
1017

    
1018

    
1019
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1020
  """Sleep and poll for an instance's disk to sync.
1021

1022
  """
1023
  if not instance.disks:
1024
    return True
1025

    
1026
  if not oneshot:
1027
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1028

    
1029
  node = instance.primary_node
1030

    
1031
  for dev in instance.disks:
1032
    cfgw.SetDiskID(dev, node)
1033

    
1034
  retries = 0
1035
  while True:
1036
    max_time = 0
1037
    done = True
1038
    cumul_degraded = False
1039
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1040
    if not rstats:
1041
      proc.LogWarning("Can't get any data from node %s" % node)
1042
      retries += 1
1043
      if retries >= 10:
1044
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1045
                                 " aborting." % node)
1046
      time.sleep(6)
1047
      continue
1048
    retries = 0
1049
    for i in range(len(rstats)):
1050
      mstat = rstats[i]
1051
      if mstat is None:
1052
        proc.LogWarning("Can't compute data for node %s/%s" %
1053
                        (node, instance.disks[i].iv_name))
1054
        continue
1055
      # we ignore the ldisk parameter
1056
      perc_done, est_time, is_degraded, _ = mstat
1057
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1058
      if perc_done is not None:
1059
        done = False
1060
        if est_time is not None:
1061
          rem_time = "%d estimated seconds remaining" % est_time
1062
          max_time = est_time
1063
        else:
1064
          rem_time = "no time estimate"
1065
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1066
                     (instance.disks[i].iv_name, perc_done, rem_time))
1067
    if done or oneshot:
1068
      break
1069

    
1070
    if unlock:
1071
      utils.Unlock('cmd')
1072
    try:
1073
      time.sleep(min(60, max_time))
1074
    finally:
1075
      if unlock:
1076
        utils.Lock('cmd')
1077

    
1078
  if done:
1079
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1080
  return not cumul_degraded
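
# Each entry of rstats unpacked above is a 4-tuple
# (perc_done, est_time, is_degraded, ldisk); for example an assumed value
# of (45.5, 120, True, False) reads as "45.5% done, about 120 seconds
# remaining", while a perc_done of None is treated as "no resync in
# progress" for that device.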
1081

    
1082

    
1083
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1084
  """Check that mirrors are not degraded.
1085

1086
  The ldisk parameter, if True, will change the test from the
1087
  is_degraded attribute (which represents overall non-ok status for
1088
  the device(s)) to the ldisk (representing the local storage status).
1089

1090
  """
1091
  cfgw.SetDiskID(dev, node)
1092
  if ldisk:
1093
    idx = 6
1094
  else:
1095
    idx = 5
1096

    
1097
  result = True
1098
  if on_primary or dev.AssembleOnSecondary():
1099
    rstats = rpc.call_blockdev_find(node, dev)
1100
    if not rstats:
1101
      logger.ToStderr("Can't get any data from node %s" % node)
1102
      result = False
1103
    else:
1104
      result = result and (not rstats[idx])
1105
  if dev.children:
1106
    for child in dev.children:
1107
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1108

    
1109
  return result
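
# Reading of the index chosen above: in the status tuple returned by
# rpc.call_blockdev_find, position 5 holds the overall is_degraded flag
# and position 6 the local-disk (ldisk) status.  A sketch of a
# primary-side consistency check (assumed dev/instance variables) is:
#
#   ok = _CheckDiskConsistency(self.cfg, dev, instance.primary_node, True)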
1110

    
1111

    
1112
class LUDiagnoseOS(NoHooksLU):
1113
  """Logical unit for OS diagnose/query.
1114

1115
  """
1116
  _OP_REQP = []
1117

    
1118
  def CheckPrereq(self):
1119
    """Check prerequisites.
1120

1121
    This always succeeds, since this is a pure query LU.
1122

1123
    """
1124
    return
1125

    
1126
  def Exec(self, feedback_fn):
1127
    """Compute the list of OSes.
1128

1129
    """
1130
    node_list = self.cfg.GetNodeList()
1131
    node_data = rpc.call_os_diagnose(node_list)
1132
    if node_data == False:
1133
      raise errors.OpExecError("Can't gather the list of OSes")
1134
    return node_data
1135

    
1136

    
1137
class LURemoveNode(LogicalUnit):
1138
  """Logical unit for removing a node.
1139

1140
  """
1141
  HPATH = "node-remove"
1142
  HTYPE = constants.HTYPE_NODE
1143
  _OP_REQP = ["node_name"]
1144

    
1145
  def BuildHooksEnv(self):
1146
    """Build hooks env.
1147

1148
    This doesn't run on the target node in the pre phase as a failed
1149
    node would not allow itself to run.
1150

1151
    """
1152
    env = {
1153
      "OP_TARGET": self.op.node_name,
1154
      "NODE_NAME": self.op.node_name,
1155
      }
1156
    all_nodes = self.cfg.GetNodeList()
1157
    all_nodes.remove(self.op.node_name)
1158
    return env, all_nodes, all_nodes
1159

    
1160
  def CheckPrereq(self):
1161
    """Check prerequisites.
1162

1163
    This checks:
1164
     - the node exists in the configuration
1165
     - it does not have primary or secondary instances
1166
     - it's not the master
1167

1168
    Any errors are signalled by raising errors.OpPrereqError.
1169

1170
    """
1171
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1172
    if node is None:
1173
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1174

    
1175
    instance_list = self.cfg.GetInstanceList()
1176

    
1177
    masternode = self.sstore.GetMasterNode()
1178
    if node.name == masternode:
1179
      raise errors.OpPrereqError("Node is the master node,"
1180
                                 " you need to failover first.")
1181

    
1182
    for instance_name in instance_list:
1183
      instance = self.cfg.GetInstanceInfo(instance_name)
1184
      if node.name == instance.primary_node:
1185
        raise errors.OpPrereqError("Instance %s still running on the node,"
1186
                                   " please remove first." % instance_name)
1187
      if node.name in instance.secondary_nodes:
1188
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1189
                                   " please remove first." % instance_name)
1190
    self.op.node_name = node.name
1191
    self.node = node
1192

    
1193
  def Exec(self, feedback_fn):
1194
    """Removes the node from the cluster.
1195

1196
    """
1197
    node = self.node
1198
    logger.Info("stopping the node daemon and removing configs from node %s" %
1199
                node.name)
1200

    
1201
    rpc.call_node_leave_cluster(node.name)
1202

    
1203
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1204

    
1205
    logger.Info("Removing node %s from config" % node.name)
1206

    
1207
    self.cfg.RemoveNode(node.name)
1208

    
1209
    _RemoveHostFromEtcHosts(node.name)
1210

    
1211

    
1212
class LUQueryNodes(NoHooksLU):
1213
  """Logical unit for querying nodes.
1214

1215
  """
1216
  _OP_REQP = ["output_fields", "names"]
1217

    
1218
  def CheckPrereq(self):
1219
    """Check prerequisites.
1220

1221
    This checks that the fields required are valid output fields.
1222

1223
    """
1224
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1225
                                     "mtotal", "mnode", "mfree",
1226
                                     "bootid"])
1227

    
1228
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1229
                               "pinst_list", "sinst_list",
1230
                               "pip", "sip"],
1231
                       dynamic=self.dynamic_fields,
1232
                       selected=self.op.output_fields)
1233

    
1234
    self.wanted = _GetWantedNodes(self, self.op.names)
1235

    
1236
  def Exec(self, feedback_fn):
1237
    """Computes the list of nodes and their attributes.
1238

1239
    """
1240
    nodenames = self.wanted
1241
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1242

    
1243
    # begin data gathering
1244

    
1245
    if self.dynamic_fields.intersection(self.op.output_fields):
1246
      live_data = {}
1247
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1248
      for name in nodenames:
1249
        nodeinfo = node_data.get(name, None)
1250
        if nodeinfo:
1251
          live_data[name] = {
1252
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1253
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1254
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1255
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1256
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1257
            "bootid": nodeinfo['bootid'],
1258
            }
1259
        else:
1260
          live_data[name] = {}
1261
    else:
1262
      live_data = dict.fromkeys(nodenames, {})
1263

    
1264
    node_to_primary = dict([(name, set()) for name in nodenames])
1265
    node_to_secondary = dict([(name, set()) for name in nodenames])
1266

    
1267
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1268
                             "sinst_cnt", "sinst_list"))
1269
    if inst_fields & frozenset(self.op.output_fields):
1270
      instancelist = self.cfg.GetInstanceList()
1271

    
1272
      for instance_name in instancelist:
1273
        inst = self.cfg.GetInstanceInfo(instance_name)
1274
        if inst.primary_node in node_to_primary:
1275
          node_to_primary[inst.primary_node].add(inst.name)
1276
        for secnode in inst.secondary_nodes:
1277
          if secnode in node_to_secondary:
1278
            node_to_secondary[secnode].add(inst.name)
1279

    
1280
    # end data gathering
1281

    
1282
    output = []
1283
    for node in nodelist:
1284
      node_output = []
1285
      for field in self.op.output_fields:
1286
        if field == "name":
1287
          val = node.name
1288
        elif field == "pinst_list":
1289
          val = list(node_to_primary[node.name])
1290
        elif field == "sinst_list":
1291
          val = list(node_to_secondary[node.name])
1292
        elif field == "pinst_cnt":
1293
          val = len(node_to_primary[node.name])
1294
        elif field == "sinst_cnt":
1295
          val = len(node_to_secondary[node.name])
1296
        elif field == "pip":
1297
          val = node.primary_ip
1298
        elif field == "sip":
1299
          val = node.secondary_ip
1300
        elif field in self.dynamic_fields:
1301
          val = live_data[node.name].get(field, None)
1302
        else:
1303
          raise errors.ParameterError(field)
1304
        node_output.append(val)
1305
      output.append(node_output)
1306

    
1307
    return output
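
# Output format sketch: the rows follow self.op.output_fields order, one
# row per node; with output_fields = ["name", "pinst_cnt"] the result
# would look like (assumed values)
# [["node1.example.com", 2], ["node2.example.com", 0]].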
1308

    
1309

    
1310
class LUQueryNodeVolumes(NoHooksLU):
1311
  """Logical unit for getting volumes on node(s).
1312

1313
  """
1314
  _OP_REQP = ["nodes", "output_fields"]
1315

    
1316
  def CheckPrereq(self):
1317
    """Check prerequisites.
1318

1319
    This checks that the fields required are valid output fields.
1320

1321
    """
1322
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1323

    
1324
    _CheckOutputFields(static=["node"],
1325
                       dynamic=["phys", "vg", "name", "size", "instance"],
1326
                       selected=self.op.output_fields)
1327

    
1328

    
1329
  def Exec(self, feedback_fn):
1330
    """Computes the list of nodes and their attributes.
1331

1332
    """
1333
    nodenames = self.nodes
1334
    volumes = rpc.call_node_volumes(nodenames)
1335

    
1336
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1337
             in self.cfg.GetInstanceList()]
1338

    
1339
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1340

    
1341
    output = []
1342
    for node in nodenames:
1343
      if node not in volumes or not volumes[node]:
1344
        continue
1345

    
1346
      node_vols = volumes[node][:]
1347
      node_vols.sort(key=lambda vol: vol['dev'])
1348

    
1349
      for vol in node_vols:
1350
        node_output = []
1351
        for field in self.op.output_fields:
1352
          if field == "node":
1353
            val = node
1354
          elif field == "phys":
1355
            val = vol['dev']
1356
          elif field == "vg":
1357
            val = vol['vg']
1358
          elif field == "name":
1359
            val = vol['name']
1360
          elif field == "size":
1361
            val = int(float(vol['size']))
1362
          elif field == "instance":
1363
            for inst in ilist:
1364
              if node not in lv_by_node[inst]:
1365
                continue
1366
              if vol['name'] in lv_by_node[inst][node]:
1367
                val = inst.name
1368
                break
1369
            else:
1370
              val = '-'
1371
          else:
1372
            raise errors.ParameterError(field)
1373
          node_output.append(str(val))
1374

    
1375
        output.append(node_output)
1376

    
1377
    return output
1378

    
1379

    
1380
class LUAddNode(LogicalUnit):
1381
  """Logical unit for adding node to the cluster.
1382

1383
  """
1384
  HPATH = "node-add"
1385
  HTYPE = constants.HTYPE_NODE
1386
  _OP_REQP = ["node_name"]
1387

    
1388
  def BuildHooksEnv(self):
1389
    """Build hooks env.
1390

1391
    This will run on all nodes before, and on all nodes + the new node after.
1392

1393
    """
1394
    env = {
1395
      "OP_TARGET": self.op.node_name,
1396
      "NODE_NAME": self.op.node_name,
1397
      "NODE_PIP": self.op.primary_ip,
1398
      "NODE_SIP": self.op.secondary_ip,
1399
      }
1400
    nodes_0 = self.cfg.GetNodeList()
1401
    nodes_1 = nodes_0 + [self.op.node_name, ]
1402
    return env, nodes_0, nodes_1
1403

    
1404
  def CheckPrereq(self):
1405
    """Check prerequisites.
1406

1407
    This checks:
1408
     - the new node is not already in the config
1409
     - it is resolvable
1410
     - its parameters (single/dual homed) matches the cluster
1411

1412
    Any errors are signalled by raising errors.OpPrereqError.
1413

1414
    """
1415
    node_name = self.op.node_name
1416
    cfg = self.cfg
1417

    
1418
    dns_data = utils.HostInfo(node_name)
1419

    
1420
    node = dns_data.name
1421
    primary_ip = self.op.primary_ip = dns_data.ip
1422
    secondary_ip = getattr(self.op, "secondary_ip", None)
1423
    if secondary_ip is None:
1424
      secondary_ip = primary_ip
1425
    if not utils.IsValidIP(secondary_ip):
1426
      raise errors.OpPrereqError("Invalid secondary IP given")
1427
    self.op.secondary_ip = secondary_ip
1428
    node_list = cfg.GetNodeList()
1429
    if node in node_list:
1430
      raise errors.OpPrereqError("Node %s is already in the configuration"
1431
                                 % node)
1432

    
1433
    for existing_node_name in node_list:
1434
      existing_node = cfg.GetNodeInfo(existing_node_name)
1435
      if (existing_node.primary_ip == primary_ip or
1436
          existing_node.secondary_ip == primary_ip or
1437
          existing_node.primary_ip == secondary_ip or
1438
          existing_node.secondary_ip == secondary_ip):
1439
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1440
                                   " existing node %s" % existing_node.name)
1441

    
1442
    # check that the type of the node (single versus dual homed) is the
1443
    # same as for the master
1444
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1445
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1446
    newbie_singlehomed = secondary_ip == primary_ip
1447
    if master_singlehomed != newbie_singlehomed:
1448
      if master_singlehomed:
1449
        raise errors.OpPrereqError("The master has no private ip but the"
1450
                                   " new node has one")
1451
      else:
1452
        raise errors.OpPrereqError("The master has a private ip but the"
1453
                                   " new node doesn't have one")
1454

    
1455
    # checks reachability
1456
    if not utils.TcpPing(utils.HostInfo().name,
1457
                         primary_ip,
1458
                         constants.DEFAULT_NODED_PORT):
1459
      raise errors.OpPrereqError("Node not reachable by ping")
1460

    
1461
    if not newbie_singlehomed:
1462
      # check reachability from my secondary ip to newbie's secondary ip
1463
      if not utils.TcpPing(myself.secondary_ip,
1464
                           secondary_ip,
1465
                           constants.DEFAULT_NODED_PORT):
1466
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1467
                                   " based ping to noded port")
1468

    
1469
    self.new_node = objects.Node(name=node,
1470
                                 primary_ip=primary_ip,
1471
                                 secondary_ip=secondary_ip)
1472

    
1473
  def Exec(self, feedback_fn):
1474
    """Adds the new node to the cluster.
1475

1476
    """
1477
    new_node = self.new_node
1478
    node = new_node.name
1479

    
1480
    # set up inter-node password and certificate and restarts the node daemon
1481
    gntpass = self.sstore.GetNodeDaemonPassword()
1482
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1483
      raise errors.OpExecError("ganeti password corruption detected")
1484
    f = open(constants.SSL_CERT_FILE)
1485
    try:
1486
      gntpem = f.read(8192)
1487
    finally:
1488
      f.close()
1489
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1490
    # so we use this to detect an invalid certificate; as long as the
1491
    # cert doesn't contain this, the here-document will be correctly
1492
    # parsed by the shell sequence below
1493
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1494
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1495
    if not gntpem.endswith("\n"):
1496
      raise errors.OpExecError("PEM must end with newline")
1497
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1498

    
1499
    # and then connect with ssh to set password and start ganeti-noded
1500
    # note that all the below variables are sanitized at this point,
1501
    # either by being constants or by the checks above
1502
    ss = self.sstore
1503
    mycommand = ("umask 077 && "
1504
                 "echo '%s' > '%s' && "
1505
                 "cat > '%s' << '!EOF.' && \n"
1506
                 "%s!EOF.\n%s restart" %
1507
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1508
                  constants.SSL_CERT_FILE, gntpem,
1509
                  constants.NODE_INITD_SCRIPT))
1510

    
1511
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1512
    if result.failed:
1513
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1514
                               " output: %s" %
1515
                               (node, result.fail_reason, result.output))
1516

    
1517
    # check connectivity
1518
    time.sleep(4)
1519

    
1520
    result = rpc.call_version([node])[node]
1521
    if result:
1522
      if constants.PROTOCOL_VERSION == result:
1523
        logger.Info("communication to node %s fine, sw version %s match" %
1524
                    (node, result))
1525
      else:
1526
        raise errors.OpExecError("Version mismatch master version %s,"
1527
                                 " node version %s" %
1528
                                 (constants.PROTOCOL_VERSION, result))
1529
    else:
1530
      raise errors.OpExecError("Cannot get version from the new node")
1531

    
1532
    # setup ssh on node
1533
    logger.Info("copy ssh key to node %s" % node)
1534
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1535
    keyarray = []
1536
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1537
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1538
                priv_key, pub_key]
1539

    
1540
    for i in keyfiles:
1541
      f = open(i, 'r')
1542
      try:
1543
        keyarray.append(f.read())
1544
      finally:
1545
        f.close()
1546

    
1547
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1548
                               keyarray[3], keyarray[4], keyarray[5])
1549

    
1550
    if not result:
1551
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1552

    
1553
    # Add node to our /etc/hosts, and add key to known_hosts
1554
    _AddHostToEtcHosts(new_node.name)
1555

    
1556
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1557
                      self.cfg.GetHostKey())
1558

    
1559
    if new_node.secondary_ip != new_node.primary_ip:
1560
      if not rpc.call_node_tcp_ping(new_node.name,
1561
                                    constants.LOCALHOST_IP_ADDRESS,
1562
                                    new_node.secondary_ip,
1563
                                    constants.DEFAULT_NODED_PORT,
1564
                                    10, False):
1565
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1566
                                 " you gave (%s). Please fix and re-run this"
1567
                                 " command." % new_node.secondary_ip)
1568

    
1569
    success, msg = ssh.VerifyNodeHostname(node)
1570
    if not success:
1571
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1572
                               " than the one the resolver gives: %s."
1573
                               " Please fix and re-run this command." %
1574
                               (node, msg))
1575

    
1576
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1577
    # including the node just added
1578
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1579
    dist_nodes = self.cfg.GetNodeList() + [node]
1580
    if myself.name in dist_nodes:
1581
      dist_nodes.remove(myself.name)
1582

    
1583
    logger.Debug("Copying hosts and known_hosts to all nodes")
1584
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1585
      result = rpc.call_upload_file(dist_nodes, fname)
1586
      for to_node in dist_nodes:
1587
        if not result[to_node]:
1588
          logger.Error("copy of file %s to node %s failed" %
1589
                       (fname, to_node))
1590

    
1591
    to_copy = ss.GetFileList()
1592
    for fname in to_copy:
1593
      if not ssh.CopyFileToNode(node, fname):
1594
        logger.Error("could not copy file %s to node %s" % (fname, node))
1595

    
1596
    logger.Info("adding node %s to cluster.conf" % node)
1597
    self.cfg.AddNode(new_node)
1598

    
1599

    
1600
class LUMasterFailover(LogicalUnit):
1601
  """Failover the master node to the current node.
1602

1603
  This is a special LU in that it must run on a non-master node.
1604

1605
  """
1606
  HPATH = "master-failover"
1607
  HTYPE = constants.HTYPE_CLUSTER
1608
  REQ_MASTER = False
1609
  _OP_REQP = []
1610

    
1611
  def BuildHooksEnv(self):
1612
    """Build hooks env.
1613

1614
    This will run on the new master only in the pre phase, and on all
1615
    the nodes in the post phase.
1616

1617
    """
1618
    env = {
1619
      "OP_TARGET": self.new_master,
1620
      "NEW_MASTER": self.new_master,
1621
      "OLD_MASTER": self.old_master,
1622
      }
1623
    return env, [self.new_master], self.cfg.GetNodeList()
1624

    
1625
  def CheckPrereq(self):
1626
    """Check prerequisites.
1627

1628
    This checks that we are not already the master.
1629

1630
    """
1631
    self.new_master = utils.HostInfo().name
1632
    self.old_master = self.sstore.GetMasterNode()
1633

    
1634
    if self.old_master == self.new_master:
1635
      raise errors.OpPrereqError("This commands must be run on the node"
1636
                                 " where you want the new master to be."
1637
                                 " %s is already the master" %
1638
                                 self.old_master)
1639

    
1640
  def Exec(self, feedback_fn):
1641
    """Failover the master node.
1642

1643
    This command, when run on a non-master node, will cause the current
1644
    master to cease being master, and the non-master to become new
1645
    master.
1646

1647
    """
1648
    #TODO: do not rely on gethostname returning the FQDN
1649
    logger.Info("setting master to %s, old master: %s" %
1650
                (self.new_master, self.old_master))
1651

    
1652
    if not rpc.call_node_stop_master(self.old_master):
1653
      logger.Error("could disable the master role on the old master"
1654
                   " %s, please disable manually" % self.old_master)
1655

    
1656
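    # record the new master in the simple store and push the updated
    # file to all nodes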
    ss = self.sstore
1657
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1658
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1659
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1660
      logger.Error("could not distribute the new simple store master file"
1661
                   " to the other nodes, please check.")
1662

    
1663
    if not rpc.call_node_start_master(self.new_master):
1664
      logger.Error("could not start the master role on the new master"
1665
                   " %s, please check" % self.new_master)
1666
      feedback_fn("Error in activating the master IP on the new master,"
1667
                  " please fix manually.")
1668

    
1669

    
1670

    
1671
class LUQueryClusterInfo(NoHooksLU):
1672
  """Query cluster configuration.
1673

1674
  """
1675
  _OP_REQP = []
1676
  REQ_MASTER = False
1677

    
1678
  def CheckPrereq(self):
1679
    """No prerequsites needed for this LU.
1680

1681
    """
1682
    pass
1683

    
1684
  def Exec(self, feedback_fn):
1685
    """Return cluster config.
1686

1687
    """
1688
    result = {
1689
      "name": self.sstore.GetClusterName(),
1690
      "software_version": constants.RELEASE_VERSION,
1691
      "protocol_version": constants.PROTOCOL_VERSION,
1692
      "config_version": constants.CONFIG_VERSION,
1693
      "os_api_version": constants.OS_API_VERSION,
1694
      "export_version": constants.EXPORT_VERSION,
1695
      "master": self.sstore.GetMasterNode(),
1696
      "architecture": (platform.architecture()[0], platform.machine()),
1697
      }
1698

    
1699
    return result
1700

    
1701

    
1702
class LUClusterCopyFile(NoHooksLU):
1703
  """Copy file to cluster.
1704

1705
  """
1706
  _OP_REQP = ["nodes", "filename"]
1707

    
1708
  def CheckPrereq(self):
1709
    """Check prerequisites.
1710

1711
    It should check that the named file exists and that the given list
1712
    of nodes is valid.
1713

1714
    """
1715
    if not os.path.exists(self.op.filename):
1716
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1717

    
1718
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1719

    
1720
  def Exec(self, feedback_fn):
1721
    """Copy a file from master to some nodes.
1722

1723
    Args:
1724
      opts - class with options as members
1725
      args - list containing a single element, the file name
1726
    Opts used:
1727
      nodes - list containing the name of target nodes; if empty, all nodes
1728

1729
    """
1730
    filename = self.op.filename
1731

    
1732
    myname = utils.HostInfo().name
1733

    
1734
    for node in self.nodes:
1735
      if node == myname:
1736
        continue
1737
      if not ssh.CopyFileToNode(node, filename):
1738
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1739

    
1740

    
1741
class LUDumpClusterConfig(NoHooksLU):
1742
  """Return a text-representation of the cluster-config.
1743

1744
  """
1745
  _OP_REQP = []
1746

    
1747
  def CheckPrereq(self):
1748
    """No prerequisites.
1749

1750
    """
1751
    pass
1752

    
1753
  def Exec(self, feedback_fn):
1754
    """Dump a representation of the cluster config to the standard output.
1755

1756
    """
1757
    return self.cfg.DumpConfig()
1758

    
1759

    
1760
class LURunClusterCommand(NoHooksLU):
1761
  """Run a command on some nodes.
1762

1763
  """
1764
  _OP_REQP = ["command", "nodes"]
1765

    
1766
  def CheckPrereq(self):
1767
    """Check prerequisites.
1768

1769
    It checks that the given list of nodes is valid.
1770

1771
    """
1772
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1773

    
1774
  def Exec(self, feedback_fn):
1775
    """Run a command on some nodes.
1776

1777
    """
1778
    data = []
1779
    for node in self.nodes:
1780
      result = ssh.SSHCall(node, "root", self.op.command)
1781
      data.append((node, result.output, result.exit_code))
1782

    
1783
    return data
1784

    
1785

    
1786
class LUActivateInstanceDisks(NoHooksLU):
1787
  """Bring up an instance's disks.
1788

1789
  """
1790
  _OP_REQP = ["instance_name"]
1791

    
1792
  def CheckPrereq(self):
1793
    """Check prerequisites.
1794

1795
    This checks that the instance is in the cluster.
1796

1797
    """
1798
    instance = self.cfg.GetInstanceInfo(
1799
      self.cfg.ExpandInstanceName(self.op.instance_name))
1800
    if instance is None:
1801
      raise errors.OpPrereqError("Instance '%s' not known" %
1802
                                 self.op.instance_name)
1803
    self.instance = instance
1804

    
1805

    
1806
  def Exec(self, feedback_fn):
1807
    """Activate the disks.
1808

1809
    """
1810
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1811
    if not disks_ok:
1812
      raise errors.OpExecError("Cannot activate block devices")
1813

    
1814
    return disks_info
1815

    
1816

    
1817
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1818
  """Prepare the block devices for an instance.
1819

1820
  This sets up the block devices on all nodes.
1821

1822
  Args:
1823
    instance: a ganeti.objects.Instance object
1824
    ignore_secondaries: if true, errors on secondary nodes won't result
1825
                        in an error return from the function
1826

1827
  Returns:
1828
    false if the operation failed
1829
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded, with the mapping from node devices to instance devices
1831
  """
1832
  device_info = []
1833
  disks_ok = True
1834
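  # try to assemble each disk on every node it spans; the result
  # returned by the primary node is what ends up in the device info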
  for inst_disk in instance.disks:
1835
    master_result = None
1836
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1837
      cfg.SetDiskID(node_disk, node)
1838
      is_primary = node == instance.primary_node
1839
      result = rpc.call_blockdev_assemble(node, node_disk,
1840
                                          instance.name, is_primary)
1841
      if not result:
1842
        logger.Error("could not prepare block device %s on node %s"
1843
                     " (is_primary=%s)" %
1844
                     (inst_disk.iv_name, node, is_primary))
1845
        if is_primary or not ignore_secondaries:
1846
          disks_ok = False
1847
      if is_primary:
1848
        master_result = result
1849
    device_info.append((instance.primary_node, inst_disk.iv_name,
1850
                        master_result))
1851

    
1852
  # leave the disks configured for the primary node
1853
  # this is a workaround that would be fixed better by
1854
  # improving the logical/physical id handling
1855
  for disk in instance.disks:
1856
    cfg.SetDiskID(disk, instance.primary_node)
1857

    
1858
  return disks_ok, device_info
1859

    
1860

    
1861
def _StartInstanceDisks(cfg, instance, force):
1862
  """Start the disks of an instance.
1863

1864
  """
1865
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1866
                                           ignore_secondaries=force)
1867
  if not disks_ok:
1868
    _ShutdownInstanceDisks(instance, cfg)
1869
    if force is not None and not force:
1870
      logger.Error("If the message above refers to a secondary node,"
1871
                   " you can retry the operation using '--force'.")
1872
    raise errors.OpExecError("Disk consistency error")
1873

    
1874

    
1875
class LUDeactivateInstanceDisks(NoHooksLU):
1876
  """Shutdown an instance's disks.
1877

1878
  """
1879
  _OP_REQP = ["instance_name"]
1880

    
1881
  def CheckPrereq(self):
1882
    """Check prerequisites.
1883

1884
    This checks that the instance is in the cluster.
1885

1886
    """
1887
    instance = self.cfg.GetInstanceInfo(
1888
      self.cfg.ExpandInstanceName(self.op.instance_name))
1889
    if instance is None:
1890
      raise errors.OpPrereqError("Instance '%s' not known" %
1891
                                 self.op.instance_name)
1892
    self.instance = instance
1893

    
1894
  def Exec(self, feedback_fn):
1895
    """Deactivate the disks
1896

1897
    """
1898
    instance = self.instance
1899
    ins_l = rpc.call_instance_list([instance.primary_node])
1900
    ins_l = ins_l[instance.primary_node]
1901
    if not isinstance(ins_l, list):
1902
      raise errors.OpExecError("Can't contact node '%s'" %
1903
                               instance.primary_node)
1904

    
1905
    if self.instance.name in ins_l:
1906
      raise errors.OpExecError("Instance is running, can't shutdown"
1907
                               " block devices.")
1908

    
1909
    _ShutdownInstanceDisks(instance, self.cfg)
1910

    
1911

    
1912
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1913
  """Shutdown block devices of an instance.
1914

1915
  This does the shutdown on all nodes of the instance.
1916

1917
  If ignore_primary is true, errors on the primary node are ignored;
  otherwise they cause the shutdown to be reported as failed.
1919

1920
  """
1921
  result = True
1922
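  # shut down each disk on every node it spans, remembering any failure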
  for disk in instance.disks:
1923
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1924
      cfg.SetDiskID(top_disk, node)
1925
      if not rpc.call_blockdev_shutdown(node, top_disk):
1926
        logger.Error("could not shutdown block device %s on node %s" %
1927
                     (disk.iv_name, node))
1928
        if not ignore_primary or node != instance.primary_node:
1929
          result = False
1930
  return result
1931

    
1932

    
1933
class LUStartupInstance(LogicalUnit):
1934
  """Starts an instance.
1935

1936
  """
1937
  HPATH = "instance-start"
1938
  HTYPE = constants.HTYPE_INSTANCE
1939
  _OP_REQP = ["instance_name", "force"]
1940

    
1941
  def BuildHooksEnv(self):
1942
    """Build hooks env.
1943

1944
    This runs on master, primary and secondary nodes of the instance.
1945

1946
    """
1947
    env = {
1948
      "FORCE": self.op.force,
1949
      }
1950
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1951
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1952
          list(self.instance.secondary_nodes))
1953
    return env, nl, nl
1954

    
1955
  def CheckPrereq(self):
1956
    """Check prerequisites.
1957

1958
    This checks that the instance is in the cluster.
1959

1960
    """
1961
    instance = self.cfg.GetInstanceInfo(
1962
      self.cfg.ExpandInstanceName(self.op.instance_name))
1963
    if instance is None:
1964
      raise errors.OpPrereqError("Instance '%s' not known" %
1965
                                 self.op.instance_name)
1966

    
1967
    # check bridges existence
1968
    _CheckInstanceBridgesExist(instance)
1969

    
1970
    self.instance = instance
1971
    self.op.instance_name = instance.name
1972

    
1973
  def Exec(self, feedback_fn):
1974
    """Start the instance.
1975

1976
    """
1977
    instance = self.instance
1978
    force = self.op.force
1979
    extra_args = getattr(self.op, "extra_args", "")
1980

    
1981
    node_current = instance.primary_node
1982

    
1983
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1984
    if not nodeinfo:
1985
      raise errors.OpExecError("Could not contact node %s for infos" %
1986
                               (node_current))
1987

    
1988
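    # make sure the primary node has enough free memory for the instance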
    freememory = nodeinfo[node_current]['memory_free']
1989
    memory = instance.memory
1990
    if memory > freememory:
1991
      raise errors.OpExecError("Not enough memory to start instance"
1992
                               " %s on node %s"
1993
                               " needed %s MiB, available %s MiB" %
1994
                               (instance.name, node_current, memory,
1995
                                freememory))
1996

    
1997
    _StartInstanceDisks(self.cfg, instance, force)
1998

    
1999
    if not rpc.call_instance_start(node_current, instance, extra_args):
2000
      _ShutdownInstanceDisks(instance, self.cfg)
2001
      raise errors.OpExecError("Could not start instance")
2002

    
2003
    self.cfg.MarkInstanceUp(instance.name)
2004

    
2005

    
2006
class LURebootInstance(LogicalUnit):
2007
  """Reboot an instance.
2008

2009
  """
2010
  HPATH = "instance-reboot"
2011
  HTYPE = constants.HTYPE_INSTANCE
2012
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2013

    
2014
  def BuildHooksEnv(self):
2015
    """Build hooks env.
2016

2017
    This runs on master, primary and secondary nodes of the instance.
2018

2019
    """
2020
    env = {
2021
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2022
      }
2023
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2024
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2025
          list(self.instance.secondary_nodes))
2026
    return env, nl, nl
2027

    
2028
  def CheckPrereq(self):
2029
    """Check prerequisites.
2030

2031
    This checks that the instance is in the cluster.
2032

2033
    """
2034
    instance = self.cfg.GetInstanceInfo(
2035
      self.cfg.ExpandInstanceName(self.op.instance_name))
2036
    if instance is None:
2037
      raise errors.OpPrereqError("Instance '%s' not known" %
2038
                                 self.op.instance_name)
2039

    
2040
    # check bridges existence
2041
    _CheckInstanceBridgesExist(instance)
2042

    
2043
    self.instance = instance
2044
    self.op.instance_name = instance.name
2045

    
2046
  def Exec(self, feedback_fn):
2047
    """Reboot the instance.
2048

2049
    """
2050
    instance = self.instance
2051
    ignore_secondaries = self.op.ignore_secondaries
2052
    reboot_type = self.op.reboot_type
2053
    extra_args = getattr(self.op, "extra_args", "")
2054

    
2055
    node_current = instance.primary_node
2056

    
2057
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2058
                           constants.INSTANCE_REBOOT_HARD,
2059
                           constants.INSTANCE_REBOOT_FULL]:
2060
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2061
                                  (constants.INSTANCE_REBOOT_SOFT,
2062
                                   constants.INSTANCE_REBOOT_HARD,
2063
                                   constants.INSTANCE_REBOOT_FULL))
2064

    
2065
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2066
                       constants.INSTANCE_REBOOT_HARD]:
2067
      if not rpc.call_instance_reboot(node_current, instance,
2068
                                      reboot_type, extra_args):
2069
        raise errors.OpExecError("Could not reboot instance")
2070
    else:
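      # full reboot: shut the instance down, cycle its block devices and
      # start it again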
2071
      if not rpc.call_instance_shutdown(node_current, instance):
2072
        raise errors.OpExecError("could not shutdown instance for full reboot")
2073
      _ShutdownInstanceDisks(instance, self.cfg)
2074
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2075
      if not rpc.call_instance_start(node_current, instance, extra_args):
2076
        _ShutdownInstanceDisks(instance, self.cfg)
2077
        raise errors.OpExecError("Could not start instance for full reboot")
2078

    
2079
    self.cfg.MarkInstanceUp(instance.name)
2080

    
2081

    
2082
class LUShutdownInstance(LogicalUnit):
2083
  """Shutdown an instance.
2084

2085
  """
2086
  HPATH = "instance-stop"
2087
  HTYPE = constants.HTYPE_INSTANCE
2088
  _OP_REQP = ["instance_name"]
2089

    
2090
  def BuildHooksEnv(self):
2091
    """Build hooks env.
2092

2093
    This runs on master, primary and secondary nodes of the instance.
2094

2095
    """
2096
    env = _BuildInstanceHookEnvByObject(self.instance)
2097
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2098
          list(self.instance.secondary_nodes))
2099
    return env, nl, nl
2100

    
2101
  def CheckPrereq(self):
2102
    """Check prerequisites.
2103

2104
    This checks that the instance is in the cluster.
2105

2106
    """
2107
    instance = self.cfg.GetInstanceInfo(
2108
      self.cfg.ExpandInstanceName(self.op.instance_name))
2109
    if instance is None:
2110
      raise errors.OpPrereqError("Instance '%s' not known" %
2111
                                 self.op.instance_name)
2112
    self.instance = instance
2113

    
2114
  def Exec(self, feedback_fn):
2115
    """Shutdown the instance.
2116

2117
    """
2118
    instance = self.instance
2119
    node_current = instance.primary_node
2120
    if not rpc.call_instance_shutdown(node_current, instance):
2121
      logger.Error("could not shutdown instance")
2122

    
2123
    self.cfg.MarkInstanceDown(instance.name)
2124
    _ShutdownInstanceDisks(instance, self.cfg)
2125

    
2126

    
2127
class LUReinstallInstance(LogicalUnit):
2128
  """Reinstall an instance.
2129

2130
  """
2131
  HPATH = "instance-reinstall"
2132
  HTYPE = constants.HTYPE_INSTANCE
2133
  _OP_REQP = ["instance_name"]
2134

    
2135
  def BuildHooksEnv(self):
2136
    """Build hooks env.
2137

2138
    This runs on master, primary and secondary nodes of the instance.
2139

2140
    """
2141
    env = _BuildInstanceHookEnvByObject(self.instance)
2142
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2143
          list(self.instance.secondary_nodes))
2144
    return env, nl, nl
2145

    
2146
  def CheckPrereq(self):
2147
    """Check prerequisites.
2148

2149
    This checks that the instance is in the cluster and is not running.
2150

2151
    """
2152
    instance = self.cfg.GetInstanceInfo(
2153
      self.cfg.ExpandInstanceName(self.op.instance_name))
2154
    if instance is None:
2155
      raise errors.OpPrereqError("Instance '%s' not known" %
2156
                                 self.op.instance_name)
2157
    if instance.disk_template == constants.DT_DISKLESS:
2158
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2159
                                 self.op.instance_name)
2160
    if instance.status != "down":
2161
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2162
                                 self.op.instance_name)
2163
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2164
    if remote_info:
2165
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2166
                                 (self.op.instance_name,
2167
                                  instance.primary_node))
2168

    
2169
    self.op.os_type = getattr(self.op, "os_type", None)
2170
    if self.op.os_type is not None:
2171
      # OS verification
2172
      pnode = self.cfg.GetNodeInfo(
2173
        self.cfg.ExpandNodeName(instance.primary_node))
2174
      if pnode is None:
2175
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2176
                                   self.op.pnode)
2177
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2178
      if not os_obj:
2179
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2180
                                   " primary node"  % self.op.os_type)
2181

    
2182
    self.instance = instance
2183

    
2184
  def Exec(self, feedback_fn):
2185
    """Reinstall the instance.
2186

2187
    """
2188
    inst = self.instance
2189

    
2190
    if self.op.os_type is not None:
2191
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2192
      inst.os = self.op.os_type
2193
      self.cfg.AddInstance(inst)
2194

    
2195
    _StartInstanceDisks(self.cfg, inst, None)
2196
    try:
2197
      feedback_fn("Running the instance OS create scripts...")
2198
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2199
        raise errors.OpExecError("Could not install OS for instance %s"
2200
                                 " on node %s" %
2201
                                 (inst.name, inst.primary_node))
2202
    finally:
2203
      _ShutdownInstanceDisks(inst, self.cfg)
2204

    
2205

    
2206
class LURenameInstance(LogicalUnit):
2207
  """Rename an instance.
2208

2209
  """
2210
  HPATH = "instance-rename"
2211
  HTYPE = constants.HTYPE_INSTANCE
2212
  _OP_REQP = ["instance_name", "new_name"]
2213

    
2214
  def BuildHooksEnv(self):
2215
    """Build hooks env.
2216

2217
    This runs on master, primary and secondary nodes of the instance.
2218

2219
    """
2220
    env = _BuildInstanceHookEnvByObject(self.instance)
2221
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2222
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2223
          list(self.instance.secondary_nodes))
2224
    return env, nl, nl
2225

    
2226
  def CheckPrereq(self):
2227
    """Check prerequisites.
2228

2229
    This checks that the instance is in the cluster and is not running.
2230

2231
    """
2232
    instance = self.cfg.GetInstanceInfo(
2233
      self.cfg.ExpandInstanceName(self.op.instance_name))
2234
    if instance is None:
2235
      raise errors.OpPrereqError("Instance '%s' not known" %
2236
                                 self.op.instance_name)
2237
    if instance.status != "down":
2238
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2239
                                 self.op.instance_name)
2240
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2241
    if remote_info:
2242
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2243
                                 (self.op.instance_name,
2244
                                  instance.primary_node))
2245
    self.instance = instance
2246

    
2247
    # new name verification
2248
    name_info = utils.HostInfo(self.op.new_name)
2249

    
2250
    self.op.new_name = new_name = name_info.name
2251
    if not getattr(self.op, "ignore_ip", False):
2252
      command = ["fping", "-q", name_info.ip]
2253
      result = utils.RunCmd(command)
2254
      if not result.failed:
2255
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2256
                                   (name_info.ip, new_name))
2257

    
2258

    
2259
  def Exec(self, feedback_fn):
2260
    """Reinstall the instance.
2261

2262
    """
2263
    inst = self.instance
2264
    old_name = inst.name
2265

    
2266
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2267

    
2268
    # re-read the instance from the configuration after rename
2269
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2270

    
2271
    _StartInstanceDisks(self.cfg, inst, None)
2272
    try:
2273
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2274
                                          "sda", "sdb"):
2275
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2276
               " instance has been renamed in Ganeti)" %
2277
               (inst.name, inst.primary_node))
2278
        logger.Error(msg)
2279
    finally:
2280
      _ShutdownInstanceDisks(inst, self.cfg)
2281

    
2282

    
2283
class LURemoveInstance(LogicalUnit):
2284
  """Remove an instance.
2285

2286
  """
2287
  HPATH = "instance-remove"
2288
  HTYPE = constants.HTYPE_INSTANCE
2289
  _OP_REQP = ["instance_name"]
2290

    
2291
  def BuildHooksEnv(self):
2292
    """Build hooks env.
2293

2294
    This runs on master, primary and secondary nodes of the instance.
2295

2296
    """
2297
    env = _BuildInstanceHookEnvByObject(self.instance)
2298
    nl = [self.sstore.GetMasterNode()]
2299
    return env, nl, nl
2300

    
2301
  def CheckPrereq(self):
2302
    """Check prerequisites.
2303

2304
    This checks that the instance is in the cluster.
2305

2306
    """
2307
    instance = self.cfg.GetInstanceInfo(
2308
      self.cfg.ExpandInstanceName(self.op.instance_name))
2309
    if instance is None:
2310
      raise errors.OpPrereqError("Instance '%s' not known" %
2311
                                 self.op.instance_name)
2312
    self.instance = instance
2313

    
2314
  def Exec(self, feedback_fn):
2315
    """Remove the instance.
2316

2317
    """
2318
    instance = self.instance
2319
    logger.Info("shutting down instance %s on node %s" %
2320
                (instance.name, instance.primary_node))
2321

    
2322
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2323
      if self.op.ignore_failures:
2324
        feedback_fn("Warning: can't shutdown instance")
2325
      else:
2326
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2327
                                 (instance.name, instance.primary_node))
2328

    
2329
    logger.Info("removing block devices for instance %s" % instance.name)
2330

    
2331
    if not _RemoveDisks(instance, self.cfg):
2332
      if self.op.ignore_failures:
2333
        feedback_fn("Warning: can't remove instance's disks")
2334
      else:
2335
        raise errors.OpExecError("Can't remove instance's disks")
2336

    
2337
    logger.Info("removing instance %s out of cluster config" % instance.name)
2338

    
2339
    self.cfg.RemoveInstance(instance.name)
2340

    
2341

    
2342
class LUQueryInstances(NoHooksLU):
2343
  """Logical unit for querying instances.
2344

2345
  """
2346
  _OP_REQP = ["output_fields", "names"]
2347

    
2348
  def CheckPrereq(self):
2349
    """Check prerequisites.
2350

2351
    This checks that the fields required are valid output fields.
2352

2353
    """
2354
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2355
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2356
                               "admin_state", "admin_ram",
2357
                               "disk_template", "ip", "mac", "bridge",
2358
                               "sda_size", "sdb_size"],
2359
                       dynamic=self.dynamic_fields,
2360
                       selected=self.op.output_fields)
2361

    
2362
    self.wanted = _GetWantedInstances(self, self.op.names)
2363

    
2364
  def Exec(self, feedback_fn):
2365
    """Computes the list of nodes and their attributes.
2366

2367
    """
2368
    instance_names = self.wanted
2369
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2370
                     in instance_names]
2371

    
2372
    # begin data gathering
2373

    
2374
    nodes = frozenset([inst.primary_node for inst in instance_list])
2375

    
2376
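    # query the nodes for live instance data only if a dynamic output
    # field was requested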
    bad_nodes = []
2377
    if self.dynamic_fields.intersection(self.op.output_fields):
2378
      live_data = {}
2379
      node_data = rpc.call_all_instances_info(nodes)
2380
      for name in nodes:
2381
        result = node_data[name]
2382
        if result:
2383
          live_data.update(result)
2384
        elif result == False:
2385
          bad_nodes.append(name)
2386
        # else no instance is alive
2387
    else:
2388
      live_data = dict([(name, {}) for name in instance_names])
2389

    
2390
    # end data gathering
2391

    
2392
    output = []
2393
    for instance in instance_list:
2394
      iout = []
2395
      for field in self.op.output_fields:
2396
        if field == "name":
2397
          val = instance.name
2398
        elif field == "os":
2399
          val = instance.os
2400
        elif field == "pnode":
2401
          val = instance.primary_node
2402
        elif field == "snodes":
2403
          val = list(instance.secondary_nodes)
2404
        elif field == "admin_state":
2405
          val = (instance.status != "down")
2406
        elif field == "oper_state":
2407
          if instance.primary_node in bad_nodes:
2408
            val = None
2409
          else:
2410
            val = bool(live_data.get(instance.name))
2411
        elif field == "admin_ram":
2412
          val = instance.memory
2413
        elif field == "oper_ram":
2414
          if instance.primary_node in bad_nodes:
2415
            val = None
2416
          elif instance.name in live_data:
2417
            val = live_data[instance.name].get("memory", "?")
2418
          else:
2419
            val = "-"
2420
        elif field == "disk_template":
2421
          val = instance.disk_template
2422
        elif field == "ip":
2423
          val = instance.nics[0].ip
2424
        elif field == "bridge":
2425
          val = instance.nics[0].bridge
2426
        elif field == "mac":
2427
          val = instance.nics[0].mac
2428
        elif field == "sda_size" or field == "sdb_size":
2429
          disk = instance.FindDisk(field[:3])
2430
          if disk is None:
2431
            val = None
2432
          else:
2433
            val = disk.size
2434
        else:
2435
          raise errors.ParameterError(field)
2436
        iout.append(val)
2437
      output.append(iout)
2438

    
2439
    return output
2440

    
2441

    
2442
class LUFailoverInstance(LogicalUnit):
2443
  """Failover an instance.
2444

2445
  """
2446
  HPATH = "instance-failover"
2447
  HTYPE = constants.HTYPE_INSTANCE
2448
  _OP_REQP = ["instance_name", "ignore_consistency"]
2449

    
2450
  def BuildHooksEnv(self):
2451
    """Build hooks env.
2452

2453
    This runs on master, primary and secondary nodes of the instance.
2454

2455
    """
2456
    env = {
2457
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2458
      }
2459
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2460
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2461
    return env, nl, nl
2462

    
2463
  def CheckPrereq(self):
2464
    """Check prerequisites.
2465

2466
    This checks that the instance is in the cluster.
2467

2468
    """
2469
    instance = self.cfg.GetInstanceInfo(
2470
      self.cfg.ExpandInstanceName(self.op.instance_name))
2471
    if instance is None:
2472
      raise errors.OpPrereqError("Instance '%s' not known" %
2473
                                 self.op.instance_name)
2474

    
2475
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2476
      raise errors.OpPrereqError("Instance's disk layout is not"
2477
                                 " network mirrored, cannot failover.")
2478

    
2479
    secondary_nodes = instance.secondary_nodes
2480
    if not secondary_nodes:
2481
      raise errors.ProgrammerError("no secondary node but using "
2482
                                   "DT_REMOTE_RAID1 template")
2483

    
2484
    # check memory requirements on the secondary node
2485
    target_node = secondary_nodes[0]
2486
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2487
    info = nodeinfo.get(target_node, None)
2488
    if not info:
2489
      raise errors.OpPrereqError("Cannot get current information"
2490
                                 " from node '%s'" % nodeinfo)
2491
    if instance.memory > info['memory_free']:
2492
      raise errors.OpPrereqError("Not enough memory on target node %s."
2493
                                 " %d MB available, %d MB required" %
2494
                                 (target_node, info['memory_free'],
2495
                                  instance.memory))
2496

    
2497
    # check bridge existence
2498
    brlist = [nic.bridge for nic in instance.nics]
2499
    if not rpc.call_bridges_exist(target_node, brlist):
2500
      raise errors.OpPrereqError("One or more target bridges %s does not"
2501
                                 " exist on destination node '%s'" %
2502
                                 (brlist, target_node))
2503

    
2504
    self.instance = instance
2505

    
2506
  def Exec(self, feedback_fn):
2507
    """Failover an instance.
2508

2509
    The failover is done by shutting it down on its present node and
2510
    starting it on the secondary.
2511

2512
    """
2513
    instance = self.instance
2514

    
2515
    source_node = instance.primary_node
2516
    target_node = instance.secondary_nodes[0]
2517

    
2518
    feedback_fn("* checking disk consistency between source and target")
2519
    for dev in instance.disks:
2520
      # for remote_raid1, these are md over drbd
2521
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2522
        if not self.op.ignore_consistency:
2523
          raise errors.OpExecError("Disk %s is degraded on target node,"
2524
                                   " aborting failover." % dev.iv_name)
2525

    
2526
    feedback_fn("* checking target node resource availability")
2527
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2528

    
2529
    if not nodeinfo:
2530
      raise errors.OpExecError("Could not contact target node %s." %
2531
                               target_node)
2532

    
2533
    free_memory = int(nodeinfo[target_node]['memory_free'])
2534
    memory = instance.memory
2535
    if memory > free_memory:
2536
      raise errors.OpExecError("Not enough memory to create instance %s on"
2537
                               " node %s. needed %s MiB, available %s MiB" %
2538
                               (instance.name, target_node, memory,
2539
                                free_memory))
2540

    
2541
    feedback_fn("* shutting down instance on source node")
2542
    logger.Info("Shutting down instance %s on node %s" %
2543
                (instance.name, source_node))
2544

    
2545
    if not rpc.call_instance_shutdown(source_node, instance):
2546
      if self.op.ignore_consistency:
2547
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2548
                     " anyway. Please make sure node %s is down"  %
2549
                     (instance.name, source_node, source_node))
2550
      else:
2551
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2552
                                 (instance.name, source_node))
2553

    
2554
    feedback_fn("* deactivating the instance's disks on source node")
2555
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2556
      raise errors.OpExecError("Can't shut down the instance's disks.")
2557

    
2558
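    # the instance is stopped and its disks are down on the old primary,
    # so the primary node can now be switched in the configuration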
    instance.primary_node = target_node
2559
    # distribute new instance config to the other nodes
2560
    self.cfg.AddInstance(instance)
2561

    
2562
    feedback_fn("* activating the instance's disks on target node")
2563
    logger.Info("Starting instance %s on node %s" %
2564
                (instance.name, target_node))
2565

    
2566
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2567
                                             ignore_secondaries=True)
2568
    if not disks_ok:
2569
      _ShutdownInstanceDisks(instance, self.cfg)
2570
      raise errors.OpExecError("Can't activate the instance's disks")
2571

    
2572
    feedback_fn("* starting the instance on the target node")
2573
    if not rpc.call_instance_start(target_node, instance, None):
2574
      _ShutdownInstanceDisks(instance, self.cfg)
2575
      raise errors.OpExecError("Could not start instance %s on node %s." %
2576
                               (instance.name, target_node))
2577

    
2578

    
2579
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2580
  """Create a tree of block devices on the primary node.
2581

2582
  This always creates all devices.
2583

2584
  """
2585
  if device.children:
2586
    for child in device.children:
2587
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2588
        return False
2589

    
2590
  cfg.SetDiskID(device, node)
2591
  new_id = rpc.call_blockdev_create(node, device, device.size,
2592
                                    instance.name, True, info)
2593
  if not new_id:
2594
    return False
2595
  if device.physical_id is None:
2596
    device.physical_id = new_id
2597
  return True
2598

    
2599

    
2600
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2601
  """Create a tree of block devices on a secondary node.
2602

2603
  If this device type has to be created on secondaries, create it and
2604
  all its children.
2605

2606
  If not, just recurse to children keeping the same 'force' value.
2607

2608
  """
2609
  if device.CreateOnSecondary():
2610
    force = True
2611
  if device.children:
2612
    for child in device.children:
2613
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2614
                                        child, force, info):
2615
        return False
2616

    
2617
  if not force:
2618
    return True
2619
  cfg.SetDiskID(device, node)
2620
  new_id = rpc.call_blockdev_create(node, device, device.size,
2621
                                    instance.name, False, info)
2622
  if not new_id:
2623
    return False
2624
  if device.physical_id is None:
2625
    device.physical_id = new_id
2626
  return True
2627

    
2628

    
2629
def _GenerateUniqueNames(cfg, exts):
2630
  """Generate a suitable LV name.
2631

2632
  This will generate a logical volume name for the given instance.
2633

2634
  """
2635
  results = []
2636
  for val in exts:
2637
    new_id = cfg.GenerateUniqueID()
2638
    results.append("%s%s" % (new_id, val))
2639
  return results
2640

    
2641

    
2642
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2643
  """Generate a drbd device complete with its children.
2644

2645
  """
2646
  port = cfg.AllocatePort()
2647
  vgname = cfg.GetVGName()
2648
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2649
                          logical_id=(vgname, names[0]))
2650
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2651
                          logical_id=(vgname, names[1]))
2652
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2653
                          logical_id = (primary, secondary, port),
2654
                          children = [dev_data, dev_meta])
2655
  return drbd_dev
2656

    
2657

    
2658
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2659
  """Generate a drbd8 device complete with its children.
2660

2661
  """
2662
  port = cfg.AllocatePort()
2663
  vgname = cfg.GetVGName()
2664
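  # a drbd8 device is built from a data LV and a small (128 MB) metadata LV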
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2665
                          logical_id=(vgname, names[0]))
2666
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2667
                          logical_id=(vgname, names[1]))
2668
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2669
                          logical_id = (primary, secondary, port),
2670
                          children = [dev_data, dev_meta],
2671
                          iv_name=iv_name)
2672
  return drbd_dev
2673

    
2674
def _GenerateDiskTemplate(cfg, template_name,
2675
                          instance_name, primary_node,
2676
                          secondary_nodes, disk_sz, swap_sz):
2677
  """Generate the entire disk layout for a given template type.
2678

2679
  """
2680
  #TODO: compute space requirements
2681

    
2682
  vgname = cfg.GetVGName()
2683
  if template_name == "diskless":
2684
    disks = []
2685
  elif template_name == "plain":
2686
    if len(secondary_nodes) != 0:
2687
      raise errors.ProgrammerError("Wrong template configuration")
2688

    
2689
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2690
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2691
                           logical_id=(vgname, names[0]),
2692
                           iv_name = "sda")
2693
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2694
                           logical_id=(vgname, names[1]),
2695
                           iv_name = "sdb")
2696
    disks = [sda_dev, sdb_dev]
2697
  elif template_name == "local_raid1":
2698
    if len(secondary_nodes) != 0:
2699
      raise errors.ProgrammerError("Wrong template configuration")
2700

    
2701

    
2702
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2703
                                       ".sdb_m1", ".sdb_m2"])
2704
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2705
                              logical_id=(vgname, names[0]))
2706
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2707
                              logical_id=(vgname, names[1]))
2708
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2709
                              size=disk_sz,
2710
                              children = [sda_dev_m1, sda_dev_m2])
2711
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2712
                              logical_id=(vgname, names[2]))
2713
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2714
                              logical_id=(vgname, names[3]))
2715
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2716
                              size=swap_sz,
2717
                              children = [sdb_dev_m1, sdb_dev_m2])
2718
    disks = [md_sda_dev, md_sdb_dev]
2719
  elif template_name == constants.DT_REMOTE_RAID1:
2720
    if len(secondary_nodes) != 1:
2721
      raise errors.ProgrammerError("Wrong template configuration")
2722
    remote_node = secondary_nodes[0]
2723
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2724
                                       ".sdb_data", ".sdb_meta"])
2725
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2726
                                         disk_sz, names[0:2])
2727
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2728
                              children = [drbd_sda_dev], size=disk_sz)
2729
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2730
                                         swap_sz, names[2:4])
2731
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2732
                              children = [drbd_sdb_dev], size=swap_sz)
2733
    disks = [md_sda_dev, md_sdb_dev]
2734
  elif template_name == constants.DT_DRBD8:
2735
    if len(secondary_nodes) != 1:
2736
      raise errors.ProgrammerError("Wrong template configuration")
2737
    remote_node = secondary_nodes[0]
2738
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2739
                                       ".sdb_data", ".sdb_meta"])
2740
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2741
                                         disk_sz, names[0:2], "sda")
2742
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2743
                                         swap_sz, names[2:4], "sdb")
2744
    disks = [drbd_sda_dev, drbd_sdb_dev]
2745
  else:
2746
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2747
  return disks
2748

    
2749

    
2750
def _GetInstanceInfoText(instance):
2751
  """Compute that text that should be added to the disk's metadata.
2752

2753
  """
2754
  return "originstname+%s" % instance.name
2755

    
2756

    
2757
def _CreateDisks(cfg, instance):
2758
  """Create all disks for an instance.
2759

2760
  This abstracts away some work from AddInstance.
2761

2762
  Args:
2763
    instance: the instance object
2764

2765
  Returns:
2766
    True or False showing the success of the creation process
2767

2768
  """
2769
  info = _GetInstanceInfoText(instance)
2770

    
2771
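  # create each device on the secondary node(s) first, then on the primary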
  for device in instance.disks:
2772
    logger.Info("creating volume %s for instance %s" %
2773
              (device.iv_name, instance.name))
2774
    #HARDCODE
2775
    for secondary_node in instance.secondary_nodes:
2776
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2777
                                        device, False, info):
2778
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2779
                     (device.iv_name, device, secondary_node))
2780
        return False
2781
    #HARDCODE
2782
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2783
                                    instance, device, info):
2784
      logger.Error("failed to create volume %s on primary!" %
2785
                   device.iv_name)
2786
      return False
2787
  return True
2788

    
2789

    
2790
def _RemoveDisks(instance, cfg):
2791
  """Remove all disks for an instance.
2792

2793
  This abstracts away some work from `AddInstance()` and
2794
  `RemoveInstance()`. Note that in case some of the devices couldn't
2795
  be removed, the removal will continue with the other ones (compare
2796
  with `_CreateDisks()`).
2797

2798
  Args:
2799
    instance: the instance object
2800

2801
  Returns:
2802
    True or False showing the success of the removal process
2803

2804
  """
2805
  logger.Info("removing block devices for instance %s" % instance.name)
2806

    
2807
  result = True
2808
  for device in instance.disks:
2809
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2810
      cfg.SetDiskID(disk, node)
2811
      if not rpc.call_blockdev_remove(node, disk):
2812
        logger.Error("could not remove block device %s on node %s,"
2813
                     " continuing anyway" %
2814
                     (device.iv_name, node))
2815
        result = False
2816
  return result
2817

    
2818

    
2819
class LUCreateInstance(LogicalUnit):
2820
  """Create an instance.
2821

2822
  """
2823
  HPATH = "instance-add"
2824
  HTYPE = constants.HTYPE_INSTANCE
2825
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2826
              "disk_template", "swap_size", "mode", "start", "vcpus",
2827
              "wait_for_sync", "ip_check"]
2828

    
2829
  def BuildHooksEnv(self):
2830
    """Build hooks env.
2831

2832
    This runs on master, primary and secondary nodes of the instance.
2833

2834
    """
2835
    env = {
2836
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2837
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2838
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2839
      "INSTANCE_ADD_MODE": self.op.mode,
2840
      }
2841
    if self.op.mode == constants.INSTANCE_IMPORT:
2842
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2843
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2844
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2845

    
2846
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2847
      primary_node=self.op.pnode,
2848
      secondary_nodes=self.secondaries,
2849
      status=self.instance_status,
2850
      os_type=self.op.os_type,
2851
      memory=self.op.mem_size,
2852
      vcpus=self.op.vcpus,
2853
      nics=[(self.inst_ip, self.op.bridge)],
2854
    ))
2855

    
2856
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2857
          self.secondaries)
2858
    return env, nl, nl
2859

    
2860

    
2861
  def CheckPrereq(self):
2862
    """Check prerequisites.
2863

2864
    """
2865
    if self.op.mode not in (constants.INSTANCE_CREATE,
2866
                            constants.INSTANCE_IMPORT):
2867
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2868
                                 self.op.mode)
2869

    
2870
    if self.op.mode == constants.INSTANCE_IMPORT:
2871
      src_node = getattr(self.op, "src_node", None)
2872
      src_path = getattr(self.op, "src_path", None)
2873
      if src_node is None or src_path is None:
2874
        raise errors.OpPrereqError("Importing an instance requires source"
2875
                                   " node and path options")
2876
      src_node_full = self.cfg.ExpandNodeName(src_node)
2877
      if src_node_full is None:
2878
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2879
      self.op.src_node = src_node = src_node_full
2880

    
2881
      if not os.path.isabs(src_path):
2882
        raise errors.OpPrereqError("The source path must be absolute")
2883

    
2884
      export_info = rpc.call_export_info(src_node, src_path)
2885

    
2886
      if not export_info:
2887
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2888

    
2889
      if not export_info.has_section(constants.INISECT_EXP):
2890
        raise errors.ProgrammerError("Corrupted export config")
2891

    
2892
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2893
      if (int(ei_version) != constants.EXPORT_VERSION):
2894
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2895
                                   (ei_version, constants.EXPORT_VERSION))
2896

    
2897
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2898
        raise errors.OpPrereqError("Can't import instance with more than"
2899
                                   " one data disk")
2900

    
2901
      # FIXME: are the old os-es, disk sizes, etc. useful?
2902
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2903
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2904
                                                         'disk0_dump'))
2905
      self.src_image = diskimage
2906
    else: # INSTANCE_CREATE
2907
      if getattr(self.op, "os_type", None) is None:
2908
        raise errors.OpPrereqError("No guest OS specified")
2909

    
2910
    # check primary node
2911
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2912
    if pnode is None:
2913
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2914
                                 self.op.pnode)
2915
    self.op.pnode = pnode.name
2916
    self.pnode = pnode
2917
    self.secondaries = []
2918
    # disk template and mirror node verification
2919
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2920
      raise errors.OpPrereqError("Invalid disk template name")
2921

    
2922
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2923
      if getattr(self.op, "snode", None) is None:
2924
        raise errors.OpPrereqError("The networked disk templates need"
2925
                                   " a mirror node")
2926

    
2927
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2928
      if snode_name is None:
2929
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2930
                                   self.op.snode)
2931
      elif snode_name == pnode.name:
2932
        raise errors.OpPrereqError("The secondary node cannot be"
2933
                                   " the primary node.")
2934
      self.secondaries.append(snode_name)
2935

    
2936
    # Check lv size requirements
2937
    nodenames = [pnode.name] + self.secondaries
2938
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2939

    
2940
    # Required free disk space as a function of disk and swap space
2941
    req_size_dict = {
2942
      constants.DT_DISKLESS: 0,
2943
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2944
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2945
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2946
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2947
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2948
    }
2949

    
2950
    if self.op.disk_template not in req_size_dict:
2951
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2952
                                   " is unknown" %  self.op.disk_template)
2953

    
2954
    req_size = req_size_dict[self.op.disk_template]
2955

    
2956
    for node in nodenames:
2957
      info = nodeinfo.get(node, None)
2958
      if not info:
2959
        raise errors.OpPrereqError("Cannot get current information"
2960
                                   " from node '%s'" % nodeinfo)
2961
      if req_size > info['vg_free']:
2962
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2963
                                   " %d MB available, %d MB required" %
2964
                                   (node, info['vg_free'], req_size))
2965

    
2966
    # os verification
2967
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2968
    if not os_obj:
2969
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2970
                                 " primary node"  % self.op.os_type)
2971

    
2972
    # instance verification
2973
    hostname1 = utils.HostInfo(self.op.instance_name)
2974

    
2975
    self.op.instance_name = instance_name = hostname1.name
2976
    instance_list = self.cfg.GetInstanceList()
2977
    if instance_name in instance_list:
2978
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2979
                                 instance_name)
2980

    
2981
    ip = getattr(self.op, "ip", None)
2982
    if ip is None or ip.lower() == "none":
2983
      inst_ip = None
2984
    elif ip.lower() == "auto":
2985
      inst_ip = hostname1.ip
2986
    else:
2987
      if not utils.IsValidIP(ip):
2988
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2989
                                   " like a valid IP" % ip)
2990
      inst_ip = ip
2991
    self.inst_ip = inst_ip
2992

    
2993
    if self.op.start and not self.op.ip_check:
2994
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2995
                                 " adding an instance in start mode")
2996

    
2997
    if self.op.ip_check:
2998
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2999
                       constants.DEFAULT_NODED_PORT):
3000
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3001
                                   (hostname1.ip, instance_name))
3002

    
3003
    # bridge verification
3004
    bridge = getattr(self.op, "bridge", None)
3005
    if bridge is None:
3006
      self.op.bridge = self.cfg.GetDefBridge()
3007
    else:
3008
      self.op.bridge = bridge
3009

    
3010
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3011
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3012
                                 " destination node '%s'" %
3013
                                 (self.op.bridge, pnode.name))
3014

    
3015
    if self.op.start:
3016
      self.instance_status = 'up'
3017
    else:
3018
      self.instance_status = 'down'
3019

    
3020
  def Exec(self, feedback_fn):
3021
    """Create and add the instance to the cluster.
3022

3023
    """
3024
    instance = self.op.instance_name
3025
    pnode_name = self.pnode.name
3026

    
3027
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3028
    if self.inst_ip is not None:
3029
      nic.ip = self.inst_ip
3030

    
3031
    disks = _GenerateDiskTemplate(self.cfg,
3032
                                  self.op.disk_template,
3033
                                  instance, pnode_name,
3034
                                  self.secondaries, self.op.disk_size,
3035
                                  self.op.swap_size)
3036

    
3037
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3038
                            primary_node=pnode_name,
3039
                            memory=self.op.mem_size,
3040
                            vcpus=self.op.vcpus,
3041
                            nics=[nic], disks=disks,
3042
                            disk_template=self.op.disk_template,
3043
                            status=self.instance_status,
3044
                            )
3045

    
3046
    feedback_fn("* creating instance disks...")
3047
    if not _CreateDisks(self.cfg, iobj):
3048
      _RemoveDisks(iobj, self.cfg)
3049
      raise errors.OpExecError("Device creation failed, reverting...")
3050

    
3051
    feedback_fn("adding instance %s to cluster config" % instance)
3052

    
3053
    self.cfg.AddInstance(iobj)
3054

    
3055
    if self.op.wait_for_sync:
3056
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3057
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3058
      # make sure the disks are not degraded (still sync-ing is ok)
3059
      time.sleep(15)
3060
      feedback_fn("* checking mirrors status")
3061
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3062
    else:
3063
      disk_abort = False
3064

    
3065
    if disk_abort:
3066
      _RemoveDisks(iobj, self.cfg)
3067
      self.cfg.RemoveInstance(iobj.name)
3068
      raise errors.OpExecError("There are some degraded disks for"
3069
                               " this instance")
3070

    
3071
    feedback_fn("creating os for instance %s on node %s" %
3072
                (instance, pnode_name))
3073

    
3074
    if iobj.disk_template != constants.DT_DISKLESS:
3075
      if self.op.mode == constants.INSTANCE_CREATE:
3076
        feedback_fn("* running the instance OS create scripts...")
3077
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3078
          raise errors.OpExecError("could not add os for instance %s"
3079
                                   " on node %s" %
3080
                                   (instance, pnode_name))
3081

    
3082
      elif self.op.mode == constants.INSTANCE_IMPORT:
3083
        feedback_fn("* running the instance OS import scripts...")
3084
        src_node = self.op.src_node
3085
        src_image = self.src_image
3086
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3087
                                                src_node, src_image):
3088
          raise errors.OpExecError("Could not import os for instance"
3089
                                   " %s on node %s" %
3090
                                   (instance, pnode_name))
3091
      else:
3092
        # also checked in the prereq part
3093
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3094
                                     % self.op.mode)
3095

    
3096
    if self.op.start:
3097
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3098
      feedback_fn("* starting instance...")
3099
      if not rpc.call_instance_start(pnode_name, iobj, None):
3100
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
3104
  """Connect to an instance's console.
3105

3106
  This is somewhat special in that it returns the command line that
3107
  you need to run on the master node in order to connect to the
3108
  console.
3109

3110
  """
3111
  _OP_REQP = ["instance_name"]
3112

    
3113
  def CheckPrereq(self):
3114
    """Check prerequisites.
3115

3116
    This checks that the instance is in the cluster.
3117

3118
    """
3119
    instance = self.cfg.GetInstanceInfo(
3120
      self.cfg.ExpandInstanceName(self.op.instance_name))
3121
    if instance is None:
3122
      raise errors.OpPrereqError("Instance '%s' not known" %
3123
                                 self.op.instance_name)
3124
    self.instance = instance
3125

    
3126
  def Exec(self, feedback_fn):
3127
    """Connect to the console of an instance
3128

3129
    """
3130
    instance = self.instance
3131
    node = instance.primary_node
3132

    
3133
    node_insts = rpc.call_instance_list([node])[node]
3134
    if node_insts is False:
3135
      raise errors.OpExecError("Can't connect to node %s." % node)
3136

    
3137
    if instance.name not in node_insts:
3138
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3139

    
3140
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3141

    
3142
    hyper = hypervisor.GetHypervisor()
3143
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
3144
    # build ssh cmdline
3145
    argv = ["ssh", "-q", "-t"]
3146
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
3147
    argv.extend(ssh.BATCH_MODE_OPTS)
3148
    argv.append(node)
3149
    argv.append(console_cmd)
3150
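    # The caller is expected to run the returned command on the master;
    # depending on the hypervisor it ends up looking roughly like
    #   ssh -q -t <opts> node1.example.com '<console command>'
    # (node name and console command above are only placeholders).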
    return "ssh", argv


class LUAddMDDRBDComponent(LogicalUnit):
3154
  """Adda new mirror member to an instance's disk.
3155

3156
  """
3157
  HPATH = "mirror-add"
3158
  HTYPE = constants.HTYPE_INSTANCE
3159
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3160

    
3161
  def BuildHooksEnv(self):
3162
    """Build hooks env.
3163

3164
    This runs on the master, the primary and all the secondaries.
3165

3166
    """
3167
    env = {
3168
      "NEW_SECONDARY": self.op.remote_node,
3169
      "DISK_NAME": self.op.disk_name,
3170
      }
3171
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3172
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3173
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3174
    return env, nl, nl
3175

    
3176
  def CheckPrereq(self):
3177
    """Check prerequisites.
3178

3179
    This checks that the instance is in the cluster.
3180

3181
    """
3182
    instance = self.cfg.GetInstanceInfo(
3183
      self.cfg.ExpandInstanceName(self.op.instance_name))
3184
    if instance is None:
3185
      raise errors.OpPrereqError("Instance '%s' not known" %
3186
                                 self.op.instance_name)
3187
    self.instance = instance
3188

    
3189
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3190
    if remote_node is None:
3191
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3192
    self.remote_node = remote_node
3193

    
3194
    if remote_node == instance.primary_node:
3195
      raise errors.OpPrereqError("The specified node is the primary node of"
3196
                                 " the instance.")
3197

    
3198
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3199
      raise errors.OpPrereqError("Instance's disk layout is not"
3200
                                 " remote_raid1.")
3201
    for disk in instance.disks:
3202
      if disk.iv_name == self.op.disk_name:
3203
        break
3204
    else:
3205
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3206
                                 " instance." % self.op.disk_name)
3207
    if len(disk.children) > 1:
3208
      raise errors.OpPrereqError("The device already has two slave devices."
3209
                                 " This would create a 3-disk raid1 which we"
3210
                                 " don't allow.")
3211
    self.disk = disk
3212

    
3213
  def Exec(self, feedback_fn):
3214
    """Add the mirror component
3215

3216
    """
3217
    disk = self.disk
3218
    instance = self.instance
3219

    
3220
    remote_node = self.remote_node
3221
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3222
    names = _GenerateUniqueNames(self.cfg, lv_names)
3223
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3224
                                     remote_node, disk.size, names)
3225

    
3226
    logger.Info("adding new mirror component on secondary")
3227
    #HARDCODE
3228
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3229
                                      new_drbd, False,
3230
                                      _GetInstanceInfoText(instance)):
3231
      raise errors.OpExecError("Failed to create new component on secondary"
3232
                               " node %s" % remote_node)
3233

    
3234
    logger.Info("adding new mirror component on primary")
3235
    #HARDCODE
3236
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3237
                                    instance, new_drbd,
3238
                                    _GetInstanceInfoText(instance)):
3239
      # remove secondary dev
3240
      self.cfg.SetDiskID(new_drbd, remote_node)
3241
      rpc.call_blockdev_remove(remote_node, new_drbd)
3242
      raise errors.OpExecError("Failed to create volume on primary")
3243

    
3244
    # the device exists now
3245
    # call the primary node to add the mirror to md
3246
    logger.Info("adding new mirror component to md")
3247
    if not rpc.call_blockdev_addchildren(instance.primary_node,
3248
                                         disk, [new_drbd]):
3249
      logger.Error("Can't add mirror compoment to md!")
3250
      self.cfg.SetDiskID(new_drbd, remote_node)
3251
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
3252
        logger.Error("Can't rollback on secondary")
3253
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
3254
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3255
        logger.Error("Can't rollback on primary")
3256
      raise errors.OpExecError("Can't add mirror component to md array")
3257

    
3258
    disk.children.append(new_drbd)
3259

    
3260
    self.cfg.AddInstance(instance)
3261

    
3262
    _WaitForSync(self.cfg, instance, self.proc)
3263

    
3264
    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
3268
  """Remove a component from a remote_raid1 disk.
3269

3270
  """
3271
  HPATH = "mirror-remove"
3272
  HTYPE = constants.HTYPE_INSTANCE
3273
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3274

    
3275
  def BuildHooksEnv(self):
3276
    """Build hooks env.
3277

3278
    This runs on the master, the primary and all the secondaries.
3279

3280
    """
3281
    env = {
3282
      "DISK_NAME": self.op.disk_name,
3283
      "DISK_ID": self.op.disk_id,
3284
      "OLD_SECONDARY": self.old_secondary,
3285
      }
3286
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3287
    nl = [self.sstore.GetMasterNode(),
3288
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3289
    return env, nl, nl
3290

    
3291
  def CheckPrereq(self):
3292
    """Check prerequisites.
3293

3294
    This checks that the instance is in the cluster.
3295

3296
    """
3297
    instance = self.cfg.GetInstanceInfo(
3298
      self.cfg.ExpandInstanceName(self.op.instance_name))
3299
    if instance is None:
3300
      raise errors.OpPrereqError("Instance '%s' not known" %
3301
                                 self.op.instance_name)
3302
    self.instance = instance
3303

    
3304
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3305
      raise errors.OpPrereqError("Instance's disk layout is not"
3306
                                 " remote_raid1.")
3307
    for disk in instance.disks:
3308
      if disk.iv_name == self.op.disk_name:
3309
        break
3310
    else:
3311
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3312
                                 " instance." % self.op.disk_name)
3313
    for child in disk.children:
3314
      if (child.dev_type == constants.LD_DRBD7 and
3315
          child.logical_id[2] == self.op.disk_id):
3316
        break
3317
    else:
3318
      raise errors.OpPrereqError("Can't find the device with this port.")
3319

    
3320
    if len(disk.children) < 2:
3321
      raise errors.OpPrereqError("Cannot remove the last component from"
3322
                                 " a mirror.")
3323
    self.disk = disk
3324
    self.child = child
3325
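    # For these children logical_id is (node_a, node_b, port); whichever
    # entry is not the primary node is the old secondary being removed.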
    if self.child.logical_id[0] == instance.primary_node:
3326
      oid = 1
3327
    else:
3328
      oid = 0
3329
    self.old_secondary = self.child.logical_id[oid]
3330

    
3331
  def Exec(self, feedback_fn):
3332
    """Remove the mirror component
3333

3334
    """
3335
    instance = self.instance
3336
    disk = self.disk
3337
    child = self.child
3338
    logger.Info("remove mirror component")
3339
    self.cfg.SetDiskID(disk, instance.primary_node)
3340
    if not rpc.call_blockdev_removechildren(instance.primary_node,
3341
                                            disk, [child]):
3342
      raise errors.OpExecError("Can't remove child from mirror.")
3343

    
3344
    for node in child.logical_id[:2]:
3345
      self.cfg.SetDiskID(child, node)
3346
      if not rpc.call_blockdev_remove(node, child):
3347
        logger.Error("Warning: failed to remove device from node %s,"
3348
                     " continuing operation." % node)
3349

    
3350
    disk.children.remove(child)
3351
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
3355
  """Replace the disks of an instance.
3356

3357
  """
3358
  HPATH = "mirrors-replace"
3359
  HTYPE = constants.HTYPE_INSTANCE
3360
  _OP_REQP = ["instance_name", "mode", "disks"]
3361

    
3362
  def BuildHooksEnv(self):
3363
    """Build hooks env.
3364

3365
    This runs on the master, the primary and all the secondaries.
3366

3367
    """
3368
    env = {
3369
      "MODE": self.op.mode,
3370
      "NEW_SECONDARY": self.op.remote_node,
3371
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3372
      }
3373
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3374
    nl = [
3375
      self.sstore.GetMasterNode(),
3376
      self.instance.primary_node,
3377
      ]
3378
    if self.op.remote_node is not None:
3379
      nl.append(self.op.remote_node)
3380
    return env, nl, nl
3381

    
3382
  def CheckPrereq(self):
3383
    """Check prerequisites.
3384

3385
    This checks that the instance is in the cluster.
3386

3387
    """
3388
    instance = self.cfg.GetInstanceInfo(
3389
      self.cfg.ExpandInstanceName(self.op.instance_name))
3390
    if instance is None:
3391
      raise errors.OpPrereqError("Instance '%s' not known" %
3392
                                 self.op.instance_name)
3393
    self.instance = instance
3394
    self.op.instance_name = instance.name
3395

    
3396
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3397
      raise errors.OpPrereqError("Instance's disk layout is not"
3398
                                 " network mirrored.")
3399

    
3400
    if len(instance.secondary_nodes) != 1:
3401
      raise errors.OpPrereqError("The instance has a strange layout,"
3402
                                 " expected one secondary but found %d" %
3403
                                 len(instance.secondary_nodes))
3404

    
3405
    self.sec_node = instance.secondary_nodes[0]
3406

    
3407
    remote_node = getattr(self.op, "remote_node", None)
3408
    if remote_node is not None:
3409
      remote_node = self.cfg.ExpandNodeName(remote_node)
3410
      if remote_node is None:
3411
        raise errors.OpPrereqError("Node '%s' not known" %
3412
                                   self.op.remote_node)
3413
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3414
    else:
3415
      self.remote_node_info = None
3416
    if remote_node == instance.primary_node:
3417
      raise errors.OpPrereqError("The specified node is the primary node of"
3418
                                 " the instance.")
3419
    elif remote_node == self.sec_node:
3420
      if self.op.mode == constants.REPLACE_DISK_SEC:
3421
        # this is for DRBD8, where we can't execute the same mode of
3422
        # replacement as for drbd7 (no different port allocated)
3423
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3424
                                   " replacement")
3425
      # the user gave the current secondary, switch to
3426
      # 'no-replace-secondary' mode for drbd7
3427
      remote_node = None
3428
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3429
        self.op.mode != constants.REPLACE_DISK_ALL):
3430
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3431
                                 " disks replacement, not individual ones")
3432
    if instance.disk_template == constants.DT_DRBD8:
3433
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3434
          remote_node is not None):
3435
        # switch to replace secondary mode
3436
        self.op.mode = constants.REPLACE_DISK_SEC
3437

    
3438
      if self.op.mode == constants.REPLACE_DISK_ALL:
3439
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3440
                                   " secondary disk replacement, not"
3441
                                   " both at once")
3442
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3443
        if remote_node is not None:
3444
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3445
                                     " the secondary while doing a primary"
3446
                                     " node disk replacement")
3447
        self.tgt_node = instance.primary_node
3448
        self.oth_node = instance.secondary_nodes[0]
3449
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3450
        self.new_node = remote_node # this can be None, in which case
3451
                                    # we don't change the secondary
3452
        self.tgt_node = instance.secondary_nodes[0]
3453
        self.oth_node = instance.primary_node
3454
      else:
3455
        raise errors.ProgrammerError("Unhandled disk replace mode")
3456

    
3457
    for name in self.op.disks:
3458
      if instance.FindDisk(name) is None:
3459
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3460
                                   (name, instance.name))
3461
    self.op.remote_node = remote_node
3462

    
3463
  def _ExecRR1(self, feedback_fn):
3464
    """Replace the disks of an instance.
3465

3466
    """
3467
    instance = self.instance
3468
    iv_names = {}
3469
    # start of work
3470
    if self.op.remote_node is None:
3471
      remote_node = self.sec_node
3472
    else:
3473
      remote_node = self.op.remote_node
3474
    cfg = self.cfg
3475
    for dev in instance.disks:
3476
      size = dev.size
3477
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3478
      names = _GenerateUniqueNames(cfg, lv_names)
3479
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3480
                                       remote_node, size, names)
3481
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3482
      logger.Info("adding new mirror component on secondary for %s" %
3483
                  dev.iv_name)
3484
      #HARDCODE
3485
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3486
                                        new_drbd, False,
3487
                                        _GetInstanceInfoText(instance)):
3488
        raise errors.OpExecError("Failed to create new component on secondary"
3489
                                 " node %s. Full abort, cleanup manually!" %
3490
                                 remote_node)
3491

    
3492
      logger.Info("adding new mirror component on primary")
3493
      #HARDCODE
3494
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3495
                                      instance, new_drbd,
3496
                                      _GetInstanceInfoText(instance)):
3497
        # remove secondary dev
3498
        cfg.SetDiskID(new_drbd, remote_node)
3499
        rpc.call_blockdev_remove(remote_node, new_drbd)
3500
        raise errors.OpExecError("Failed to create volume on primary!"
3501
                                 " Full abort, cleanup manually!!")
3502

    
3503
      # the device exists now
3504
      # call the primary node to add the mirror to md
3505
      logger.Info("adding new mirror component to md")
3506
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3507
                                           [new_drbd]):
3508
        logger.Error("Can't add mirror compoment to md!")
3509
        cfg.SetDiskID(new_drbd, remote_node)
3510
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3511
          logger.Error("Can't rollback on secondary")
3512
        cfg.SetDiskID(new_drbd, instance.primary_node)
3513
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3514
          logger.Error("Can't rollback on primary")
3515
        raise errors.OpExecError("Full abort, cleanup manually!!")
3516

    
3517
      dev.children.append(new_drbd)
3518
      cfg.AddInstance(instance)
3519

    
3520
    # this can fail as the old devices are degraded and _WaitForSync
3521
    # does a combined result over all disks, so we don't check its
3522
    # return value
3523
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3524

    
3525
    # so check manually all the devices
3526
    for name in iv_names:
3527
      dev, child, new_drbd = iv_names[name]
3528
      cfg.SetDiskID(dev, instance.primary_node)
3529
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3530
      if is_degr:
3531
        raise errors.OpExecError("MD device %s is degraded!" % name)
3532
      cfg.SetDiskID(new_drbd, instance.primary_node)
3533
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3534
      if is_degr:
3535
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3536

    
3537
    for name in iv_names:
3538
      dev, child, new_drbd = iv_names[name]
3539
      logger.Info("remove mirror %s component" % name)
3540
      cfg.SetDiskID(dev, instance.primary_node)
3541
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3542
                                              dev, [child]):
3543
        logger.Error("Can't remove child from mirror, aborting"
3544
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3545
        continue
3546

    
3547
      for node in child.logical_id[:2]:
3548
        logger.Info("remove child device on %s" % node)
3549
        cfg.SetDiskID(child, node)
3550
        if not rpc.call_blockdev_remove(node, child):
3551
          logger.Error("Warning: failed to remove device from node %s,"
3552
                       " continuing operation." % node)
3553

    
3554
      dev.children.remove(child)
3555

    
3556
      cfg.AddInstance(instance)
3557

    
3558
  def _ExecD8DiskOnly(self, feedback_fn):
3559
    """Replace a disk on the primary or secondary for dbrd8.
3560

3561
    The algorithm for replace is quite complicated:
3562
      - for each disk to be replaced:
3563
        - create new LVs on the target node with unique names
3564
        - detach old LVs from the drbd device
3565
        - rename old LVs to name_replaced.<time_t>
3566
        - rename new LVs to old LVs
3567
        - attach the new LVs (with the old names now) to the drbd device
3568
      - wait for sync across all devices
3569
      - for each modified disk:
3570
        - remove old LVs (which have the name name_replaced.<time_t>)
3571

3572
    Failures are not very well handled.
3573

3574
    """
3575
    steps_total = 6
3576
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3577
    instance = self.instance
3578
    iv_names = {}
3579
    vgname = self.cfg.GetVGName()
3580
    # start of work
3581
    cfg = self.cfg
3582
    tgt_node = self.tgt_node
3583
    oth_node = self.oth_node
3584

    
3585
    # Step: check device activation
3586
    self.proc.LogStep(1, steps_total, "check device existence")
3587
    info("checking volume groups")
3588
    my_vg = cfg.GetVGName()
3589
    results = rpc.call_vg_list([oth_node, tgt_node])
3590
    if not results:
3591
      raise errors.OpExecError("Can't list volume groups on the nodes")
3592
    for node in oth_node, tgt_node:
3593
      res = results.get(node, False)
3594
      if not res or my_vg not in res:
3595
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3596
                                 (my_vg, node))
3597
    for dev in instance.disks:
3598
      if not dev.iv_name in self.op.disks:
3599
        continue
3600
      for node in tgt_node, oth_node:
3601
        info("checking %s on %s" % (dev.iv_name, node))
3602
        cfg.SetDiskID(dev, node)
3603
        if not rpc.call_blockdev_find(node, dev):
3604
          raise errors.OpExecError("Can't find device %s on node %s" %
3605
                                   (dev.iv_name, node))
3606

    
3607
    # Step: check other node consistency
3608
    self.proc.LogStep(2, steps_total, "check peer consistency")
3609
    for dev in instance.disks:
3610
      if not dev.iv_name in self.op.disks:
3611
        continue
3612
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3613
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3614
                                   oth_node==instance.primary_node):
3615
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3616
                                 " to replace disks on this node (%s)" %
3617
                                 (oth_node, tgt_node))
3618

    
3619
    # Step: create new storage
3620
    self.proc.LogStep(3, steps_total, "allocate new storage")
3621
    for dev in instance.disks:
3622
      if not dev.iv_name in self.op.disks:
3623
        continue
3624
      size = dev.size
3625
      cfg.SetDiskID(dev, tgt_node)
3626
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3627
      names = _GenerateUniqueNames(cfg, lv_names)
3628
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3629
                             logical_id=(vgname, names[0]))
3630
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3631
                             logical_id=(vgname, names[1]))
3632
      new_lvs = [lv_data, lv_meta]
3633
      old_lvs = dev.children
3634
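      # remember the drbd device with its old and new LVs, keyed by
      # iv_name, so the sync-check and old-storage-removal steps below
      # can walk the same mapping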
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3635
      info("creating new local storage on %s for %s" %
3636
           (tgt_node, dev.iv_name))
3637
      # since we *always* want to create this LV, we use the
3638
      # _Create...OnPrimary (which forces the creation), even if we
3639
      # are talking about the secondary node
3640
      for new_lv in new_lvs:
3641
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3642
                                        _GetInstanceInfoText(instance)):
3643
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3644
                                   " node '%s'" %
3645
                                   (new_lv.logical_id[1], tgt_node))
3646

    
3647
    # Step: for each lv, detach+rename*2+attach
3648
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3649
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3650
      info("detaching %s drbd from local storage" % dev.iv_name)
3651
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3652
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3653
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3654
      #dev.children = []
3655
      #cfg.Update(instance)
3656

    
3657
      # ok, we created the new LVs, so now we know we have the needed
3658
      # storage; as such, we proceed on the target node to rename
3659
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3660
      # using the assumption that logical_id == physical_id (which in
3661
      # turn is the unique_id on that node)
3662

    
3663
      # FIXME(iustin): use a better name for the replaced LVs
3664
      temp_suffix = int(time.time())
3665
      ren_fn = lambda d, suff: (d.physical_id[0],
3666
                                d.physical_id[1] + "_replaced-%s" % suff)
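      # illustrative rename (names are made up): ('xenvg', 'x.sda_data')
      # becomes ('xenvg', 'x.sda_data_replaced-1199145600'), freeing the
      # old name for the freshly created volume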
3667
      # build the rename list based on what LVs exist on the node
3668
      rlist = []
3669
      for to_ren in old_lvs:
3670
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3671
        if find_res is not None: # device exists
3672
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3673

    
3674
      info("renaming the old LVs on the target node")
3675
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3676
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3677
      # now we rename the new LVs to the old LVs
3678
      info("renaming the new LVs on the target node")
3679
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3680
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3681
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3682

    
3683
      for old, new in zip(old_lvs, new_lvs):
3684
        new.logical_id = old.logical_id
3685
        cfg.SetDiskID(new, tgt_node)
3686

    
3687
      for disk in old_lvs:
3688
        disk.logical_id = ren_fn(disk, temp_suffix)
3689
        cfg.SetDiskID(disk, tgt_node)
3690

    
3691
      # now that the new lvs have the old name, we can add them to the device
3692
      info("adding new mirror component on %s" % tgt_node)
3693
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3694
        for new_lv in new_lvs:
3695
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3696
            warning("Can't rollback device %s", "manually cleanup unused"
3697
                    " logical volumes")
3698
        raise errors.OpExecError("Can't add local storage to drbd")
3699

    
3700
      dev.children = new_lvs
3701
      cfg.Update(instance)
3702

    
3703
    # Step: wait for sync
3704

    
3705
    # this can fail as the old devices are degraded and _WaitForSync
3706
    # does a combined result over all disks, so we don't check its
3707
    # return value
3708
    self.proc.LogStep(5, steps_total, "sync devices")
3709
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3710

    
3711
    # so check manually all the devices
3712
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3713
      cfg.SetDiskID(dev, instance.primary_node)
3714
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3715
      if is_degr:
3716
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3717

    
3718
    # Step: remove old storage
3719
    self.proc.LogStep(6, steps_total, "removing old storage")
3720
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3721
      info("remove logical volumes for %s" % name)
3722
      for lv in old_lvs:
3723
        cfg.SetDiskID(lv, tgt_node)
3724
        if not rpc.call_blockdev_remove(tgt_node, lv):
3725
          warning("Can't remove old LV", "manually remove unused LVs")
3726
          continue
3727

    
3728
  def _ExecD8Secondary(self, feedback_fn):
3729
    """Replace the secondary node for drbd8.
3730

3731
    The algorithm for replace is quite complicated:
3732
      - for all disks of the instance:
3733
        - create new LVs on the new node with same names
3734
        - shutdown the drbd device on the old secondary
3735
        - disconnect the drbd network on the primary
3736
        - create the drbd device on the new secondary
3737
        - network attach the drbd on the primary, using an artifice:
3738
          the drbd code for Attach() will connect to the network if it
3739
          finds a device which is connected to the correct local disks but
3740
          not network enabled
3741
      - wait for sync across all devices
3742
      - remove all disks from the old secondary
3743

3744
    Failures are not very well handled.
3745

3746
    """
3747
    steps_total = 6
3748
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3749
    instance = self.instance
3750
    iv_names = {}
3751
    vgname = self.cfg.GetVGName()
3752
    # start of work
3753
    cfg = self.cfg
3754
    old_node = self.tgt_node
3755
    new_node = self.new_node
3756
    pri_node = instance.primary_node
3757

    
3758
    # Step: check device activation
3759
    self.proc.LogStep(1, steps_total, "check device existence")
3760
    info("checking volume groups")
3761
    my_vg = cfg.GetVGName()
3762
    results = rpc.call_vg_list([pri_node, new_node])
3763
    if not results:
3764
      raise errors.OpExecError("Can't list volume groups on the nodes")
3765
    for node in pri_node, new_node:
3766
      res = results.get(node, False)
3767
      if not res or my_vg not in res:
3768
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3769
                                 (my_vg, node))
3770
    for dev in instance.disks:
3771
      if not dev.iv_name in self.op.disks:
3772
        continue
3773
      info("checking %s on %s" % (dev.iv_name, pri_node))
3774
      cfg.SetDiskID(dev, pri_node)
3775
      if not rpc.call_blockdev_find(pri_node, dev):
3776
        raise errors.OpExecError("Can't find device %s on node %s" %
3777
                                 (dev.iv_name, pri_node))
3778

    
3779
    # Step: check other node consistency
3780
    self.proc.LogStep(2, steps_total, "check peer consistency")
3781
    for dev in instance.disks:
3782
      if not dev.iv_name in self.op.disks:
3783
        continue
3784
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3785
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3786
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3787
                                 " unsafe to replace the secondary" %
3788
                                 pri_node)
3789

    
3790
    # Step: create new storage
3791
    self.proc.LogStep(3, steps_total, "allocate new storage")
3792
    for dev in instance.disks:
3793
      size = dev.size
3794
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3795
      # since we *always* want to create this LV, we use the
3796
      # _Create...OnPrimary (which forces the creation), even if we
3797
      # are talking about the secondary node
3798
      for new_lv in dev.children:
3799
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3800
                                        _GetInstanceInfoText(instance)):
3801
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3802
                                   " node '%s'" %
3803
                                   (new_lv.logical_id[1], new_node))
3804

    
3805
      iv_names[dev.iv_name] = (dev, dev.children)
3806

    
3807
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3808
    for dev in instance.disks:
3809
      size = dev.size
3810
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3811
      # create new devices on new_node
3812
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3813
                              logical_id=(pri_node, new_node,
3814
                                          dev.logical_id[2]),
3815
                              children=dev.children)
3816
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3817
                                        new_drbd, False,
3818
                                        _GetInstanceInfoText(instance)):
3819
        raise errors.OpExecError("Failed to create new DRBD on"
3820
                                 " node '%s'" % new_node)
3821

    
3822
    for dev in instance.disks:
3823
      # we have new devices, shutdown the drbd on the old secondary
3824
      info("shutting down drbd for %s on old node" % dev.iv_name)
3825
      cfg.SetDiskID(dev, old_node)
3826
      if not rpc.call_blockdev_shutdown(old_node, dev):
3827
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3828
                "Please cleanup this device manually as soon as possible")
3829

    
3830
    info("detaching primary drbds from the network (=> standalone)")
3831
    done = 0
3832
    for dev in instance.disks:
3833
      cfg.SetDiskID(dev, pri_node)
3834
      # set the physical (unique in bdev terms) id to None, meaning
3835
      # detach from network
3836
      dev.physical_id = (None,) * len(dev.physical_id)
3837
      # and 'find' the device, which will 'fix' it to match the
3838
      # standalone state
3839
      if rpc.call_blockdev_find(pri_node, dev):
3840
        done += 1
3841
      else:
3842
        warning("Failed to detach drbd %s from network, unusual case" %
3843
                dev.iv_name)
3844

    
3845
    if not done:
3846
      # no detaches succeeded (very unlikely)
3847
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3848

    
3849
    # if we managed to detach at least one, we update all the disks of
3850
    # the instance to point to the new secondary
3851
    info("updating instance configuration")
3852
    for dev in instance.disks:
3853
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3854
      cfg.SetDiskID(dev, pri_node)
3855
    cfg.Update(instance)
3856

    
3857
    # and now perform the drbd attach
3858
    info("attaching primary drbds to new secondary (standalone => connected)")
3859
    failures = []
3860
    for dev in instance.disks:
3861
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3862
      # since the attach is smart, it's enough to 'find' the device,
3863
      # it will automatically activate the network, if the physical_id
3864
      # is correct
3865
      cfg.SetDiskID(dev, pri_node)
3866
      if not rpc.call_blockdev_find(pri_node, dev):
3867
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3868
                "please do a gnt-instance info to see the status of disks")
3869

    
3870
    # this can fail as the old devices are degraded and _WaitForSync
3871
    # does a combined result over all disks, so we don't check its
3872
    # return value
3873
    self.proc.LogStep(5, steps_total, "sync devices")
3874
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3875

    
3876
    # so check manually all the devices
3877
    for name, (dev, old_lvs) in iv_names.iteritems():
3878
      cfg.SetDiskID(dev, pri_node)
3879
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3880
      if is_degr:
3881
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3882

    
3883
    self.proc.LogStep(6, steps_total, "removing old storage")
3884
    for name, (dev, old_lvs) in iv_names.iteritems():
3885
      info("remove logical volumes for %s" % name)
3886
      for lv in old_lvs:
3887
        cfg.SetDiskID(lv, old_node)
3888
        if not rpc.call_blockdev_remove(old_node, lv):
3889
          warning("Can't remove LV on old secondary",
3890
                  "Cleanup stale volumes by hand")
3891

    
3892
  def Exec(self, feedback_fn):
3893
    """Execute disk replacement.
3894

3895
    This dispatches the disk replacement to the appropriate handler.
3896

3897
    """
3898
    instance = self.instance
3899
    if instance.disk_template == constants.DT_REMOTE_RAID1:
3900
      fn = self._ExecRR1
3901
    elif instance.disk_template == constants.DT_DRBD8:
3902
      if self.op.remote_node is None:
3903
        fn = self._ExecD8DiskOnly
3904
      else:
3905
        fn = self._ExecD8Secondary
3906
    else:
3907
      raise errors.ProgrammerError("Unhandled disk replacement case")
3908
    return fn(feedback_fn)


class LUQueryInstanceData(NoHooksLU):
3912
  """Query runtime instance data.
3913

3914
  """
3915
  _OP_REQP = ["instances"]
3916

    
3917
  def CheckPrereq(self):
3918
    """Check prerequisites.
3919

3920
    This only checks the optional instance list against the existing names.
3921

3922
    """
3923
    if not isinstance(self.op.instances, list):
3924
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3925
    if self.op.instances:
3926
      self.wanted_instances = []
3927
      names = self.op.instances
3928
      for name in names:
3929
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3930
        if instance is None:
3931
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3932
        self.wanted_instances.append(instance)
3933
    else:
3934
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3935
                               in self.cfg.GetInstanceList()]
3936
    return
3937

    
3938

    
3939
  def _ComputeDiskStatus(self, instance, snode, dev):
3940
    """Compute block device status.
3941

3942
    """
3943
    self.cfg.SetDiskID(dev, instance.primary_node)
3944
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3945
    if dev.dev_type in constants.LDS_DRBD:
3946
      # we change the snode then (otherwise we use the one passed in)
3947
      if dev.logical_id[0] == instance.primary_node:
3948
        snode = dev.logical_id[1]
3949
      else:
3950
        snode = dev.logical_id[0]
3951

    
3952
    if snode:
3953
      self.cfg.SetDiskID(dev, snode)
3954
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3955
    else:
3956
      dev_sstatus = None
3957

    
3958
    if dev.children:
3959
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3960
                      for child in dev.children]
3961
    else:
3962
      dev_children = []
3963

    
3964
    data = {
3965
      "iv_name": dev.iv_name,
3966
      "dev_type": dev.dev_type,
3967
      "logical_id": dev.logical_id,
3968
      "physical_id": dev.physical_id,
3969
      "pstatus": dev_pstatus,
3970
      "sstatus": dev_sstatus,
3971
      "children": dev_children,
3972
      }
3973

    
3974
    return data
3975

    
3976
  def Exec(self, feedback_fn):
3977
    """Gather and return data"""
3978
    result = {}
3979
    for instance in self.wanted_instances:
3980
      remote_info = rpc.call_instance_info(instance.primary_node,
3981
                                                instance.name)
3982
      if remote_info and "state" in remote_info:
3983
        remote_state = "up"
3984
      else:
3985
        remote_state = "down"
3986
      if instance.status == "down":
3987
        config_state = "down"
3988
      else:
3989
        config_state = "up"
3990

    
3991
      disks = [self._ComputeDiskStatus(instance, None, device)
3992
               for device in instance.disks]
3993

    
3994
      idict = {
3995
        "name": instance.name,
3996
        "config_state": config_state,
3997
        "run_state": remote_state,
3998
        "pnode": instance.primary_node,
3999
        "snodes": instance.secondary_nodes,
4000
        "os": instance.os,
4001
        "memory": instance.memory,
4002
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4003
        "disks": disks,
4004
        "vcpus": instance.vcpus,
4005
        }
4006

    
4007
      result[instance.name] = idict
4008

    
4009
    return result


class LUSetInstanceParms(LogicalUnit):
4013
  """Modifies an instances's parameters.
4014

4015
  """
4016
  HPATH = "instance-modify"
4017
  HTYPE = constants.HTYPE_INSTANCE
4018
  _OP_REQP = ["instance_name"]
4019

    
4020
  def BuildHooksEnv(self):
4021
    """Build hooks env.
4022

4023
    This runs on the master, primary and secondaries.
4024

4025
    """
4026
    args = dict()
4027
    if self.mem:
4028
      args['memory'] = self.mem
4029
    if self.vcpus:
4030
      args['vcpus'] = self.vcpus
4031
    if self.do_ip or self.do_bridge:
4032
      if self.do_ip:
4033
        ip = self.ip
4034
      else:
4035
        ip = self.instance.nics[0].ip
4036
      if self.bridge:
4037
        bridge = self.bridge
4038
      else:
4039
        bridge = self.instance.nics[0].bridge
4040
      args['nics'] = [(ip, bridge)]
4041
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4042
    nl = [self.sstore.GetMasterNode(),
4043
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4044
    return env, nl, nl
4045

    
4046
  def CheckPrereq(self):
4047
    """Check prerequisites.
4048

4049
    This only checks the instance list against the existing names.
4050

4051
    """
4052
    self.mem = getattr(self.op, "mem", None)
4053
    self.vcpus = getattr(self.op, "vcpus", None)
4054
    self.ip = getattr(self.op, "ip", None)
4055
    self.bridge = getattr(self.op, "bridge", None)
4056
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
4057
      raise errors.OpPrereqError("No changes submitted")
4058
    if self.mem is not None:
4059
      try:
4060
        self.mem = int(self.mem)
4061
      except ValueError, err:
4062
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4063
    if self.vcpus is not None:
4064
      try:
4065
        self.vcpus = int(self.vcpus)
4066
      except ValueError, err:
4067
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4068
    if self.ip is not None:
4069
      self.do_ip = True
4070
      if self.ip.lower() == "none":
4071
        self.ip = None
4072
      else:
4073
        if not utils.IsValidIP(self.ip):
4074
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4075
    else:
4076
      self.do_ip = False
4077
    self.do_bridge = (self.bridge is not None)
4078

    
4079
    instance = self.cfg.GetInstanceInfo(
4080
      self.cfg.ExpandInstanceName(self.op.instance_name))
4081
    if instance is None:
4082
      raise errors.OpPrereqError("No such instance name '%s'" %
4083
                                 self.op.instance_name)
4084
    self.op.instance_name = instance.name
4085
    self.instance = instance
4086
    return
4087

    
4088
  def Exec(self, feedback_fn):
4089
    """Modifies an instance.
4090

4091
    All parameters take effect only at the next restart of the instance.
4092
    """
4093
    result = []
4094
    instance = self.instance
4095
    if self.mem:
4096
      instance.memory = self.mem
4097
      result.append(("mem", self.mem))
4098
    if self.vcpus:
4099
      instance.vcpus = self.vcpus
4100
      result.append(("vcpus",  self.vcpus))
4101
    if self.do_ip:
4102
      instance.nics[0].ip = self.ip
4103
      result.append(("ip", self.ip))
4104
    if self.bridge:
4105
      instance.nics[0].bridge = self.bridge
4106
      result.append(("bridge", self.bridge))
4107

    
4108
    self.cfg.AddInstance(instance)
4109

    
4110
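    # result is a list of (parameter, new value) pairs, for example
    # [("mem", 512), ("bridge", "xen-br0")] (values shown here are
    # purely illustrative)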
    return result


class LUQueryExports(NoHooksLU):
4114
  """Query the exports list
4115

4116
  """
4117
  _OP_REQP = []
4118

    
4119
  def CheckPrereq(self):
4120
    """Check that the nodelist contains only existing nodes.
4121

4122
    """
4123
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4124

    
4125
  def Exec(self, feedback_fn):
4126
    """Compute the list of all the exported system images.
4127

4128
    Returns:
4129
      a dictionary with the structure node->(export-list)
4130
      where export-list is a list of the instances exported on
4131
      that node.
4132

4133
    """
4134
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
4138
  """Export an instance to an image in the cluster.
4139

4140
  """
4141
  HPATH = "instance-export"
4142
  HTYPE = constants.HTYPE_INSTANCE
4143
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4144

    
4145
  def BuildHooksEnv(self):
4146
    """Build hooks env.
4147

4148
    This will run on the master, primary node and target node.
4149

4150
    """
4151
    env = {
4152
      "EXPORT_NODE": self.op.target_node,
4153
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4154
      }
4155
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4156
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4157
          self.op.target_node]
4158
    return env, nl, nl
4159

    
4160
  def CheckPrereq(self):
4161
    """Check prerequisites.
4162

4163
    This checks that the instance name is a valid one.
4164

4165
    """
4166
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4167
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4168
    if self.instance is None:
4169
      raise errors.OpPrereqError("Instance '%s' not found" %
4170
                                 self.op.instance_name)
4171

    
4172
    # node verification
4173
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4174
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4175

    
4176
    if self.dst_node is None:
4177
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
4178
                                 self.op.target_node)
4179
    self.op.target_node = self.dst_node.name
4180

    
4181
  def Exec(self, feedback_fn):
4182
    """Export an instance to an image in the cluster.
4183

4184
    """
4185
    instance = self.instance
4186
    dst_node = self.dst_node
4187
    src_node = instance.primary_node
4188
    # shutdown the instance, unless requested not to do so
4189
    if self.op.shutdown:
4190
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
4191
      self.proc.ChainOpCode(op)
4192

    
4193
    vgname = self.cfg.GetVGName()
4194

    
4195
    snap_disks = []
4196

    
4197
    try:
4198
      for disk in instance.disks:
4199
        if disk.iv_name == "sda":
4200
          # new_dev_name is the name of the LVM snapshot of the disk we passed
4201
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4202

    
4203
          if not new_dev_name:
4204
            logger.Error("could not snapshot block device %s on node %s" %
4205
                         (disk.logical_id[1], src_node))
4206
          else:
4207
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4208
                                      logical_id=(vgname, new_dev_name),
4209
                                      physical_id=(vgname, new_dev_name),
4210
                                      iv_name=disk.iv_name)
4211
            snap_disks.append(new_dev)
4212

    
4213
    finally:
4214
      if self.op.shutdown:
4215
        op = opcodes.OpStartupInstance(instance_name=instance.name,
4216
                                       force=False)
4217
        self.proc.ChainOpCode(op)
4218

    
4219
    # TODO: check for size
4220

    
4221
    for dev in snap_disks:
4222
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4223
                                           instance):
4224
        logger.Error("could not export block device %s from node"
4225
                     " %s to node %s" %
4226
                     (dev.logical_id[1], src_node, dst_node.name))
4227
      if not rpc.call_blockdev_remove(src_node, dev):
4228
        logger.Error("could not remove snapshot block device %s from"
4229
                     " node %s" % (dev.logical_id[1], src_node))
4230

    
4231
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4232
      logger.Error("could not finalize export for instance %s on node %s" %
4233
                   (instance.name, dst_node.name))
4234

    
4235
    nodelist = self.cfg.GetNodeList()
4236
    nodelist.remove(dst_node.name)
4237

    
4238
    # on one-node clusters nodelist will be empty after the removal
4239
    # if we proceed, the backup would be removed because OpQueryExports
4240
    # substitutes an empty list with the full cluster node list.
4241
    if nodelist:
4242
      op = opcodes.OpQueryExports(nodes=nodelist)
4243
      exportlist = self.proc.ChainOpCode(op)
4244
      for node in exportlist:
4245
        if instance.name in exportlist[node]:
4246
          if not rpc.call_export_remove(node, instance.name):
4247
            logger.Error("could not remove older export for instance %s"
4248
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
4252
  """Generic tags LU.
4253

4254
  This is an abstract class which is the parent of all the other tags LUs.
4255

4256
  """
4257
  def CheckPrereq(self):
4258
    """Check prerequisites.
4259

4260
    """
4261
    if self.op.kind == constants.TAG_CLUSTER:
4262
      self.target = self.cfg.GetClusterInfo()
4263
    elif self.op.kind == constants.TAG_NODE:
4264
      name = self.cfg.ExpandNodeName(self.op.name)
4265
      if name is None:
4266
        raise errors.OpPrereqError("Invalid node name (%s)" %
4267
                                   (self.op.name,))
4268
      self.op.name = name
4269
      self.target = self.cfg.GetNodeInfo(name)
4270
    elif self.op.kind == constants.TAG_INSTANCE:
4271
      name = self.cfg.ExpandInstanceName(self.op.name)
4272
      if name is None:
4273
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4274
                                   (self.op.name,))
4275
      self.op.name = name
4276
      self.target = self.cfg.GetInstanceInfo(name)
4277
    else:
4278
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4279
                                 str(self.op.kind))


class LUGetTags(TagsLU):
4283
  """Returns the tags of a given object.
4284

4285
  """
4286
  _OP_REQP = ["kind", "name"]
4287

    
4288
  def Exec(self, feedback_fn):
4289
    """Returns the tag list.
4290

4291
    """
4292
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
4296
  """Searches the tags for a given pattern.
4297

4298
  """
4299
  _OP_REQP = ["pattern"]
4300

    
4301
  def CheckPrereq(self):
4302
    """Check prerequisites.
4303

4304
    This checks the pattern passed for validity by compiling it.
4305

4306
    """
4307
    try:
4308
      self.re = re.compile(self.op.pattern)
4309
    except re.error, err:
4310
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4311
                                 (self.op.pattern, err))
4312

    
4313
  def Exec(self, feedback_fn):
4314
    """Returns the tag list.
4315

4316
    """
4317
    cfg = self.cfg
4318
    tgts = [("/cluster", cfg.GetClusterInfo())]
4319
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4320
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4321
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4322
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4323
    results = []
4324
    for path, target in tgts:
4325
      for tag in target.GetTags():
4326
        if self.re.search(tag):
4327
          results.append((path, tag))
4328
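    # results is a list of (path, tag) pairs, e.g.
    # [("/instances/web1.example.com", "webfarm")] with purely
    # illustrative names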
    return results


class LUAddTags(TagsLU):
4332
  """Sets a tag on a given object.
4333

4334
  """
4335
  _OP_REQP = ["kind", "name", "tags"]
4336

    
4337
  def CheckPrereq(self):
4338
    """Check prerequisites.
4339

4340
    This checks the type and length of the tag name and value.
4341

4342
    """
4343
    TagsLU.CheckPrereq(self)
4344
    for tag in self.op.tags:
4345
      objects.TaggableObject.ValidateTag(tag)
4346

    
4347
  def Exec(self, feedback_fn):
4348
    """Sets the tag.
4349

4350
    """
4351
    try:
4352
      for tag in self.op.tags:
4353
        self.target.AddTag(tag)
4354
    except errors.TagError, err:
4355
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
4356
    try:
4357
      self.cfg.Update(self.target)
4358
    except errors.ConfigurationError:
4359
      raise errors.OpRetryError("There has been a modification to the"
4360
                                " config file and the operation has been"
4361
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
4365
  """Delete a list of tags from a given object.
4366

4367
  """
4368
  _OP_REQP = ["kind", "name", "tags"]
4369

    
4370
  def CheckPrereq(self):
4371
    """Check prerequisites.
4372

4373
    This checks that we have the given tag.
4374

4375
    """
4376
    TagsLU.CheckPrereq(self)
4377
    for tag in self.op.tags:
4378
      objects.TaggableObject.ValidateTag(tag)
4379
    del_tags = frozenset(self.op.tags)
4380
    cur_tags = self.target.GetTags()
4381
    if not del_tags <= cur_tags:
4382
      diff_tags = del_tags - cur_tags
4383
      diff_names = ["'%s'" % tag for tag in diff_tags]
4384
      diff_names.sort()
4385
      raise errors.OpPrereqError("Tag(s) %s not found" %
4386
                                 (",".join(diff_names)))
4387

    
4388
  def Exec(self, feedback_fn):
4389
    """Remove the tag from the object.
4390

4391
    """
4392
    for tag in self.op.tags:
4393
      self.target.RemoveTag(tag)
4394
    try:
4395
      self.cfg.Update(self.target)
4396
    except errors.ConfigurationError:
4397
      raise errors.OpRetryError("There has been a modification to the"
4398
                                " config file and the operation has been"
4399
                                " aborted. Please retry.")