1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46
class LogicalUnit(object):
47
  """Logical Unit base class.
48

49
  Subclasses must follow these rules:
50
    - implement CheckPrereq which also fills in the opcode instance
51
      with all the fields (even if as None)
52
    - implement Exec
53
    - implement BuildHooksEnv
54
    - redefine HPATH and HTYPE
55
    - optionally redefine their run requirements (REQ_CLUSTER,
56
      REQ_MASTER); note that all commands require root permissions
57

58
  """
59
  HPATH = None
60
  HTYPE = None
61
  _OP_REQP = []
62
  REQ_CLUSTER = True
63
  REQ_MASTER = True
64

    
65
  def __init__(self, processor, op, cfg, sstore):
66
    """Constructor for LogicalUnit.
67

68
    This needs to be overridden in derived classes in order to check op
69
    validity.
70

71
    """
72
    self.proc = processor
73
    self.op = op
74
    self.cfg = cfg
75
    self.sstore = sstore
76
    for attr_name in self._OP_REQP:
77
      attr_val = getattr(op, attr_name, None)
78
      if attr_val is None:
79
        raise errors.OpPrereqError("Required parameter '%s' missing" %
80
                                   attr_name)
81
    if self.REQ_CLUSTER:
82
      if not cfg.IsCluster():
83
        raise errors.OpPrereqError("Cluster not initialized yet,"
84
                                   " use 'gnt-cluster init' first.")
85
      if self.REQ_MASTER:
86
        master = sstore.GetMasterNode()
87
        if master != utils.HostInfo().name:
88
          raise errors.OpPrereqError("Commands must be run on the master"
89
                                     " node %s" % master)
90

    
91
  def CheckPrereq(self):
92
    """Check prerequisites for this LU.
93

94
    This method should check that the prerequisites for the execution
95
    of this LU are fulfilled. It can do internode communication, but
96
    it should be idempotent - no cluster or system changes are
97
    allowed.
98

99
    The method should raise errors.OpPrereqError in case something is
100
    not fulfilled. Its return value is ignored.
101

102
    This method should also update all the parameters of the opcode to
103
    their canonical form; e.g. a short node name must be fully
104
    expanded after this method has successfully completed (so that
105
    hooks, logging, etc. work correctly).
106

107
    """
108
    raise NotImplementedError
109

    
110
  def Exec(self, feedback_fn):
111
    """Execute the LU.
112

113
    This method should implement the actual work. It should raise
114
    errors.OpExecError for failures that are somewhat dealt with in
115
    code, or expected.
116

117
    """
118
    raise NotImplementedError
119

    
120
  def BuildHooksEnv(self):
121
    """Build hooks environment for this LU.
122

123
    This method should return a three-element tuple consisting of: a dict
124
    containing the environment that will be used for running the
125
    specific hook for this LU, a list of node names on which the hook
126
    should run before the execution, and a list of node names on which
127
    the hook should run after the execution.
128

129
    The keys of the dict must not have 'GANETI_' prefixed as this will
130
    be handled in the hooks runner. Also note additional keys will be
131
    added by the hooks runner. If the LU doesn't define any
132
    environment, an empty dict (and not None) should be returned.
133

134
    As for the node lists, the master should not be included in
135
    them, as it will be added by the hooks runner in case this LU
136
    requires a cluster to run on (otherwise we don't have a node
137
    list). If there are no nodes, an empty list should be returned (and not
138
    None).
139

140
    Note that if the HPATH for a LU class is None, this function will
141
    not be called.
142

143
    """
144
    raise NotImplementedError
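# For illustration, a hypothetical minimal LU following the contract
# described in the LogicalUnit docstring above (the class name, opcode
# field and hook path below are made up):
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["example_param"]
#
#     def CheckPrereq(self):
#       # validate/canonicalize opcode fields only; no cluster changes here
#       if not self.op.example_param:
#         raise errors.OpPrereqError("example_param must not be empty")
#
#     def BuildHooksEnv(self):
#       env = {"OP_TARGET": self.op.example_param}
#       # (env dict, nodes for the pre-hook, nodes for the post-hook)
#       return env, [], [self.sstore.GetMasterNode()]
#
#     def Exec(self, feedback_fn):
#       feedback_fn("no-op executed for %s" % self.op.example_param)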
145

    
146

    
147
class NoHooksLU(LogicalUnit):
148
  """Simple LU which runs no hooks.
149

150
  This LU is intended as a parent for other LogicalUnits which will
151
  run no hooks, in order to reduce duplicate code.
152

153
  """
154
  HPATH = None
155
  HTYPE = None
156

    
157
  def BuildHooksEnv(self):
158
    """Build hooks env.
159

160
    This is a no-op, since we don't run hooks.
161

162
    """
163
    return {}, [], []
164

    
165

    
166
def _AddHostToEtcHosts(hostname):
167
  """Wrapper around utils.SetEtcHostsEntry.
168

169
  """
170
  hi = utils.HostInfo(name=hostname)
171
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172

    
173

    
174
def _RemoveHostFromEtcHosts(hostname):
175
  """Wrapper around utils.RemoveEtcHostsEntry.
176

177
  """
178
  hi = utils.HostInfo(name=hostname)
179
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181

    
182

    
183
def _GetWantedNodes(lu, nodes):
184
  """Returns list of checked and expanded node names.
185

186
  Args:
187
    nodes: List of node names (strings), or an empty list for all nodes
188

189
  """
190
  if not isinstance(nodes, list):
191
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
192

    
193
  if nodes:
194
    wanted = []
195

    
196
    for name in nodes:
197
      node = lu.cfg.ExpandNodeName(name)
198
      if node is None:
199
        raise errors.OpPrereqError("No such node name '%s'" % name)
200
      wanted.append(node)
201

    
202
  else:
203
    wanted = lu.cfg.GetNodeList()
204
  return utils.NiceSort(wanted)
205

    
206

    
207
def _GetWantedInstances(lu, instances):
208
  """Returns list of checked and expanded instance names.
209

210
  Args:
211
    instances: List of instance names (strings), or an empty list for all instances
212

213
  """
214
  if not isinstance(instances, list):
215
    raise errors.OpPrereqError("Invalid argument type 'instances'")
216

    
217
  if instances:
218
    wanted = []
219

    
220
    for name in instances:
221
      instance = lu.cfg.ExpandInstanceName(name)
222
      if instance is None:
223
        raise errors.OpPrereqError("No such instance name '%s'" % name)
224
      wanted.append(instance)
225

    
226
  else:
227
    wanted = lu.cfg.GetInstanceList()
228
  return utils.NiceSort(wanted)
229

    
230

    
231
def _CheckOutputFields(static, dynamic, selected):
232
  """Checks whether all selected fields are valid.
233

234
  Args:
235
    static: Static fields
236
    dynamic: Dynamic fields
    selected: Fields selected by the caller, validated against the above
237

238
  """
239
  static_fields = frozenset(static)
240
  dynamic_fields = frozenset(dynamic)
241

    
242
  all_fields = static_fields | dynamic_fields
243

    
244
  if not all_fields.issuperset(selected):
245
    raise errors.OpPrereqError("Unknown output fields selected: %s"
246
                               % ",".join(frozenset(selected).
247
                                          difference(all_fields)))
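# Example of the helper above with hypothetical field names:
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "bogus"])
# raises OpPrereqError("Unknown output fields selected: bogus"), while a
# selection made up only of known fields passes silently.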
248

    
249

    
250
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251
                          memory, vcpus, nics):
252
  """Builds instance related env variables for hooks from single variables.
253

254
  Args:
255
    secondary_nodes: List of secondary nodes as strings
256
  """
257
  env = {
258
    "OP_TARGET": name,
259
    "INSTANCE_NAME": name,
260
    "INSTANCE_PRIMARY": primary_node,
261
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262
    "INSTANCE_OS_TYPE": os_type,
263
    "INSTANCE_STATUS": status,
264
    "INSTANCE_MEMORY": memory,
265
    "INSTANCE_VCPUS": vcpus,
266
  }
267

    
268
  if nics:
269
    nic_count = len(nics)
270
    for idx, (ip, bridge) in enumerate(nics):
271
      if ip is None:
272
        ip = ""
273
      env["INSTANCE_NIC%d_IP" % idx] = ip
274
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275
  else:
276
    nic_count = 0
277

    
278
  env["INSTANCE_NIC_COUNT"] = nic_count
279

    
280
  return env
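# For illustration only (hypothetical values), a call such as
#   _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"], "debian",
#                         "up", 128, 1, [("192.0.2.10", "xen-br0")])
# produces OP_TARGET/INSTANCE_NAME="inst1.example.com",
# INSTANCE_PRIMARY="node1", INSTANCE_SECONDARIES="node2",
# INSTANCE_NIC0_IP="192.0.2.10", INSTANCE_NIC0_BRIDGE="xen-br0" and
# INSTANCE_NIC_COUNT=1.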
281

    
282

    
283
def _BuildInstanceHookEnvByObject(instance, override=None):
284
  """Builds instance related env variables for hooks from an object.
285

286
  Args:
287
    instance: objects.Instance object of instance
288
    override: dict of values to override
289
  """
290
  args = {
291
    'name': instance.name,
292
    'primary_node': instance.primary_node,
293
    'secondary_nodes': instance.secondary_nodes,
294
    'os_type': instance.os,
295
    'status': instance.status,
296
    'memory': instance.memory,
297
    'vcpus': instance.vcpus,
298
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
299
  }
300
  if override:
301
    args.update(override)
302
  return _BuildInstanceHookEnv(**args)
303

    
304

    
305
def _UpdateKnownHosts(fullnode, ip, pubkey):
306
  """Ensure a node has a correct known_hosts entry.
307

308
  Args:
309
    fullnode - Fully qualified domain name of host. (str)
310
    ip       - IPv4 address of host (str)
311
    pubkey   - the public key of the cluster
312

313
  """
314
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
316
  else:
317
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
318

    
319
  inthere = False
320

    
321
  save_lines = []
322
  add_lines = []
323
  removed = False
324

    
325
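  # Classify each existing line: keep it when it already lists both the
  # hostname and the IP with the expected key, drop it when it mentions
  # either of them but with a stale key or an incomplete host list, and
  # pass unrelated lines through unchanged; a missing entry is appended
  # further below.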
  for rawline in f:
326
    logger.Debug('read %s' % (repr(rawline),))
327

    
328
    parts = rawline.rstrip('\r\n').split()
329

    
330
    # Ignore unwanted lines
331
    if len(parts) >= 3 and rawline.lstrip()[0] != '#':
332
      fields = parts[0].split(',')
333
      key = parts[2]
334

    
335
      haveall = True
336
      havesome = False
337
      for spec in [ ip, fullnode ]:
338
        if spec not in fields:
339
          haveall = False
340
        if spec in fields:
341
          havesome = True
342

    
343
      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344
      if haveall and key == pubkey:
345
        inthere = True
346
        save_lines.append(rawline)
347
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
348
        continue
349

    
350
      if havesome and (not haveall or key != pubkey):
351
        removed = True
352
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
353
        continue
354

    
355
    save_lines.append(rawline)
356

    
357
  if not inthere:
358
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
360

    
361
  if removed:
362
    save_lines = save_lines + add_lines
363

    
364
    # Write a new file and replace old.
365
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
366
                                   constants.DATA_DIR)
367
    newfile = os.fdopen(fd, 'w')
368
    try:
369
      newfile.write(''.join(save_lines))
370
    finally:
371
      newfile.close()
372
    logger.Debug("Wrote new known_hosts.")
373
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
374

    
375
  elif add_lines:
376
    # Simply appending a new line will do the trick.
377
    f.seek(0, 2)
378
    for add in add_lines:
379
      f.write(add)
380

    
381
  f.close()
382

    
383

    
384
def _HasValidVG(vglist, vgname):
385
  """Checks if the volume group list is valid.
386

387
  A non-None return value means there's an error, and the return value
388
  is the error message.
389

390
  """
391
  vgsize = vglist.get(vgname, None)
392
  if vgsize is None:
393
    return "volume group '%s' missing" % vgname
394
  elif vgsize < 20480:
395
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
396
            (vgname, vgsize))
397
  return None
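# Example with hypothetical data: for vglist = {"xenvg": 409600} the call
# _HasValidVG(vglist, "xenvg") returns None (no error), while a missing or
# undersized volume group yields the corresponding error string instead.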
398

    
399

    
400
def _InitSSHSetup(node):
401
  """Setup the SSH configuration for the cluster.
402

403

404
  This generates a dsa keypair for root, adds the pub key to the
405
  permitted hosts and adds the hostkey to its own known hosts.
406

407
  Args:
408
    node: the name of this host as a fqdn
409

410
  """
411
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
412

    
413
  for name in priv_key, pub_key:
414
    if os.path.exists(name):
415
      utils.CreateBackup(name)
416
    utils.RemoveFile(name)
417

    
418
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
419
                         "-f", priv_key,
420
                         "-q", "-N", ""])
421
  if result.failed:
422
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
423
                             result.output)
424

    
425
  f = open(pub_key, 'r')
426
  try:
427
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
428
  finally:
429
    f.close()
430

    
431

    
432
def _InitGanetiServerSetup(ss):
433
  """Setup the necessary configuration for the initial node daemon.
434

435
  This creates the nodepass file containing the shared password for
436
  the cluster and also generates the SSL certificate.
437

438
  """
439
  # Create pseudo random password
440
  randpass = sha.new(os.urandom(64)).hexdigest()
441
  # and write it into sstore
442
  ss.SetKey(ss.SS_NODED_PASS, randpass)
443

    
444
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445
                         "-days", str(365*5), "-nodes", "-x509",
446
                         "-keyout", constants.SSL_CERT_FILE,
447
                         "-out", constants.SSL_CERT_FILE, "-batch"])
448
  if result.failed:
449
    raise errors.OpExecError("could not generate server ssl cert, command"
450
                             " %s had exitcode %s and error message %s" %
451
                             (result.cmd, result.exit_code, result.output))
452

    
453
  os.chmod(constants.SSL_CERT_FILE, 0400)
454

    
455
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
456

    
457
  if result.failed:
458
    raise errors.OpExecError("Could not start the node daemon, command %s"
459
                             " had exitcode %s and error %s" %
460
                             (result.cmd, result.exit_code, result.output))
461

    
462

    
463
def _CheckInstanceBridgesExist(instance):
464
  """Check that the brigdes needed by an instance exist.
465

466
  """
467
  # check bridges existence
468
  brlist = [nic.bridge for nic in instance.nics]
469
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
470
    raise errors.OpPrereqError("one or more target bridges %s does not"
471
                               " exist on destination node '%s'" %
472
                               (brlist, instance.primary_node))
473

    
474

    
475
class LUInitCluster(LogicalUnit):
476
  """Initialise the cluster.
477

478
  """
479
  HPATH = "cluster-init"
480
  HTYPE = constants.HTYPE_CLUSTER
481
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482
              "def_bridge", "master_netdev"]
483
  REQ_CLUSTER = False
484

    
485
  def BuildHooksEnv(self):
486
    """Build hooks env.
487

488
    Notes: Since we don't require a cluster, we must manually add
489
    ourselves in the post-run node list.
490

491
    """
492
    env = {"OP_TARGET": self.op.cluster_name}
493
    return env, [], [self.hostname.name]
494

    
495
  def CheckPrereq(self):
496
    """Verify that the passed name is a valid one.
497

498
    """
499
    if config.ConfigWriter.IsCluster():
500
      raise errors.OpPrereqError("Cluster is already initialised")
501

    
502
    self.hostname = hostname = utils.HostInfo()
503

    
504
    if hostname.ip.startswith("127."):
505
      raise errors.OpPrereqError("This host's IP resolves to the private"
506
                                 " range (%s). Please fix DNS or /etc/hosts." %
507
                                 (hostname.ip,))
508

    
509
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
510

    
511
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
512
                         constants.DEFAULT_NODED_PORT):
513
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
514
                                 " to %s,\nbut this ip address does not"
515
                                 " belong to this host."
516
                                 " Aborting." % hostname.ip)
517

    
518
    secondary_ip = getattr(self.op, "secondary_ip", None)
519
    if secondary_ip and not utils.IsValidIP(secondary_ip):
520
      raise errors.OpPrereqError("Invalid secondary ip given")
521
    if (secondary_ip and
522
        secondary_ip != hostname.ip and
523
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
524
                           constants.DEFAULT_NODED_PORT))):
525
      raise errors.OpPrereqError("You gave %s as secondary IP,"
526
                                 " but it does not belong to this host." %
527
                                 secondary_ip)
528
    self.secondary_ip = secondary_ip
529

    
530
    # checks presence of the volume group given
531
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
532

    
533
    if vgstatus:
534
      raise errors.OpPrereqError("Error: %s" % vgstatus)
535

    
536
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
537
                    self.op.mac_prefix):
538
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
539
                                 self.op.mac_prefix)
540

    
541
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
542
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
543
                                 self.op.hypervisor_type)
544

    
545
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
546
    if result.failed:
547
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
548
                                 (self.op.master_netdev,
549
                                  result.output.strip()))
550

    
551
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
552
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
553
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
554
                                 " executable." % constants.NODE_INITD_SCRIPT)
555

    
556
  def Exec(self, feedback_fn):
557
    """Initialize the cluster.
558

559
    """
560
    clustername = self.clustername
561
    hostname = self.hostname
562

    
563
    # set up the simple store
564
    self.sstore = ss = ssconf.SimpleStore()
565
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
566
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
567
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
568
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
569
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
570

    
571
    # set up the inter-node password and certificate
572
    _InitGanetiServerSetup(ss)
573

    
574
    # start the master ip
575
    rpc.call_node_start_master(hostname.name)
576

    
577
    # set up ssh config and /etc/hosts
578
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
579
    try:
580
      sshline = f.read()
581
    finally:
582
      f.close()
583
    sshkey = sshline.split(" ")[1]
584

    
585
    _AddHostToEtcHosts(hostname.name)
586

    
587
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
588

    
589
    _InitSSHSetup(hostname.name)
590

    
591
    # init of cluster config file
592
    self.cfg = cfgw = config.ConfigWriter()
593
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
594
                    sshkey, self.op.mac_prefix,
595
                    self.op.vg_name, self.op.def_bridge)
596

    
597

    
598
class LUDestroyCluster(NoHooksLU):
599
  """Logical unit for destroying the cluster.
600

601
  """
602
  _OP_REQP = []
603

    
604
  def CheckPrereq(self):
605
    """Check prerequisites.
606

607
    This checks whether the cluster is empty.
608

609
    Any errors are signalled by raising errors.OpPrereqError.
610

611
    """
612
    master = self.sstore.GetMasterNode()
613

    
614
    nodelist = self.cfg.GetNodeList()
615
    if len(nodelist) != 1 or nodelist[0] != master:
616
      raise errors.OpPrereqError("There are still %d node(s) in"
617
                                 " this cluster." % (len(nodelist) - 1))
618
    instancelist = self.cfg.GetInstanceList()
619
    if instancelist:
620
      raise errors.OpPrereqError("There are still %d instance(s) in"
621
                                 " this cluster." % len(instancelist))
622

    
623
  def Exec(self, feedback_fn):
624
    """Destroys the cluster.
625

626
    """
627
    master = self.sstore.GetMasterNode()
628
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
629
    utils.CreateBackup(priv_key)
630
    utils.CreateBackup(pub_key)
631
    rpc.call_node_leave_cluster(master)
632

    
633

    
634
class LUVerifyCluster(NoHooksLU):
635
  """Verifies the cluster status.
636

637
  """
638
  _OP_REQP = []
639

    
640
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
641
                  remote_version, feedback_fn):
642
    """Run multiple tests against a node.
643

644
    Test list:
645
      - compares ganeti version
646
      - checks vg existence and size > 20G
647
      - checks config file checksum
648
      - checks ssh to other nodes
649

650
    Args:
651
      node: name of the node to check
652
      file_list: required list of files
653
      local_cksum: dictionary of local files and their checksums
654

655
    """
656
    # compares ganeti version
657
    local_version = constants.PROTOCOL_VERSION
658
    if not remote_version:
659
      feedback_fn(" - ERROR: connection to %s failed" % (node))
660
      return True
661

    
662
    if local_version != remote_version:
663
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
664
                      (local_version, node, remote_version))
665
      return True
666

    
667
    # checks vg existence and size > 20G
668

    
669
    bad = False
670
    if not vglist:
671
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
672
                      (node,))
673
      bad = True
674
    else:
675
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
676
      if vgstatus:
677
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
678
        bad = True
679

    
680
    # checks config file checksum
681
    # checks ssh to any
682

    
683
    if 'filelist' not in node_result:
684
      bad = True
685
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
686
    else:
687
      remote_cksum = node_result['filelist']
688
      for file_name in file_list:
689
        if file_name not in remote_cksum:
690
          bad = True
691
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
692
        elif remote_cksum[file_name] != local_cksum[file_name]:
693
          bad = True
694
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
695

    
696
    if 'nodelist' not in node_result:
697
      bad = True
698
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
699
    else:
700
      if node_result['nodelist']:
701
        bad = True
702
        for node in node_result['nodelist']:
703
          feedback_fn("  - ERROR: communication with node '%s': %s" %
704
                          (node, node_result['nodelist'][node]))
705
    hyp_result = node_result.get('hypervisor', None)
706
    if hyp_result is not None:
707
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
708
    return bad
709

    
710
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
711
    """Verify an instance.
712

713
    This function checks to see if the required block devices are
714
    available on the instance's node.
715

716
    """
717
    bad = False
718

    
719
    instancelist = self.cfg.GetInstanceList()
720
    if not instance in instancelist:
721
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
722
                      (instance, instancelist))
723
      bad = True
724

    
725
    instanceconfig = self.cfg.GetInstanceInfo(instance)
726
    node_current = instanceconfig.primary_node
727

    
728
    node_vol_should = {}
729
    instanceconfig.MapLVsByNode(node_vol_should)
730

    
731
    for node in node_vol_should:
732
      for volume in node_vol_should[node]:
733
        if node not in node_vol_is or volume not in node_vol_is[node]:
734
          feedback_fn("  - ERROR: volume %s missing on node %s" %
735
                          (volume, node))
736
          bad = True
737

    
738
    if instanceconfig.status != 'down':
739
      if not instance in node_instance[node_current]:
740
        feedback_fn("  - ERROR: instance %s not running on node %s" %
741
                        (instance, node_current))
742
        bad = True
743

    
744
    for node in node_instance:
745
      if node != node_current:
746
        if instance in node_instance[node]:
747
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
748
                          (instance, node))
749
          bad = True
750

    
751
    return bad
752

    
753
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
754
    """Verify if there are any unknown volumes in the cluster.
755

756
    The .os, .swap and backup volumes are ignored. All other volumes are
757
    reported as unknown.
758

759
    """
760
    bad = False
761

    
762
    for node in node_vol_is:
763
      for volume in node_vol_is[node]:
764
        if node not in node_vol_should or volume not in node_vol_should[node]:
765
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
766
                      (volume, node))
767
          bad = True
768
    return bad
769

    
770
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
771
    """Verify the list of running instances.
772

773
    This checks what instances are running but unknown to the cluster.
774

775
    """
776
    bad = False
777
    for node in node_instance:
778
      for runninginstance in node_instance[node]:
779
        if runninginstance not in instancelist:
780
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
781
                          (runninginstance, node))
782
          bad = True
783
    return bad
784

    
785
  def CheckPrereq(self):
786
    """Check prerequisites.
787

788
    This has no prerequisites.
789

790
    """
791
    pass
792

    
793
  def Exec(self, feedback_fn):
794
    """Verify integrity of cluster, performing various test on nodes.
795

796
    """
797
    bad = False
798
    feedback_fn("* Verifying global settings")
799
    for msg in self.cfg.VerifyConfig():
800
      feedback_fn("  - ERROR: %s" % msg)
801

    
802
    vg_name = self.cfg.GetVGName()
803
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
804
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
805
    node_volume = {}
806
    node_instance = {}
807

    
808
    # FIXME: verify OS list
809
    # do local checksums
810
    file_names = list(self.sstore.GetFileList())
811
    file_names.append(constants.SSL_CERT_FILE)
812
    file_names.append(constants.CLUSTER_CONF_FILE)
813
    local_checksums = utils.FingerprintFiles(file_names)
814

    
815
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
816
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
817
    all_instanceinfo = rpc.call_instance_list(nodelist)
818
    all_vglist = rpc.call_vg_list(nodelist)
819
    node_verify_param = {
820
      'filelist': file_names,
821
      'nodelist': nodelist,
822
      'hypervisor': None,
823
      }
824
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
825
    all_rversion = rpc.call_version(nodelist)
826

    
827
    for node in nodelist:
828
      feedback_fn("* Verifying node %s" % node)
829
      result = self._VerifyNode(node, file_names, local_checksums,
830
                                all_vglist[node], all_nvinfo[node],
831
                                all_rversion[node], feedback_fn)
832
      bad = bad or result
833

    
834
      # node_volume
835
      volumeinfo = all_volumeinfo[node]
836

    
837
      if type(volumeinfo) != dict:
838
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
839
        bad = True
840
        continue
841

    
842
      node_volume[node] = volumeinfo
843

    
844
      # node_instance
845
      nodeinstance = all_instanceinfo[node]
846
      if type(nodeinstance) != list:
847
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
848
        bad = True
849
        continue
850

    
851
      node_instance[node] = nodeinstance
852

    
853
    node_vol_should = {}
854

    
855
    for instance in instancelist:
856
      feedback_fn("* Verifying instance %s" % instance)
857
      result =  self._VerifyInstance(instance, node_volume, node_instance,
858
                                     feedback_fn)
859
      bad = bad or result
860

    
861
      inst_config = self.cfg.GetInstanceInfo(instance)
862

    
863
      inst_config.MapLVsByNode(node_vol_should)
864

    
865
    feedback_fn("* Verifying orphan volumes")
866
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
867
                                       feedback_fn)
868
    bad = bad or result
869

    
870
    feedback_fn("* Verifying remaining instances")
871
    result = self._VerifyOrphanInstances(instancelist, node_instance,
872
                                         feedback_fn)
873
    bad = bad or result
874

    
875
    return int(bad)
876

    
877

    
878
class LUVerifyDisks(NoHooksLU):
879
  """Verifies the cluster disks status.
880

881
  """
882
  _OP_REQP = []
883

    
884
  def CheckPrereq(self):
885
    """Check prerequisites.
886

887
    This has no prerequisites.
888

889
    """
890
    pass
891

    
892
  def Exec(self, feedback_fn):
893
    """Verify integrity of cluster disks.
894

895
    """
896
    result = res_nodes, res_instances = [], []
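    # chained assignment: 'result' is the (res_nodes, res_instances) tuple,
    # so entries appended to either list below are also visible via 'result'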
897

    
898
    vg_name = self.cfg.GetVGName()
899
    nodes = utils.NiceSort(self.cfg.GetNodeList())
900
    instances = [self.cfg.GetInstanceInfo(name)
901
                 for name in self.cfg.GetInstanceList()]
902

    
903
    nv_dict = {}
904
    for inst in instances:
905
      inst_lvs = {}
906
      if (inst.status != "up" or
907
          inst.disk_template not in constants.DTS_NET_MIRROR):
908
        continue
909
      inst.MapLVsByNode(inst_lvs)
910
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
911
      for node, vol_list in inst_lvs.iteritems():
912
        for vol in vol_list:
913
          nv_dict[(node, vol)] = inst
914

    
915
    if not nv_dict:
916
      return result
917

    
918
    node_lvs = rpc.call_volume_list(nodes, vg_name)
919

    
920
    to_act = set()
921
    for node in nodes:
922
      # node_volume
923
      lvs = node_lvs[node]
924

    
925
      if not isinstance(lvs, dict):
926
        logger.Info("connection to node %s failed or invalid data returned" %
927
                    (node,))
928
        res_nodes.append(node)
929
        continue
930

    
931
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
932
        if not lv_online:
933
          inst = nv_dict.get((node, lv_name), None)
934
          if inst is not None and inst.name not in res_instances:
935
            res_instances.append(inst.name)
936

    
937
    return result
938

    
939

    
940
class LURenameCluster(LogicalUnit):
941
  """Rename the cluster.
942

943
  """
944
  HPATH = "cluster-rename"
945
  HTYPE = constants.HTYPE_CLUSTER
946
  _OP_REQP = ["name"]
947

    
948
  def BuildHooksEnv(self):
949
    """Build hooks env.
950

951
    """
952
    env = {
953
      "OP_TARGET": self.op.sstore.GetClusterName(),
954
      "NEW_NAME": self.op.name,
955
      }
956
    mn = self.sstore.GetMasterNode()
957
    return env, [mn], [mn]
958

    
959
  def CheckPrereq(self):
960
    """Verify that the passed name is a valid one.
961

962
    """
963
    hostname = utils.HostInfo(self.op.name)
964

    
965
    new_name = hostname.name
966
    self.ip = new_ip = hostname.ip
967
    old_name = self.sstore.GetClusterName()
968
    old_ip = self.sstore.GetMasterIP()
969
    if new_name == old_name and new_ip == old_ip:
970
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
971
                                 " cluster has changed")
972
    if new_ip != old_ip:
973
      result = utils.RunCmd(["fping", "-q", new_ip])
974
      if not result.failed:
975
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
976
                                   " reachable on the network. Aborting." %
977
                                   new_ip)
978

    
979
    self.op.name = new_name
980

    
981
  def Exec(self, feedback_fn):
982
    """Rename the cluster.
983

984
    """
985
    clustername = self.op.name
986
    ip = self.ip
987
    ss = self.sstore
988

    
989
    # shutdown the master IP
990
    master = ss.GetMasterNode()
991
    if not rpc.call_node_stop_master(master):
992
      raise errors.OpExecError("Could not disable the master role")
993

    
994
    try:
995
      # modify the sstore
996
      ss.SetKey(ss.SS_MASTER_IP, ip)
997
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
998

    
999
      # Distribute updated ss config to all nodes
1000
      myself = self.cfg.GetNodeInfo(master)
1001
      dist_nodes = self.cfg.GetNodeList()
1002
      if myself.name in dist_nodes:
1003
        dist_nodes.remove(myself.name)
1004

    
1005
      logger.Debug("Copying updated ssconf data to all nodes")
1006
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1007
        fname = ss.KeyToFilename(keyname)
1008
        result = rpc.call_upload_file(dist_nodes, fname)
1009
        for to_node in dist_nodes:
1010
          if not result[to_node]:
1011
            logger.Error("copy of file %s to node %s failed" %
1012
                         (fname, to_node))
1013
    finally:
1014
      if not rpc.call_node_start_master(master):
1015
        logger.Error("Could not re-enable the master role on the master,"
1016
                     " please restart manually.")
1017

    
1018

    
1019
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1020
  """Sleep and poll for an instance's disk to sync.
1021

1022
  """
1023
  if not instance.disks:
1024
    return True
1025

    
1026
  if not oneshot:
1027
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1028

    
1029
  node = instance.primary_node
1030

    
1031
  for dev in instance.disks:
1032
    cfgw.SetDiskID(dev, node)
1033

    
1034
  retries = 0
1035
  while True:
1036
    max_time = 0
1037
    done = True
1038
    cumul_degraded = False
1039
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1040
    if not rstats:
1041
      proc.LogWarning("Can't get any data from node %s" % node)
1042
      retries += 1
1043
      if retries >= 10:
1044
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1045
                                 " aborting." % node)
1046
      time.sleep(6)
1047
      continue
1048
    retries = 0
1049
    for i in range(len(rstats)):
1050
      mstat = rstats[i]
1051
      if mstat is None:
1052
        proc.LogWarning("Can't compute data for node %s/%s" %
1053
                        (node, instance.disks[i].iv_name))
1054
        continue
1055
      # we ignore the ldisk parameter
1056
      perc_done, est_time, is_degraded, _ = mstat
1057
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1058
      if perc_done is not None:
1059
        done = False
1060
        if est_time is not None:
1061
          rem_time = "%d estimated seconds remaining" % est_time
1062
          max_time = est_time
1063
        else:
1064
          rem_time = "no time estimate"
1065
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1066
                     (instance.disks[i].iv_name, perc_done, rem_time))
1067
    if done or oneshot:
1068
      break
1069

    
1070
    if unlock:
1071
      utils.Unlock('cmd')
1072
    try:
1073
      time.sleep(min(60, max_time))
1074
    finally:
1075
      if unlock:
1076
        utils.Lock('cmd')
1077

    
1078
  if done:
1079
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1080
  return not cumul_degraded
1081

    
1082

    
1083
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1084
  """Check that mirrors are not degraded.
1085

1086
  The ldisk parameter, if True, will change the test from the
1087
  is_degraded attribute (which represents overall non-ok status for
1088
  the device(s)) to the ldisk (representing the local storage status).
1089

1090
  """
1091
  cfgw.SetDiskID(dev, node)
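  # index into the status tuple returned by rpc.call_blockdev_find:
  # position 5 appears to hold the overall is_degraded flag and position 6
  # the local-disk (ldisk) status, matching the docstring above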
1092
  if ldisk:
1093
    idx = 6
1094
  else:
1095
    idx = 5
1096

    
1097
  result = True
1098
  if on_primary or dev.AssembleOnSecondary():
1099
    rstats = rpc.call_blockdev_find(node, dev)
1100
    if not rstats:
1101
      logger.ToStderr("Can't get any data from node %s" % node)
1102
      result = False
1103
    else:
1104
      result = result and (not rstats[idx])
1105
  if dev.children:
1106
    for child in dev.children:
1107
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1108

    
1109
  return result
1110

    
1111

    
1112
class LUDiagnoseOS(NoHooksLU):
1113
  """Logical unit for OS diagnose/query.
1114

1115
  """
1116
  _OP_REQP = []
1117

    
1118
  def CheckPrereq(self):
1119
    """Check prerequisites.
1120

1121
    This always succeeds, since this is a pure query LU.
1122

1123
    """
1124
    return
1125

    
1126
  def Exec(self, feedback_fn):
1127
    """Compute the list of OSes.
1128

1129
    """
1130
    node_list = self.cfg.GetNodeList()
1131
    node_data = rpc.call_os_diagnose(node_list)
1132
    if node_data == False:
1133
      raise errors.OpExecError("Can't gather the list of OSes")
1134
    return node_data
1135

    
1136

    
1137
class LURemoveNode(LogicalUnit):
1138
  """Logical unit for removing a node.
1139

1140
  """
1141
  HPATH = "node-remove"
1142
  HTYPE = constants.HTYPE_NODE
1143
  _OP_REQP = ["node_name"]
1144

    
1145
  def BuildHooksEnv(self):
1146
    """Build hooks env.
1147

1148
    This doesn't run on the target node in the pre phase as a failed
1149
    node would not allow itself to run.
1150

1151
    """
1152
    env = {
1153
      "OP_TARGET": self.op.node_name,
1154
      "NODE_NAME": self.op.node_name,
1155
      }
1156
    all_nodes = self.cfg.GetNodeList()
1157
    all_nodes.remove(self.op.node_name)
1158
    return env, all_nodes, all_nodes
1159

    
1160
  def CheckPrereq(self):
1161
    """Check prerequisites.
1162

1163
    This checks:
1164
     - the node exists in the configuration
1165
     - it does not have primary or secondary instances
1166
     - it's not the master
1167

1168
    Any errors are signalled by raising errors.OpPrereqError.
1169

1170
    """
1171
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1172
    if node is None:
1173
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1174

    
1175
    instance_list = self.cfg.GetInstanceList()
1176

    
1177
    masternode = self.sstore.GetMasterNode()
1178
    if node.name == masternode:
1179
      raise errors.OpPrereqError("Node is the master node,"
1180
                                 " you need to failover first.")
1181

    
1182
    for instance_name in instance_list:
1183
      instance = self.cfg.GetInstanceInfo(instance_name)
1184
      if node.name == instance.primary_node:
1185
        raise errors.OpPrereqError("Instance %s still running on the node,"
1186
                                   " please remove first." % instance_name)
1187
      if node.name in instance.secondary_nodes:
1188
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1189
                                   " please remove first." % instance_name)
1190
    self.op.node_name = node.name
1191
    self.node = node
1192

    
1193
  def Exec(self, feedback_fn):
1194
    """Removes the node from the cluster.
1195

1196
    """
1197
    node = self.node
1198
    logger.Info("stopping the node daemon and removing configs from node %s" %
1199
                node.name)
1200

    
1201
    rpc.call_node_leave_cluster(node.name)
1202

    
1203
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1204

    
1205
    logger.Info("Removing node %s from config" % node.name)
1206

    
1207
    self.cfg.RemoveNode(node.name)
1208

    
1209
    _RemoveHostFromEtcHosts(node.name)
1210

    
1211

    
1212
class LUQueryNodes(NoHooksLU):
1213
  """Logical unit for querying nodes.
1214

1215
  """
1216
  _OP_REQP = ["output_fields", "names"]
1217

    
1218
  def CheckPrereq(self):
1219
    """Check prerequisites.
1220

1221
    This checks that the fields required are valid output fields.
1222

1223
    """
1224
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1225
                                     "mtotal", "mnode", "mfree",
1226
                                     "bootid"])
1227

    
1228
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1229
                               "pinst_list", "sinst_list",
1230
                               "pip", "sip"],
1231
                       dynamic=self.dynamic_fields,
1232
                       selected=self.op.output_fields)
1233

    
1234
    self.wanted = _GetWantedNodes(self, self.op.names)
1235

    
1236
  def Exec(self, feedback_fn):
1237
    """Computes the list of nodes and their attributes.
1238

1239
    """
1240
    nodenames = self.wanted
1241
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1242

    
1243
    # begin data gathering
1244

    
1245
    if self.dynamic_fields.intersection(self.op.output_fields):
1246
      live_data = {}
1247
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1248
      for name in nodenames:
1249
        nodeinfo = node_data.get(name, None)
1250
        if nodeinfo:
1251
          live_data[name] = {
1252
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1253
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1254
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1255
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1256
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1257
            "bootid": nodeinfo['bootid'],
1258
            }
1259
        else:
1260
          live_data[name] = {}
1261
    else:
1262
      live_data = dict.fromkeys(nodenames, {})
1263

    
1264
    node_to_primary = dict([(name, set()) for name in nodenames])
1265
    node_to_secondary = dict([(name, set()) for name in nodenames])
1266

    
1267
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1268
                             "sinst_cnt", "sinst_list"))
1269
    if inst_fields & frozenset(self.op.output_fields):
1270
      instancelist = self.cfg.GetInstanceList()
1271

    
1272
      for instance_name in instancelist:
1273
        inst = self.cfg.GetInstanceInfo(instance_name)
1274
        if inst.primary_node in node_to_primary:
1275
          node_to_primary[inst.primary_node].add(inst.name)
1276
        for secnode in inst.secondary_nodes:
1277
          if secnode in node_to_secondary:
1278
            node_to_secondary[secnode].add(inst.name)
1279

    
1280
    # end data gathering
1281

    
1282
    output = []
1283
    for node in nodelist:
1284
      node_output = []
1285
      for field in self.op.output_fields:
1286
        if field == "name":
1287
          val = node.name
1288
        elif field == "pinst_list":
1289
          val = list(node_to_primary[node.name])
1290
        elif field == "sinst_list":
1291
          val = list(node_to_secondary[node.name])
1292
        elif field == "pinst_cnt":
1293
          val = len(node_to_primary[node.name])
1294
        elif field == "sinst_cnt":
1295
          val = len(node_to_secondary[node.name])
1296
        elif field == "pip":
1297
          val = node.primary_ip
1298
        elif field == "sip":
1299
          val = node.secondary_ip
1300
        elif field in self.dynamic_fields:
1301
          val = live_data[node.name].get(field, None)
1302
        else:
1303
          raise errors.ParameterError(field)
1304
        node_output.append(val)
1305
      output.append(node_output)
1306

    
1307
    return output
1308

    
1309

    
1310
class LUQueryNodeVolumes(NoHooksLU):
1311
  """Logical unit for getting volumes on node(s).
1312

1313
  """
1314
  _OP_REQP = ["nodes", "output_fields"]
1315

    
1316
  def CheckPrereq(self):
1317
    """Check prerequisites.
1318

1319
    This checks that the fields required are valid output fields.
1320

1321
    """
1322
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1323

    
1324
    _CheckOutputFields(static=["node"],
1325
                       dynamic=["phys", "vg", "name", "size", "instance"],
1326
                       selected=self.op.output_fields)
1327

    
1328

    
1329
  def Exec(self, feedback_fn):
1330
    """Computes the list of nodes and their attributes.
1331

1332
    """
1333
    nodenames = self.nodes
1334
    volumes = rpc.call_node_volumes(nodenames)
1335

    
1336
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1337
             in self.cfg.GetInstanceList()]
1338

    
1339
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1340

    
1341
    output = []
1342
    for node in nodenames:
1343
      if node not in volumes or not volumes[node]:
1344
        continue
1345

    
1346
      node_vols = volumes[node][:]
1347
      node_vols.sort(key=lambda vol: vol['dev'])
1348

    
1349
      for vol in node_vols:
1350
        node_output = []
1351
        for field in self.op.output_fields:
1352
          if field == "node":
1353
            val = node
1354
          elif field == "phys":
1355
            val = vol['dev']
1356
          elif field == "vg":
1357
            val = vol['vg']
1358
          elif field == "name":
1359
            val = vol['name']
1360
          elif field == "size":
1361
            val = int(float(vol['size']))
1362
          elif field == "instance":
1363
            for inst in ilist:
1364
              if node not in lv_by_node[inst]:
1365
                continue
1366
              if vol['name'] in lv_by_node[inst][node]:
1367
                val = inst.name
1368
                break
1369
            else:
1370
              val = '-'
1371
          else:
1372
            raise errors.ParameterError(field)
1373
          node_output.append(str(val))
1374

    
1375
        output.append(node_output)
1376

    
1377
    return output
1378

    
1379

    
1380
class LUAddNode(LogicalUnit):
1381
  """Logical unit for adding node to the cluster.
1382

1383
  """
1384
  HPATH = "node-add"
1385
  HTYPE = constants.HTYPE_NODE
1386
  _OP_REQP = ["node_name"]
1387

    
1388
  def BuildHooksEnv(self):
1389
    """Build hooks env.
1390

1391
    This will run on all nodes before, and on all nodes + the new node after.
1392

1393
    """
1394
    env = {
1395
      "OP_TARGET": self.op.node_name,
1396
      "NODE_NAME": self.op.node_name,
1397
      "NODE_PIP": self.op.primary_ip,
1398
      "NODE_SIP": self.op.secondary_ip,
1399
      }
1400
    nodes_0 = self.cfg.GetNodeList()
1401
    nodes_1 = nodes_0 + [self.op.node_name, ]
1402
    return env, nodes_0, nodes_1
1403

    
1404
  def CheckPrereq(self):
1405
    """Check prerequisites.
1406

1407
    This checks:
1408
     - the new node is not already in the config
1409
     - it is resolvable
1410
     - its parameters (single/dual homed) matches the cluster
1411

1412
    Any errors are signalled by raising errors.OpPrereqError.
1413

1414
    """
1415
    node_name = self.op.node_name
1416
    cfg = self.cfg
1417

    
1418
    dns_data = utils.HostInfo(node_name)
1419

    
1420
    node = dns_data.name
1421
    primary_ip = self.op.primary_ip = dns_data.ip
1422
    secondary_ip = getattr(self.op, "secondary_ip", None)
1423
    if secondary_ip is None:
1424
      secondary_ip = primary_ip
1425
    if not utils.IsValidIP(secondary_ip):
1426
      raise errors.OpPrereqError("Invalid secondary IP given")
1427
    self.op.secondary_ip = secondary_ip
1428
    node_list = cfg.GetNodeList()
1429
    if node in node_list:
1430
      raise errors.OpPrereqError("Node %s is already in the configuration"
1431
                                 % node)
1432

    
1433
    for existing_node_name in node_list:
1434
      existing_node = cfg.GetNodeInfo(existing_node_name)
1435
      if (existing_node.primary_ip == primary_ip or
1436
          existing_node.secondary_ip == primary_ip or
1437
          existing_node.primary_ip == secondary_ip or
1438
          existing_node.secondary_ip == secondary_ip):
1439
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1440
                                   " existing node %s" % existing_node.name)
1441

    
1442
    # check that the type of the node (single versus dual homed) is the
1443
    # same as for the master
1444
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1445
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1446
    newbie_singlehomed = secondary_ip == primary_ip
1447
    if master_singlehomed != newbie_singlehomed:
1448
      if master_singlehomed:
1449
        raise errors.OpPrereqError("The master has no private ip but the"
1450
                                   " new node has one")
1451
      else:
1452
        raise errors.OpPrereqError("The master has a private ip but the"
1453
                                   " new node doesn't have one")
1454

    
1455
    # checks reachability
1456
    if not utils.TcpPing(utils.HostInfo().name,
1457
                         primary_ip,
1458
                         constants.DEFAULT_NODED_PORT):
1459
      raise errors.OpPrereqError("Node not reachable by ping")
1460

    
1461
    if not newbie_singlehomed:
1462
      # check reachability from my secondary ip to newbie's secondary ip
1463
      if not utils.TcpPing(myself.secondary_ip,
1464
                           secondary_ip,
1465
                           constants.DEFAULT_NODED_PORT):
1466
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1467
                                   " based ping to noded port")
1468

    
1469
    self.new_node = objects.Node(name=node,
1470
                                 primary_ip=primary_ip,
1471
                                 secondary_ip=secondary_ip)
1472

    
1473
  def Exec(self, feedback_fn):
1474
    """Adds the new node to the cluster.
1475

1476
    """
1477
    new_node = self.new_node
1478
    node = new_node.name
1479

    
1480
    # set up inter-node password and certificate and restarts the node daemon
1481
    gntpass = self.sstore.GetNodeDaemonPassword()
1482
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1483
      raise errors.OpExecError("ganeti password corruption detected")
1484
    f = open(constants.SSL_CERT_FILE)
1485
    try:
1486
      gntpem = f.read(8192)
1487
    finally:
1488
      f.close()
1489
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1490
    # so we use this to detect an invalid certificate; as long as the
1491
    # cert doesn't contain this, the here-document will be correctly
1492
    # parsed by the shell sequence below
1493
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1494
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1495
    if not gntpem.endswith("\n"):
1496
      raise errors.OpExecError("PEM must end with newline")
1497
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1498

    
1499
    # and then connect with ssh to set password and start ganeti-noded
1500
    # note that all the below variables are sanitized at this point,
1501
    # either by being constants or by the checks above
1502
    ss = self.sstore
1503
    mycommand = ("umask 077 && "
1504
                 "echo '%s' > '%s' && "
1505
                 "cat > '%s' << '!EOF.' && \n"
1506
                 "%s!EOF.\n%s restart" %
1507
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1508
                  constants.SSL_CERT_FILE, gntpem,
1509
                  constants.NODE_INITD_SCRIPT))
1510

    
1511
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1512
    if result.failed:
1513
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1514
                               " output: %s" %
1515
                               (node, result.fail_reason, result.output))
1516

    
1517
    # check connectivity
1518
    time.sleep(4)
1519

    
1520
    result = rpc.call_version([node])[node]
1521
    if result:
1522
      if constants.PROTOCOL_VERSION == result:
1523
        logger.Info("communication to node %s fine, sw version %s match" %
1524
                    (node, result))
1525
      else:
1526
        raise errors.OpExecError("Version mismatch master version %s,"
1527
                                 " node version %s" %
1528
                                 (constants.PROTOCOL_VERSION, result))
1529
    else:
1530
      raise errors.OpExecError("Cannot get version from the new node")
1531

    
1532
    # setup ssh on node
1533
    logger.Info("copy ssh key to node %s" % node)
1534
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1535
    keyarray = []
1536
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1537
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1538
                priv_key, pub_key]
1539

    
1540
    for i in keyfiles:
1541
      f = open(i, 'r')
1542
      try:
1543
        keyarray.append(f.read())
1544
      finally:
1545
        f.close()
1546

    
1547
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1548
                               keyarray[3], keyarray[4], keyarray[5])
1549

    
1550
    if not result:
1551
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1552

    
1553
    # Add node to our /etc/hosts, and add key to known_hosts
1554
    _AddHostToEtcHosts(new_node.name)
1555

    
1556
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1557
                      self.cfg.GetHostKey())
1558

    
1559
    if new_node.secondary_ip != new_node.primary_ip:
1560
      if not rpc.call_node_tcp_ping(new_node.name,
1561
                                    constants.LOCALHOST_IP_ADDRESS,
1562
                                    new_node.secondary_ip,
1563
                                    constants.DEFAULT_NODED_PORT,
1564
                                    10, False):
1565
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1566
                                 " you gave (%s). Please fix and re-run this"
1567
                                 " command." % new_node.secondary_ip)
1568

    
1569
    success, msg = ssh.VerifyNodeHostname(node)
1570
    if not success:
1571
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1572
                               " than the one the resolver gives: %s."
1573
                               " Please fix and re-run this command." %
1574
                               (node, msg))
1575

    
1576
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1577
    # including the node just added
1578
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1579
    dist_nodes = self.cfg.GetNodeList() + [node]
1580
    if myself.name in dist_nodes:
1581
      dist_nodes.remove(myself.name)
1582

    
1583
    logger.Debug("Copying hosts and known_hosts to all nodes")
1584
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1585
      result = rpc.call_upload_file(dist_nodes, fname)
1586
      for to_node in dist_nodes:
1587
        if not result[to_node]:
1588
          logger.Error("copy of file %s to node %s failed" %
1589
                       (fname, to_node))
1590

    
1591
    to_copy = ss.GetFileList()
1592
    for fname in to_copy:
1593
      if not ssh.CopyFileToNode(node, fname):
1594
        logger.Error("could not copy file %s to node %s" % (fname, node))
1595

    
1596
    logger.Info("adding node %s to cluster.conf" % node)
1597
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
1601
  """Failover the master node to the current node.
1602

1603
  This is a special LU in that it must run on a non-master node.
1604

1605
  """
1606
  HPATH = "master-failover"
1607
  HTYPE = constants.HTYPE_CLUSTER
1608
  REQ_MASTER = False
1609
  _OP_REQP = []
1610

    
1611
  def BuildHooksEnv(self):
1612
    """Build hooks env.
1613

1614
    This will run on the new master only in the pre phase, and on all
1615
    the nodes in the post phase.
1616

1617
    """
1618
    env = {
1619
      "OP_TARGET": self.new_master,
1620
      "NEW_MASTER": self.new_master,
1621
      "OLD_MASTER": self.old_master,
1622
      }
1623
    return env, [self.new_master], self.cfg.GetNodeList()
1624

    
1625
  def CheckPrereq(self):
1626
    """Check prerequisites.
1627

1628
    This checks that we are not already the master.
1629

1630
    """
1631
    self.new_master = utils.HostInfo().name
1632
    self.old_master = self.sstore.GetMasterNode()
1633

    
1634
    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)
1639

    
1640
  def Exec(self, feedback_fn):
1641
    """Failover the master node.
1642

1643
    This command, when run on a non-master node, will cause the current
1644
    master to cease being master, and the non-master to become new
1645
    master.
1646

1647
    """
1648
    #TODO: do not rely on gethostname returning the FQDN
1649
    logger.Info("setting master to %s, old master: %s" %
1650
                (self.new_master, self.old_master))
1651

    
1652
    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)
1655

    
1656
    ss = self.sstore
1657
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1658
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1659
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1660
      logger.Error("could not distribute the new simple store master file"
1661
                   " to the other nodes, please check.")
1662

    
1663
    if not rpc.call_node_start_master(self.new_master):
1664
      logger.Error("could not start the master role on the new master"
1665
                   " %s, please check" % self.new_master)
1666
      feedback_fn("Error in activating the master IP on the new master,"
1667
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
1672
  """Query cluster configuration.
1673

1674
  """
1675
  _OP_REQP = []
1676
  REQ_MASTER = False
1677

    
1678
  def CheckPrereq(self):
1679
    """No prerequsites needed for this LU.
1680

1681
    """
1682
    pass
1683

    
1684
  def Exec(self, feedback_fn):
1685
    """Return cluster config.
1686

1687
    """
1688
    result = {
1689
      "name": self.sstore.GetClusterName(),
1690
      "software_version": constants.RELEASE_VERSION,
1691
      "protocol_version": constants.PROTOCOL_VERSION,
1692
      "config_version": constants.CONFIG_VERSION,
1693
      "os_api_version": constants.OS_API_VERSION,
1694
      "export_version": constants.EXPORT_VERSION,
1695
      "master": self.sstore.GetMasterNode(),
1696
      "architecture": (platform.architecture()[0], platform.machine()),
1697
      }
1698

    
1699
    return result
1700

    
1701

    
1702
class LUClusterCopyFile(NoHooksLU):
1703
  """Copy file to cluster.
1704

1705
  """
1706
  _OP_REQP = ["nodes", "filename"]
1707

    
1708
  def CheckPrereq(self):
1709
    """Check prerequisites.
1710

1711
    It should check that the named file exists and that the given list
1712
    of nodes is valid.
1713

1714
    """
1715
    if not os.path.exists(self.op.filename):
1716
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1717

    
1718
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1719

    
1720
  def Exec(self, feedback_fn):
1721
    """Copy a file from master to some nodes.
1722

1723
    Args:
1724
      opts - class with options as members
1725
      args - list containing a single element, the file name
1726
    Opts used:
1727
      nodes - list containing the name of target nodes; if empty, all nodes
1728

1729
    """
1730
    filename = self.op.filename
1731

    
1732
    myname = utils.HostInfo().name
1733

    
1734
    for node in self.nodes:
1735
      if node == myname:
1736
        continue
1737
      if not ssh.CopyFileToNode(node, filename):
1738
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1739

    
1740

    
1741
class LUDumpClusterConfig(NoHooksLU):
1742
  """Return a text-representation of the cluster-config.
1743

1744
  """
1745
  _OP_REQP = []
1746

    
1747
  def CheckPrereq(self):
1748
    """No prerequisites.
1749

1750
    """
1751
    pass
1752

    
1753
  def Exec(self, feedback_fn):
1754
    """Dump a representation of the cluster config to the standard output.
1755

1756
    """
1757
    return self.cfg.DumpConfig()
1758

    
1759

    
1760
class LURunClusterCommand(NoHooksLU):
1761
  """Run a command on some nodes.
1762

1763
  """
1764
  _OP_REQP = ["command", "nodes"]
1765

    
1766
  def CheckPrereq(self):
1767
    """Check prerequisites.
1768

1769
    It checks that the given list of nodes is valid.
1770

1771
    """
1772
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1773

    
1774
  def Exec(self, feedback_fn):
1775
    """Run a command on some nodes.
1776

1777
    """
1778
    data = []
1779
    for node in self.nodes:
1780
      result = ssh.SSHCall(node, "root", self.op.command)
1781
      data.append((node, result.output, result.exit_code))
1782

    
1783
    return data
1784

    
1785

    
1786
class LUActivateInstanceDisks(NoHooksLU):
1787
  """Bring up an instance's disks.
1788

1789
  """
1790
  _OP_REQP = ["instance_name"]
1791

    
1792
  def CheckPrereq(self):
1793
    """Check prerequisites.
1794

1795
    This checks that the instance is in the cluster.
1796

1797
    """
1798
    instance = self.cfg.GetInstanceInfo(
1799
      self.cfg.ExpandInstanceName(self.op.instance_name))
1800
    if instance is None:
1801
      raise errors.OpPrereqError("Instance '%s' not known" %
1802
                                 self.op.instance_name)
1803
    self.instance = instance
1804

    
1805

    
1806
  def Exec(self, feedback_fn):
1807
    """Activate the disks.
1808

1809
    """
1810
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1811
    if not disks_ok:
1812
      raise errors.OpExecError("Cannot activate block devices")
1813

    
1814
    return disks_info
1815

    
1816

    
1817
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1818
  """Prepare the block devices for an instance.
1819

1820
  This sets up the block devices on all nodes.
1821

1822
  Args:
1823
    instance: a ganeti.objects.Instance object
1824
    ignore_secondaries: if true, errors on secondary nodes won't result
1825
                        in an error return from the function
1826

1827
  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded, with the mapping from node devices to instance devices
1831
  """
1832
  device_info = []
1833
  disks_ok = True
1834
  for inst_disk in instance.disks:
1835
    master_result = None
1836
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1837
      cfg.SetDiskID(node_disk, node)
1838
      is_primary = node == instance.primary_node
1839
      result = rpc.call_blockdev_assemble(node, node_disk,
1840
                                          instance.name, is_primary)
1841
      if not result:
1842
        logger.Error("could not prepare block device %s on node %s"
1843
                     " (is_primary=%s)" %
1844
                     (inst_disk.iv_name, node, is_primary))
1845
        if is_primary or not ignore_secondaries:
1846
          disks_ok = False
1847
      if is_primary:
1848
        master_result = result
1849
    device_info.append((instance.primary_node, inst_disk.iv_name,
1850
                        master_result))
1851

    
1852
  # leave the disks configured for the primary node
1853
  # this is a workaround that would be fixed better by
1854
  # improving the logical/physical id handling
1855
  for disk in instance.disks:
1856
    cfg.SetDiskID(disk, instance.primary_node)
1857

    
1858
  return disks_ok, device_info
1859

    
1860

    
1861
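# Illustrative note (an added sketch, not part of the original code): the
# device_info returned by _AssembleInstanceDisks above is a list of
# (primary_node, iv_name, assemble_result) tuples, e.g. for a two-disk
# instance something like
#   (True, [("node1.example.com", "sda", <result of blockdev_assemble>),
#           ("node1.example.com", "sdb", <result of blockdev_assemble>)])
# where the hostnames are hypothetical and the third element is whatever
# rpc.call_blockdev_assemble returned for the primary node.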
def _StartInstanceDisks(cfg, instance, force):
1862
  """Start the disks of an instance.
1863

1864
  """
1865
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1866
                                           ignore_secondaries=force)
1867
  if not disks_ok:
1868
    _ShutdownInstanceDisks(instance, cfg)
1869
    if force is not None and not force:
1870
      logger.Error("If the message above refers to a secondary node,"
1871
                   " you can retry the operation using '--force'.")
1872
    raise errors.OpExecError("Disk consistency error")
1873

    
1874

    
1875
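# Descriptive note on the 'force' parameter of _StartInstanceDisks (added for
# clarity): callers pass True/False from the user's --force option (e.g.
# LUStartupInstance), or None when no such option applies (the reinstall and
# rename paths call _StartInstanceDisks(self.cfg, inst, None)). The "retry
# with --force" hint above is therefore printed only when force is explicitly
# False, not when it is None.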
class LUDeactivateInstanceDisks(NoHooksLU):
1876
  """Shutdown an instance's disks.
1877

1878
  """
1879
  _OP_REQP = ["instance_name"]
1880

    
1881
  def CheckPrereq(self):
1882
    """Check prerequisites.
1883

1884
    This checks that the instance is in the cluster.
1885

1886
    """
1887
    instance = self.cfg.GetInstanceInfo(
1888
      self.cfg.ExpandInstanceName(self.op.instance_name))
1889
    if instance is None:
1890
      raise errors.OpPrereqError("Instance '%s' not known" %
1891
                                 self.op.instance_name)
1892
    self.instance = instance
1893

    
1894
  def Exec(self, feedback_fn):
1895
    """Deactivate the disks
1896

1897
    """
1898
    instance = self.instance
1899
    ins_l = rpc.call_instance_list([instance.primary_node])
1900
    ins_l = ins_l[instance.primary_node]
1901
    if not isinstance(ins_l, list):
1902
      raise errors.OpExecError("Can't contact node '%s'" %
1903
                               instance.primary_node)
1904

    
1905
    if self.instance.name in ins_l:
1906
      raise errors.OpExecError("Instance is running, can't shutdown"
1907
                               " block devices.")
1908

    
1909
    _ShutdownInstanceDisks(instance, self.cfg)
1910

    
1911

    
1912
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1913
  """Shutdown block devices of an instance.
1914

1915
  This does the shutdown on all nodes of the instance.
1916

1917
  If ignore_primary is true, errors on the primary node are
  ignored.
1919

1920
  """
1921
  result = True
1922
  for disk in instance.disks:
1923
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1924
      cfg.SetDiskID(top_disk, node)
1925
      if not rpc.call_blockdev_shutdown(node, top_disk):
1926
        logger.Error("could not shutdown block device %s on node %s" %
1927
                     (disk.iv_name, node))
1928
        if not ignore_primary or node != instance.primary_node:
1929
          result = False
1930
  return result
1931

    
1932

    
1933
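# Usage sketch for _ShutdownInstanceDisks (illustrative, mirroring a call made
# later in this module by the failover code, where the primary node may be
# unreachable):
#   _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True)
# With the default ignore_primary=False, a failure to shut down a device on
# the primary node makes the function return False.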
class LUStartupInstance(LogicalUnit):
1934
  """Starts an instance.
1935

1936
  """
1937
  HPATH = "instance-start"
1938
  HTYPE = constants.HTYPE_INSTANCE
1939
  _OP_REQP = ["instance_name", "force"]
1940

    
1941
  def BuildHooksEnv(self):
1942
    """Build hooks env.
1943

1944
    This runs on master, primary and secondary nodes of the instance.
1945

1946
    """
1947
    env = {
1948
      "FORCE": self.op.force,
1949
      }
1950
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1951
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1952
          list(self.instance.secondary_nodes))
1953
    return env, nl, nl
1954

    
1955
  def CheckPrereq(self):
1956
    """Check prerequisites.
1957

1958
    This checks that the instance is in the cluster.
1959

1960
    """
1961
    instance = self.cfg.GetInstanceInfo(
1962
      self.cfg.ExpandInstanceName(self.op.instance_name))
1963
    if instance is None:
1964
      raise errors.OpPrereqError("Instance '%s' not known" %
1965
                                 self.op.instance_name)
1966

    
1967
    # check bridges existence
1968
    _CheckInstanceBridgesExist(instance)
1969

    
1970
    self.instance = instance
1971
    self.op.instance_name = instance.name
1972

    
1973
  def Exec(self, feedback_fn):
1974
    """Start the instance.
1975

1976
    """
1977
    instance = self.instance
1978
    force = self.op.force
1979
    extra_args = getattr(self.op, "extra_args", "")
1980

    
1981
    node_current = instance.primary_node
1982

    
1983
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1984
    if not nodeinfo:
1985
      raise errors.OpExecError("Could not contact node %s for infos" %
1986
                               (node_current))
1987

    
1988
    freememory = nodeinfo[node_current]['memory_free']
1989
    memory = instance.memory
1990
    if memory > freememory:
1991
      raise errors.OpExecError("Not enough memory to start instance"
1992
                               " %s on node %s"
1993
                               " needed %s MiB, available %s MiB" %
1994
                               (instance.name, node_current, memory,
1995
                                freememory))
1996

    
1997
    _StartInstanceDisks(self.cfg, instance, force)
1998

    
1999
    if not rpc.call_instance_start(node_current, instance, extra_args):
2000
      _ShutdownInstanceDisks(instance, self.cfg)
2001
      raise errors.OpExecError("Could not start instance")
2002

    
2003
    self.cfg.MarkInstanceUp(instance.name)
2004

    
2005

    
2006
class LURebootInstance(LogicalUnit):
2007
  """Reboot an instance.
2008

2009
  """
2010
  HPATH = "instance-reboot"
2011
  HTYPE = constants.HTYPE_INSTANCE
2012
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2013

    
2014
  def BuildHooksEnv(self):
2015
    """Build hooks env.
2016

2017
    This runs on master, primary and secondary nodes of the instance.
2018

2019
    """
2020
    env = {
2021
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2022
      }
2023
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2024
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2025
          list(self.instance.secondary_nodes))
2026
    return env, nl, nl
2027

    
2028
  def CheckPrereq(self):
2029
    """Check prerequisites.
2030

2031
    This checks that the instance is in the cluster.
2032

2033
    """
2034
    instance = self.cfg.GetInstanceInfo(
2035
      self.cfg.ExpandInstanceName(self.op.instance_name))
2036
    if instance is None:
2037
      raise errors.OpPrereqError("Instance '%s' not known" %
2038
                                 self.op.instance_name)
2039

    
2040
    # check bridges existence
2041
    _CheckInstanceBridgesExist(instance)
2042

    
2043
    self.instance = instance
2044
    self.op.instance_name = instance.name
2045

    
2046
  def Exec(self, feedback_fn):
2047
    """Reboot the instance.
2048

2049
    """
2050
    instance = self.instance
2051
    ignore_secondaries = self.op.ignore_secondaries
2052
    reboot_type = self.op.reboot_type
2053
    extra_args = getattr(self.op, "extra_args", "")
2054

    
2055
    node_current = instance.primary_node
2056

    
2057
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2058
                           constants.INSTANCE_REBOOT_HARD,
2059
                           constants.INSTANCE_REBOOT_FULL]:
2060
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2061
                                  (constants.INSTANCE_REBOOT_SOFT,
2062
                                   constants.INSTANCE_REBOOT_HARD,
2063
                                   constants.INSTANCE_REBOOT_FULL))
2064

    
2065
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2066
                       constants.INSTANCE_REBOOT_HARD]:
2067
      if not rpc.call_instance_reboot(node_current, instance,
2068
                                      reboot_type, extra_args):
2069
        raise errors.OpExecError("Could not reboot instance")
2070
    else:
2071
      if not rpc.call_instance_shutdown(node_current, instance):
2072
        raise errors.OpExecError("could not shutdown instance for full reboot")
2073
      _ShutdownInstanceDisks(instance, self.cfg)
2074
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2075
      if not rpc.call_instance_start(node_current, instance, extra_args):
2076
        _ShutdownInstanceDisks(instance, self.cfg)
2077
        raise errors.OpExecError("Could not start instance for full reboot")
2078

    
2079
    self.cfg.MarkInstanceUp(instance.name)
2080

    
2081

    
2082
class LUShutdownInstance(LogicalUnit):
2083
  """Shutdown an instance.
2084

2085
  """
2086
  HPATH = "instance-stop"
2087
  HTYPE = constants.HTYPE_INSTANCE
2088
  _OP_REQP = ["instance_name"]
2089

    
2090
  def BuildHooksEnv(self):
2091
    """Build hooks env.
2092

2093
    This runs on master, primary and secondary nodes of the instance.
2094

2095
    """
2096
    env = _BuildInstanceHookEnvByObject(self.instance)
2097
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2098
          list(self.instance.secondary_nodes))
2099
    return env, nl, nl
2100

    
2101
  def CheckPrereq(self):
2102
    """Check prerequisites.
2103

2104
    This checks that the instance is in the cluster.
2105

2106
    """
2107
    instance = self.cfg.GetInstanceInfo(
2108
      self.cfg.ExpandInstanceName(self.op.instance_name))
2109
    if instance is None:
2110
      raise errors.OpPrereqError("Instance '%s' not known" %
2111
                                 self.op.instance_name)
2112
    self.instance = instance
2113

    
2114
  def Exec(self, feedback_fn):
2115
    """Shutdown the instance.
2116

2117
    """
2118
    instance = self.instance
2119
    node_current = instance.primary_node
2120
    if not rpc.call_instance_shutdown(node_current, instance):
2121
      logger.Error("could not shutdown instance")
2122

    
2123
    self.cfg.MarkInstanceDown(instance.name)
2124
    _ShutdownInstanceDisks(instance, self.cfg)
2125

    
2126

    
2127
class LUReinstallInstance(LogicalUnit):
2128
  """Reinstall an instance.
2129

2130
  """
2131
  HPATH = "instance-reinstall"
2132
  HTYPE = constants.HTYPE_INSTANCE
2133
  _OP_REQP = ["instance_name"]
2134

    
2135
  def BuildHooksEnv(self):
2136
    """Build hooks env.
2137

2138
    This runs on master, primary and secondary nodes of the instance.
2139

2140
    """
2141
    env = _BuildInstanceHookEnvByObject(self.instance)
2142
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2143
          list(self.instance.secondary_nodes))
2144
    return env, nl, nl
2145

    
2146
  def CheckPrereq(self):
2147
    """Check prerequisites.
2148

2149
    This checks that the instance is in the cluster and is not running.
2150

2151
    """
2152
    instance = self.cfg.GetInstanceInfo(
2153
      self.cfg.ExpandInstanceName(self.op.instance_name))
2154
    if instance is None:
2155
      raise errors.OpPrereqError("Instance '%s' not known" %
2156
                                 self.op.instance_name)
2157
    if instance.disk_template == constants.DT_DISKLESS:
2158
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2159
                                 self.op.instance_name)
2160
    if instance.status != "down":
2161
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2162
                                 self.op.instance_name)
2163
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2164
    if remote_info:
2165
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2166
                                 (self.op.instance_name,
2167
                                  instance.primary_node))
2168

    
2169
    self.op.os_type = getattr(self.op, "os_type", None)
2170
    if self.op.os_type is not None:
2171
      # OS verification
2172
      pnode = self.cfg.GetNodeInfo(
2173
        self.cfg.ExpandNodeName(instance.primary_node))
2174
      if pnode is None:
2175
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2176
                                   instance.primary_node)
2177
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2178
      if not os_obj:
2179
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2180
                                   " primary node"  % self.op.os_type)
2181

    
2182
    self.instance = instance
2183

    
2184
  def Exec(self, feedback_fn):
2185
    """Reinstall the instance.
2186

2187
    """
2188
    inst = self.instance
2189

    
2190
    if self.op.os_type is not None:
2191
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2192
      inst.os = self.op.os_type
2193
      self.cfg.AddInstance(inst)
2194

    
2195
    _StartInstanceDisks(self.cfg, inst, None)
2196
    try:
2197
      feedback_fn("Running the instance OS create scripts...")
2198
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2199
        raise errors.OpExecError("Could not install OS for instance %s"
2200
                                 " on node %s" %
2201
                                 (inst.name, inst.primary_node))
2202
    finally:
2203
      _ShutdownInstanceDisks(inst, self.cfg)
2204

    
2205

    
2206
class LURenameInstance(LogicalUnit):
2207
  """Rename an instance.
2208

2209
  """
2210
  HPATH = "instance-rename"
2211
  HTYPE = constants.HTYPE_INSTANCE
2212
  _OP_REQP = ["instance_name", "new_name"]
2213

    
2214
  def BuildHooksEnv(self):
2215
    """Build hooks env.
2216

2217
    This runs on master, primary and secondary nodes of the instance.
2218

2219
    """
2220
    env = _BuildInstanceHookEnvByObject(self.instance)
2221
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2222
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2223
          list(self.instance.secondary_nodes))
2224
    return env, nl, nl
2225

    
2226
  def CheckPrereq(self):
2227
    """Check prerequisites.
2228

2229
    This checks that the instance is in the cluster and is not running.
2230

2231
    """
2232
    instance = self.cfg.GetInstanceInfo(
2233
      self.cfg.ExpandInstanceName(self.op.instance_name))
2234
    if instance is None:
2235
      raise errors.OpPrereqError("Instance '%s' not known" %
2236
                                 self.op.instance_name)
2237
    if instance.status != "down":
2238
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2239
                                 self.op.instance_name)
2240
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2241
    if remote_info:
2242
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2243
                                 (self.op.instance_name,
2244
                                  instance.primary_node))
2245
    self.instance = instance
2246

    
2247
    # new name verification
2248
    name_info = utils.HostInfo(self.op.new_name)
2249

    
2250
    self.op.new_name = new_name = name_info.name
2251
    if not getattr(self.op, "ignore_ip", False):
2252
      command = ["fping", "-q", name_info.ip]
2253
      result = utils.RunCmd(command)
2254
      if not result.failed:
2255
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2256
                                   (name_info.ip, new_name))
2257

    
2258

    
2259
  def Exec(self, feedback_fn):
2260
    """Reinstall the instance.
2261

2262
    """
2263
    inst = self.instance
2264
    old_name = inst.name
2265

    
2266
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2267

    
2268
    # re-read the instance from the configuration after rename
2269
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2270

    
2271
    _StartInstanceDisks(self.cfg, inst, None)
2272
    try:
2273
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2274
                                          "sda", "sdb"):
2275
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2276
               " instance has been renamed in Ganeti)" %
2277
               (inst.name, inst.primary_node))
2278
        logger.Error(msg)
2279
    finally:
2280
      _ShutdownInstanceDisks(inst, self.cfg)
2281

    
2282

    
2283
class LURemoveInstance(LogicalUnit):
2284
  """Remove an instance.
2285

2286
  """
2287
  HPATH = "instance-remove"
2288
  HTYPE = constants.HTYPE_INSTANCE
2289
  _OP_REQP = ["instance_name"]
2290

    
2291
  def BuildHooksEnv(self):
2292
    """Build hooks env.
2293

2294
    This runs on master, primary and secondary nodes of the instance.
2295

2296
    """
2297
    env = _BuildInstanceHookEnvByObject(self.instance)
2298
    nl = [self.sstore.GetMasterNode()]
2299
    return env, nl, nl
2300

    
2301
  def CheckPrereq(self):
2302
    """Check prerequisites.
2303

2304
    This checks that the instance is in the cluster.
2305

2306
    """
2307
    instance = self.cfg.GetInstanceInfo(
2308
      self.cfg.ExpandInstanceName(self.op.instance_name))
2309
    if instance is None:
2310
      raise errors.OpPrereqError("Instance '%s' not known" %
2311
                                 self.op.instance_name)
2312
    self.instance = instance
2313

    
2314
  def Exec(self, feedback_fn):
2315
    """Remove the instance.
2316

2317
    """
2318
    instance = self.instance
2319
    logger.Info("shutting down instance %s on node %s" %
2320
                (instance.name, instance.primary_node))
2321

    
2322
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2323
      if self.op.ignore_failures:
2324
        feedback_fn("Warning: can't shutdown instance")
2325
      else:
2326
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2327
                                 (instance.name, instance.primary_node))
2328

    
2329
    logger.Info("removing block devices for instance %s" % instance.name)
2330

    
2331
    if not _RemoveDisks(instance, self.cfg):
2332
      if self.op.ignore_failures:
2333
        feedback_fn("Warning: can't remove instance's disks")
2334
      else:
2335
        raise errors.OpExecError("Can't remove instance's disks")
2336

    
2337
    logger.Info("removing instance %s out of cluster config" % instance.name)
2338

    
2339
    self.cfg.RemoveInstance(instance.name)
2340

    
2341

    
2342
class LUQueryInstances(NoHooksLU):
2343
  """Logical unit for querying instances.
2344

2345
  """
2346
  _OP_REQP = ["output_fields", "names"]
2347

    
2348
  def CheckPrereq(self):
2349
    """Check prerequisites.
2350

2351
    This checks that the fields required are valid output fields.
2352

2353
    """
2354
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2355
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2356
                               "admin_state", "admin_ram",
2357
                               "disk_template", "ip", "mac", "bridge",
2358
                               "sda_size", "sdb_size"],
2359
                       dynamic=self.dynamic_fields,
2360
                       selected=self.op.output_fields)
2361

    
2362
    self.wanted = _GetWantedInstances(self, self.op.names)
2363

    
2364
  def Exec(self, feedback_fn):
2365
    """Computes the list of nodes and their attributes.
2366

2367
    """
2368
    instance_names = self.wanted
2369
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2370
                     in instance_names]
2371

    
2372
    # begin data gathering
2373

    
2374
    nodes = frozenset([inst.primary_node for inst in instance_list])
2375

    
2376
    bad_nodes = []
2377
    if self.dynamic_fields.intersection(self.op.output_fields):
2378
      live_data = {}
2379
      node_data = rpc.call_all_instances_info(nodes)
2380
      for name in nodes:
2381
        result = node_data[name]
2382
        if result:
2383
          live_data.update(result)
2384
        elif result == False:
2385
          bad_nodes.append(name)
2386
        # else no instance is alive
2387
    else:
2388
      live_data = dict([(name, {}) for name in instance_names])
2389

    
2390
    # end data gathering
2391

    
2392
    output = []
2393
    for instance in instance_list:
2394
      iout = []
2395
      for field in self.op.output_fields:
2396
        if field == "name":
2397
          val = instance.name
2398
        elif field == "os":
2399
          val = instance.os
2400
        elif field == "pnode":
2401
          val = instance.primary_node
2402
        elif field == "snodes":
2403
          val = list(instance.secondary_nodes)
2404
        elif field == "admin_state":
2405
          val = (instance.status != "down")
2406
        elif field == "oper_state":
2407
          if instance.primary_node in bad_nodes:
2408
            val = None
2409
          else:
2410
            val = bool(live_data.get(instance.name))
2411
        elif field == "admin_ram":
2412
          val = instance.memory
2413
        elif field == "oper_ram":
2414
          if instance.primary_node in bad_nodes:
2415
            val = None
2416
          elif instance.name in live_data:
2417
            val = live_data[instance.name].get("memory", "?")
2418
          else:
2419
            val = "-"
2420
        elif field == "disk_template":
2421
          val = instance.disk_template
2422
        elif field == "ip":
2423
          val = instance.nics[0].ip
2424
        elif field == "bridge":
2425
          val = instance.nics[0].bridge
2426
        elif field == "mac":
2427
          val = instance.nics[0].mac
2428
        elif field == "sda_size" or field == "sdb_size":
2429
          disk = instance.FindDisk(field[:3])
2430
          if disk is None:
2431
            val = None
2432
          else:
2433
            val = disk.size
2434
        else:
2435
          raise errors.ParameterError(field)
2436
        iout.append(val)
2437
      output.append(iout)
2438

    
2439
    return output
2440

    
2441

    
2442
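# Illustrative example (hypothetical names and numbers, not executed): for
# output_fields == ["name", "pnode", "oper_ram"], the list returned by
# LUQueryInstances.Exec above would look like
#   [["instance1.example.com", "node1.example.com", 512],
#    ["instance2.example.com", "node2.example.com", "-"]]
# i.e. one row per instance, with the values in the same order as the
# requested fields.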
class LUFailoverInstance(LogicalUnit):
2443
  """Failover an instance.
2444

2445
  """
2446
  HPATH = "instance-failover"
2447
  HTYPE = constants.HTYPE_INSTANCE
2448
  _OP_REQP = ["instance_name", "ignore_consistency"]
2449

    
2450
  def BuildHooksEnv(self):
2451
    """Build hooks env.
2452

2453
    This runs on master, primary and secondary nodes of the instance.
2454

2455
    """
2456
    env = {
2457
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2458
      }
2459
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2460
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2461
    return env, nl, nl
2462

    
2463
  def CheckPrereq(self):
2464
    """Check prerequisites.
2465

2466
    This checks that the instance is in the cluster.
2467

2468
    """
2469
    instance = self.cfg.GetInstanceInfo(
2470
      self.cfg.ExpandInstanceName(self.op.instance_name))
2471
    if instance is None:
2472
      raise errors.OpPrereqError("Instance '%s' not known" %
2473
                                 self.op.instance_name)
2474

    
2475
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2476
      raise errors.OpPrereqError("Instance's disk layout is not"
2477
                                 " network mirrored, cannot failover.")
2478

    
2479
    secondary_nodes = instance.secondary_nodes
2480
    if not secondary_nodes:
2481
      raise errors.ProgrammerError("no secondary node but using "
2482
                                   "DT_REMOTE_RAID1 template")
2483

    
2484
    # check memory requirements on the secondary node
2485
    target_node = secondary_nodes[0]
2486
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2487
    info = nodeinfo.get(target_node, None)
2488
    if not info:
2489
      raise errors.OpPrereqError("Cannot get current information"
2490
                                 " from node '%s'" % nodeinfo)
2491
    if instance.memory > info['memory_free']:
2492
      raise errors.OpPrereqError("Not enough memory on target node %s."
2493
                                 " %d MB available, %d MB required" %
2494
                                 (target_node, info['memory_free'],
2495
                                  instance.memory))
2496

    
2497
    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more of the target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))
2503

    
2504
    self.instance = instance
2505

    
2506
  def Exec(self, feedback_fn):
2507
    """Failover an instance.
2508

2509
    The failover is done by shutting it down on its present node and
2510
    starting it on the secondary.
2511

2512
    """
2513
    instance = self.instance
2514

    
2515
    source_node = instance.primary_node
2516
    target_node = instance.secondary_nodes[0]
2517

    
2518
    feedback_fn("* checking disk consistency between source and target")
2519
    for dev in instance.disks:
2520
      # for remote_raid1, these are md over drbd
2521
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2522
        if not self.op.ignore_consistency:
2523
          raise errors.OpExecError("Disk %s is degraded on target node,"
2524
                                   " aborting failover." % dev.iv_name)
2525

    
2526
    feedback_fn("* checking target node resource availability")
2527
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2528

    
2529
    if not nodeinfo:
2530
      raise errors.OpExecError("Could not contact target node %s." %
2531
                               target_node)
2532

    
2533
    free_memory = int(nodeinfo[target_node]['memory_free'])
2534
    memory = instance.memory
2535
    if memory > free_memory:
2536
      raise errors.OpExecError("Not enough memory to create instance %s on"
2537
                               " node %s. needed %s MiB, available %s MiB" %
2538
                               (instance.name, target_node, memory,
2539
                                free_memory))
2540

    
2541
    feedback_fn("* shutting down instance on source node")
2542
    logger.Info("Shutting down instance %s on node %s" %
2543
                (instance.name, source_node))
2544

    
2545
    if not rpc.call_instance_shutdown(source_node, instance):
2546
      if self.op.ignore_consistency:
2547
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2548
                     " anyway. Please make sure node %s is down"  %
2549
                     (instance.name, source_node, source_node))
2550
      else:
2551
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2552
                                 (instance.name, source_node))
2553

    
2554
    feedback_fn("* deactivating the instance's disks on source node")
2555
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2556
      raise errors.OpExecError("Can't shut down the instance's disks.")
2557

    
2558
    instance.primary_node = target_node
2559
    # distribute new instance config to the other nodes
2560
    self.cfg.AddInstance(instance)
2561

    
2562
    feedback_fn("* activating the instance's disks on target node")
2563
    logger.Info("Starting instance %s on node %s" %
2564
                (instance.name, target_node))
2565

    
2566
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2567
                                             ignore_secondaries=True)
2568
    if not disks_ok:
2569
      _ShutdownInstanceDisks(instance, self.cfg)
2570
      raise errors.OpExecError("Can't activate the instance's disks")
2571

    
2572
    feedback_fn("* starting the instance on the target node")
2573
    if not rpc.call_instance_start(target_node, instance, None):
2574
      _ShutdownInstanceDisks(instance, self.cfg)
2575
      raise errors.OpExecError("Could not start instance %s on node %s." %
2576
                               (instance.name, target_node))
2577

    
2578

    
2579
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2580
  """Create a tree of block devices on the primary node.
2581

2582
  This always creates all devices.
2583

2584
  """
2585
  if device.children:
2586
    for child in device.children:
2587
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2588
        return False
2589

    
2590
  cfg.SetDiskID(device, node)
2591
  new_id = rpc.call_blockdev_create(node, device, device.size,
2592
                                    instance.name, True, info)
2593
  if not new_id:
2594
    return False
2595
  if device.physical_id is None:
2596
    device.physical_id = new_id
2597
  return True
2598

    
2599

    
2600
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2601
  """Create a tree of block devices on a secondary node.
2602

2603
  If this device type has to be created on secondaries, create it and
2604
  all its children.
2605

2606
  If not, just recurse to children keeping the same 'force' value.
2607

2608
  """
2609
  if device.CreateOnSecondary():
2610
    force = True
2611
  if device.children:
2612
    for child in device.children:
2613
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2614
                                        child, force, info):
2615
        return False
2616

    
2617
  if not force:
2618
    return True
2619
  cfg.SetDiskID(device, node)
2620
  new_id = rpc.call_blockdev_create(node, device, device.size,
2621
                                    instance.name, False, info)
2622
  if not new_id:
2623
    return False
2624
  if device.physical_id is None:
2625
    device.physical_id = new_id
2626
  return True
2627

    
2628

    
2629
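# Descriptive note on _CreateBlockDevOnSecondary above (added comment): once a
# device in the tree reports CreateOnSecondary(), 'force' is switched to True
# for that device and all of its children, so a device that must exist on the
# secondary node (for example a network-mirrored one) also gets its backing
# children created there, while a tree that never sets 'force' is only
# recursed over and left untouched.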
def _GenerateUniqueNames(cfg, exts):
2630
  """Generate a suitable LV name.
2631

2632
  This will generate a logical volume name for the given instance.
2633

2634
  """
2635
  results = []
2636
  for val in exts:
2637
    new_id = cfg.GenerateUniqueID()
2638
    results.append("%s%s" % (new_id, val))
2639
  return results
2640

    
2641

    
2642
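# Usage sketch (illustrative): generating names for the two default volumes of
# an instance, as done in _GenerateDiskTemplate below:
#   names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
#   # -> e.g. ["<unique-id>.sda", "<unique-id>.sdb"], where <unique-id>
#   #    comes from cfg.GenerateUniqueID()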
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2643
  """Generate a drbd device complete with its children.
2644

2645
  """
2646
  port = cfg.AllocatePort()
2647
  vgname = cfg.GetVGName()
2648
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2649
                          logical_id=(vgname, names[0]))
2650
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2651
                          logical_id=(vgname, names[1]))
2652
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2653
                          logical_id = (primary, secondary, port),
2654
                          children = [dev_data, dev_meta])
2655
  return drbd_dev
2656

    
2657

    
2658
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2659
  """Generate a drbd8 device complete with its children.
2660

2661
  """
2662
  port = cfg.AllocatePort()
2663
  vgname = cfg.GetVGName()
2664
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2665
                          logical_id=(vgname, names[0]))
2666
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2667
                          logical_id=(vgname, names[1]))
2668
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2669
                          logical_id = (primary, secondary, port),
2670
                          children = [dev_data, dev_meta],
2671
                          iv_name=iv_name)
2672
  return drbd_dev
2673

    
2674
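# Shape of the device tree built by _GenerateDRBD8Branch above (descriptive
# sketch): a single LD_DRBD8 disk whose logical_id is (primary, secondary,
# port) and whose children are two LVs, the data volume of the requested size
# and a fixed 128 MB metadata volume:
#   drbd8(size=size)
#     +- lv(names[0], size=size)   # data
#     +- lv(names[1], size=128)    # metadata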
def _GenerateDiskTemplate(cfg, template_name,
2675
                          instance_name, primary_node,
2676
                          secondary_nodes, disk_sz, swap_sz):
2677
  """Generate the entire disk layout for a given template type.
2678

2679
  """
2680
  #TODO: compute space requirements
2681

    
2682
  vgname = cfg.GetVGName()
2683
  if template_name == "diskless":
2684
    disks = []
2685
  elif template_name == "plain":
2686
    if len(secondary_nodes) != 0:
2687
      raise errors.ProgrammerError("Wrong template configuration")
2688

    
2689
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2690
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2691
                           logical_id=(vgname, names[0]),
2692
                           iv_name = "sda")
2693
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2694
                           logical_id=(vgname, names[1]),
2695
                           iv_name = "sdb")
2696
    disks = [sda_dev, sdb_dev]
2697
  elif template_name == "local_raid1":
2698
    if len(secondary_nodes) != 0:
2699
      raise errors.ProgrammerError("Wrong template configuration")
2700

    
2701

    
2702
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2703
                                       ".sdb_m1", ".sdb_m2"])
2704
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2705
                              logical_id=(vgname, names[0]))
2706
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2707
                              logical_id=(vgname, names[1]))
2708
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2709
                              size=disk_sz,
2710
                              children = [sda_dev_m1, sda_dev_m2])
2711
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2712
                              logical_id=(vgname, names[2]))
2713
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2714
                              logical_id=(vgname, names[3]))
2715
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2716
                              size=swap_sz,
2717
                              children = [sdb_dev_m1, sdb_dev_m2])
2718
    disks = [md_sda_dev, md_sdb_dev]
2719
  elif template_name == constants.DT_REMOTE_RAID1:
2720
    if len(secondary_nodes) != 1:
2721
      raise errors.ProgrammerError("Wrong template configuration")
2722
    remote_node = secondary_nodes[0]
2723
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2724
                                       ".sdb_data", ".sdb_meta"])
2725
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2726
                                         disk_sz, names[0:2])
2727
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2728
                              children = [drbd_sda_dev], size=disk_sz)
2729
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2730
                                         swap_sz, names[2:4])
2731
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2732
                              children = [drbd_sdb_dev], size=swap_sz)
2733
    disks = [md_sda_dev, md_sdb_dev]
2734
  elif template_name == constants.DT_DRBD8:
2735
    if len(secondary_nodes) != 1:
2736
      raise errors.ProgrammerError("Wrong template configuration")
2737
    remote_node = secondary_nodes[0]
2738
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2739
                                       ".sdb_data", ".sdb_meta"])
2740
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2741
                                         disk_sz, names[0:2], "sda")
2742
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2743
                                         swap_sz, names[2:4], "sdb")
2744
    disks = [drbd_sda_dev, drbd_sdb_dev]
2745
  else:
2746
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2747
  return disks
2748

    
2749

    
2750
def _GetInstanceInfoText(instance):
2751
  """Compute that text that should be added to the disk's metadata.
2752

2753
  """
2754
  return "originstname+%s" % instance.name
2755

    
2756

    
2757
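# Example (illustrative, hypothetical instance name): for an instance called
# "instance1.example.com", _GetInstanceInfoText returns the string
# "originstname+instance1.example.com", which is passed as the 'info'
# argument to the block device creation calls below.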
def _CreateDisks(cfg, instance):
2758
  """Create all disks for an instance.
2759

2760
  This abstracts away some work from AddInstance.
2761

2762
  Args:
2763
    instance: the instance object
2764

2765
  Returns:
2766
    True or False showing the success of the creation process
2767

2768
  """
2769
  info = _GetInstanceInfoText(instance)
2770

    
2771
  for device in instance.disks:
2772
    logger.Info("creating volume %s for instance %s" %
2773
              (device.iv_name, instance.name))
2774
    #HARDCODE
2775
    for secondary_node in instance.secondary_nodes:
2776
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2777
                                        device, False, info):
2778
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2779
                     (device.iv_name, device, secondary_node))
2780
        return False
2781
    #HARDCODE
2782
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2783
                                    instance, device, info):
2784
      logger.Error("failed to create volume %s on primary!" %
2785
                   device.iv_name)
2786
      return False
2787
  return True
2788

    
2789

    
2790
def _RemoveDisks(instance, cfg):
2791
  """Remove all disks for an instance.
2792

2793
  This abstracts away some work from `AddInstance()` and
2794
  `RemoveInstance()`. Note that in case some of the devices couldn't
2795
  be removed, the removal will continue with the other ones (compare
2796
  with `_CreateDisks()`).
2797

2798
  Args:
2799
    instance: the instance object
2800

2801
  Returns:
2802
    True or False showing the success of the removal process
2803

2804
  """
2805
  logger.Info("removing block devices for instance %s" % instance.name)
2806

    
2807
  result = True
2808
  for device in instance.disks:
2809
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2810
      cfg.SetDiskID(disk, node)
2811
      if not rpc.call_blockdev_remove(node, disk):
2812
        logger.Error("could not remove block device %s on node %s,"
2813
                     " continuing anyway" %
2814
                     (device.iv_name, node))
2815
        result = False
2816
  return result
2817

    
2818

    
2819
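# Usage sketch (mirrors LUCreateInstance.Exec further below): disk creation
# and removal are used as a pair so that a failed creation is rolled back:
#   if not _CreateDisks(self.cfg, iobj):
#     _RemoveDisks(iobj, self.cfg)
#     raise errors.OpExecError("Device creation failed, reverting...")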
class LUCreateInstance(LogicalUnit):
2820
  """Create an instance.
2821

2822
  """
2823
  HPATH = "instance-add"
2824
  HTYPE = constants.HTYPE_INSTANCE
2825
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2826
              "disk_template", "swap_size", "mode", "start", "vcpus",
2827
              "wait_for_sync", "ip_check"]
2828

    
2829
  def BuildHooksEnv(self):
2830
    """Build hooks env.
2831

2832
    This runs on master, primary and secondary nodes of the instance.
2833

2834
    """
2835
    env = {
2836
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2837
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2838
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2839
      "INSTANCE_ADD_MODE": self.op.mode,
2840
      }
2841
    if self.op.mode == constants.INSTANCE_IMPORT:
2842
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2843
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2844
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2845

    
2846
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2847
      primary_node=self.op.pnode,
2848
      secondary_nodes=self.secondaries,
2849
      status=self.instance_status,
2850
      os_type=self.op.os_type,
2851
      memory=self.op.mem_size,
2852
      vcpus=self.op.vcpus,
2853
      nics=[(self.inst_ip, self.op.bridge)],
2854
    ))
2855

    
2856
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2857
          self.secondaries)
2858
    return env, nl, nl
2859

    
2860

    
2861
  def CheckPrereq(self):
2862
    """Check prerequisites.
2863

2864
    """
2865
    if self.op.mode not in (constants.INSTANCE_CREATE,
2866
                            constants.INSTANCE_IMPORT):
2867
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2868
                                 self.op.mode)
2869

    
2870
    if self.op.mode == constants.INSTANCE_IMPORT:
2871
      src_node = getattr(self.op, "src_node", None)
2872
      src_path = getattr(self.op, "src_path", None)
2873
      if src_node is None or src_path is None:
2874
        raise errors.OpPrereqError("Importing an instance requires source"
2875
                                   " node and path options")
2876
      src_node_full = self.cfg.ExpandNodeName(src_node)
2877
      if src_node_full is None:
2878
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2879
      self.op.src_node = src_node = src_node_full
2880

    
2881
      if not os.path.isabs(src_path):
2882
        raise errors.OpPrereqError("The source path must be absolute")
2883

    
2884
      export_info = rpc.call_export_info(src_node, src_path)
2885

    
2886
      if not export_info:
2887
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2888

    
2889
      if not export_info.has_section(constants.INISECT_EXP):
2890
        raise errors.ProgrammerError("Corrupted export config")
2891

    
2892
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2893
      if (int(ei_version) != constants.EXPORT_VERSION):
2894
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2895
                                   (ei_version, constants.EXPORT_VERSION))
2896

    
2897
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2898
        raise errors.OpPrereqError("Can't import instance with more than"
2899
                                   " one data disk")
2900

    
2901
      # FIXME: are the old os-es, disk sizes, etc. useful?
2902
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2903
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2904
                                                         'disk0_dump'))
2905
      self.src_image = diskimage
2906
    else: # INSTANCE_CREATE
2907
      if getattr(self.op, "os_type", None) is None:
2908
        raise errors.OpPrereqError("No guest OS specified")
2909

    
2910
    # check primary node
2911
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2912
    if pnode is None:
2913
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2914
                                 self.op.pnode)
2915
    self.op.pnode = pnode.name
2916
    self.pnode = pnode
2917
    self.secondaries = []
2918
    # disk template and mirror node verification
2919
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2920
      raise errors.OpPrereqError("Invalid disk template name")
2921

    
2922
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2923
      if getattr(self.op, "snode", None) is None:
2924
        raise errors.OpPrereqError("The networked disk templates need"
2925
                                   " a mirror node")
2926

    
2927
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2928
      if snode_name is None:
2929
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2930
                                   self.op.snode)
2931
      elif snode_name == pnode.name:
2932
        raise errors.OpPrereqError("The secondary node cannot be"
2933
                                   " the primary node.")
2934
      self.secondaries.append(snode_name)
2935

    
2936
    # Check lv size requirements
2937
    nodenames = [pnode.name] + self.secondaries
2938
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2939

    
2940
    # Required free disk space as a function of disk and swap space
2941
    req_size_dict = {
2942
      constants.DT_DISKLESS: 0,
2943
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2944
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2945
      # 256 MB are added for drbd metadata, 128 MB for each drbd device
2946
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2947
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2948
    }
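
    # Worked example (illustrative numbers): for disk_size=10240 and
    # swap_size=4096 with the drbd8 template, the required free space per
    # node is 10240 + 4096 + 256 = 14592 MB, which is checked against
    # 'vg_free' for the primary and the secondary node below.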
2949

    
2950
    if self.op.disk_template not in req_size_dict:
2951
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2952
                                   " is unknown" %  self.op.disk_template)
2953

    
2954
    req_size = req_size_dict[self.op.disk_template]
2955

    
2956
    for node in nodenames:
2957
      info = nodeinfo.get(node, None)
2958
      if not info:
2959
        raise errors.OpPrereqError("Cannot get current information"
2960
                                   " from node '%s'" % nodeinfo)
2961
      if req_size > info['vg_free']:
2962
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2963
                                   " %d MB available, %d MB required" %
2964
                                   (node, info['vg_free'], req_size))
2965

    
2966
    # os verification
2967
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2968
    if not os_obj:
2969
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2970
                                 " primary node"  % self.op.os_type)
2971

    
2972
    # instance verification
2973
    hostname1 = utils.HostInfo(self.op.instance_name)
2974

    
2975
    self.op.instance_name = instance_name = hostname1.name
2976
    instance_list = self.cfg.GetInstanceList()
2977
    if instance_name in instance_list:
2978
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2979
                                 instance_name)
2980

    
2981
    ip = getattr(self.op, "ip", None)
2982
    if ip is None or ip.lower() == "none":
2983
      inst_ip = None
2984
    elif ip.lower() == "auto":
2985
      inst_ip = hostname1.ip
2986
    else:
2987
      if not utils.IsValidIP(ip):
2988
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2989
                                   " like a valid IP" % ip)
2990
      inst_ip = ip
2991
    self.inst_ip = inst_ip
2992

    
2993
    if self.op.start and not self.op.ip_check:
2994
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2995
                                 " adding an instance in start mode")
2996

    
2997
    if self.op.ip_check:
2998
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2999
                       constants.DEFAULT_NODED_PORT):
3000
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3001
                                   (hostname1.ip, instance_name))
3002

    
3003
    # bridge verification
3004
    bridge = getattr(self.op, "bridge", None)
3005
    if bridge is None:
3006
      self.op.bridge = self.cfg.GetDefBridge()
3007
    else:
3008
      self.op.bridge = bridge
3009

    
3010
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3011
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3012
                                 " destination node '%s'" %
3013
                                 (self.op.bridge, pnode.name))
3014

    
3015
    if self.op.start:
3016
      self.instance_status = 'up'
3017
    else:
3018
      self.instance_status = 'down'
3019

    
3020
  def Exec(self, feedback_fn):
3021
    """Create and add the instance to the cluster.
3022

3023
    """
3024
    instance = self.op.instance_name
3025
    pnode_name = self.pnode.name
3026

    
3027
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3028
    if self.inst_ip is not None:
3029
      nic.ip = self.inst_ip
3030

    
3031
    network_port = None  # placeholder assignment for later
3032

    
3033
    disks = _GenerateDiskTemplate(self.cfg,
3034
                                  self.op.disk_template,
3035
                                  instance, pnode_name,
3036
                                  self.secondaries, self.op.disk_size,
3037
                                  self.op.swap_size)
3038

    
3039
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3040
                            primary_node=pnode_name,
3041
                            memory=self.op.mem_size,
3042
                            vcpus=self.op.vcpus,
3043
                            nics=[nic], disks=disks,
3044
                            disk_template=self.op.disk_template,
3045
                            status=self.instance_status,
3046
                            network_port=network_port,
3047
                            )
3048

    
3049
    feedback_fn("* creating instance disks...")
3050
    if not _CreateDisks(self.cfg, iobj):
3051
      _RemoveDisks(iobj, self.cfg)
3052
      raise errors.OpExecError("Device creation failed, reverting...")
3053

    
3054
    feedback_fn("adding instance %s to cluster config" % instance)
3055

    
3056
    self.cfg.AddInstance(iobj)
3057

    
3058
    if self.op.wait_for_sync:
3059
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3060
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3061
      # make sure the disks are not degraded (still sync-ing is ok)
3062
      time.sleep(15)
3063
      feedback_fn("* checking mirrors status")
3064
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3065
    else:
3066
      disk_abort = False
3067

    
3068
    if disk_abort:
3069
      _RemoveDisks(iobj, self.cfg)
3070
      self.cfg.RemoveInstance(iobj.name)
3071
      raise errors.OpExecError("There are some degraded disks for"
3072
                               " this instance")
3073

    
3074
    feedback_fn("creating os for instance %s on node %s" %
3075
                (instance, pnode_name))
3076

    
3077
    if iobj.disk_template != constants.DT_DISKLESS:
3078
      if self.op.mode == constants.INSTANCE_CREATE:
3079
        feedback_fn("* running the instance OS create scripts...")
3080
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3081
          raise errors.OpExecError("could not add os for instance %s"
3082
                                   " on node %s" %
3083
                                   (instance, pnode_name))
3084

    
3085
      elif self.op.mode == constants.INSTANCE_IMPORT:
3086
        feedback_fn("* running the instance OS import scripts...")
3087
        src_node = self.op.src_node
3088
        src_image = self.src_image
3089
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3090
                                                src_node, src_image):
3091
          raise errors.OpExecError("Could not import os for instance"
3092
                                   " %s on node %s" %
3093
                                   (instance, pnode_name))
3094
      else:
3095
        # also checked in the prereq part
3096
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3097
                                     % self.op.mode)
3098

    
3099
    if self.op.start:
3100
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3101
      feedback_fn("* starting instance...")
3102
      if not rpc.call_instance_start(pnode_name, iobj, None):
3103
        raise errors.OpExecError("Could not start instance")
3104

    
3105

    
3106
class LUConnectConsole(NoHooksLU):
3107
  """Connect to an instance's console.
3108

3109
  This is somewhat special in that it returns the command line that
3110
  you need to run on the master node in order to connect to the
3111
  console.
3112

3113
  """
3114
  _OP_REQP = ["instance_name"]
3115

    
3116
  def CheckPrereq(self):
3117
    """Check prerequisites.
3118

3119
    This checks that the instance is in the cluster.
3120

3121
    """
3122
    instance = self.cfg.GetInstanceInfo(
3123
      self.cfg.ExpandInstanceName(self.op.instance_name))
3124
    if instance is None:
3125
      raise errors.OpPrereqError("Instance '%s' not known" %
3126
                                 self.op.instance_name)
3127
    self.instance = instance
3128

    
3129
  def Exec(self, feedback_fn):
3130
    """Connect to the console of an instance
3131

3132
    """
3133
    instance = self.instance
3134
    node = instance.primary_node
3135

    
3136
    node_insts = rpc.call_instance_list([node])[node]
3137
    if node_insts is False:
3138
      raise errors.OpExecError("Can't connect to node %s." % node)
3139

    
3140
    if instance.name not in node_insts:
3141
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3142

    
3143
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
    # build ssh cmdline
    argv = ["ssh", "-q", "-t"]
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
    argv.extend(ssh.BATCH_MODE_OPTS)
    argv.append(node)
    argv.append(console_cmd)
    return "ssh", argv
3154

    
3155

    
3156
class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3163

    
3164
  def BuildHooksEnv(self):
3165
    """Build hooks env.
3166

3167
    This runs on the master, the primary and all the secondaries.
3168

3169
    """
3170
    env = {
3171
      "NEW_SECONDARY": self.op.remote_node,
3172
      "DISK_NAME": self.op.disk_name,
3173
      }
3174
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3175
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3176
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3177
    return env, nl, nl
3178

    
3179
  def CheckPrereq(self):
3180
    """Check prerequisites.
3181

3182
    This checks that the instance is in the cluster.
3183

3184
    """
3185
    instance = self.cfg.GetInstanceInfo(
3186
      self.cfg.ExpandInstanceName(self.op.instance_name))
3187
    if instance is None:
3188
      raise errors.OpPrereqError("Instance '%s' not known" %
3189
                                 self.op.instance_name)
3190
    self.instance = instance
3191

    
3192
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3193
    if remote_node is None:
3194
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3195
    self.remote_node = remote_node
3196

    
3197
    if remote_node == instance.primary_node:
3198
      raise errors.OpPrereqError("The specified node is the primary node of"
3199
                                 " the instance.")
3200

    
3201
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3202
      raise errors.OpPrereqError("Instance's disk layout is not"
3203
                                 " remote_raid1.")
3204
    for disk in instance.disks:
3205
      if disk.iv_name == self.op.disk_name:
3206
        break
3207
    else:
3208
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3209
                                 " instance." % self.op.disk_name)
3210
    if len(disk.children) > 1:
3211
      raise errors.OpPrereqError("The device already has two slave devices."
3212
                                 " This would create a 3-disk raid1 which we"
3213
                                 " don't allow.")
3214
    self.disk = disk
3215

    
3216
  def Exec(self, feedback_fn):
3217
    """Add the mirror component
3218

3219
    """
3220
    disk = self.disk
3221
    instance = self.instance
3222

    
3223
    remote_node = self.remote_node
3224
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3225
    names = _GenerateUniqueNames(self.cfg, lv_names)
3226
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3227
                                     remote_node, disk.size, names)
3228

    
3229
    logger.Info("adding new mirror component on secondary")
3230
    #HARDCODE
3231
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3232
                                      new_drbd, False,
3233
                                      _GetInstanceInfoText(instance)):
3234
      raise errors.OpExecError("Failed to create new component on secondary"
3235
                               " node %s" % remote_node)
3236

    
3237
    logger.Info("adding new mirror component on primary")
3238
    #HARDCODE
3239
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3240
                                    instance, new_drbd,
3241
                                    _GetInstanceInfoText(instance)):
3242
      # remove secondary dev
3243
      self.cfg.SetDiskID(new_drbd, remote_node)
3244
      rpc.call_blockdev_remove(remote_node, new_drbd)
3245
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")
3260

    
3261
    disk.children.append(new_drbd)
3262

    
3263
    self.cfg.AddInstance(instance)
3264

    
3265
    _WaitForSync(self.cfg, instance, self.proc)
3266

    
3267
    return 0
3268

    
3269

    
3270
class LURemoveMDDRBDComponent(LogicalUnit):
3271
  """Remove a component from a remote_raid1 disk.
3272

3273
  """
3274
  HPATH = "mirror-remove"
3275
  HTYPE = constants.HTYPE_INSTANCE
3276
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3277

    
3278
  def BuildHooksEnv(self):
3279
    """Build hooks env.
3280

3281
    This runs on the master, the primary and all the secondaries.
3282

3283
    """
3284
    env = {
3285
      "DISK_NAME": self.op.disk_name,
3286
      "DISK_ID": self.op.disk_id,
3287
      "OLD_SECONDARY": self.old_secondary,
3288
      }
3289
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3290
    nl = [self.sstore.GetMasterNode(),
3291
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3292
    return env, nl, nl
3293

    
3294
  def CheckPrereq(self):
3295
    """Check prerequisites.
3296

3297
    This checks that the instance is in the cluster.
3298

3299
    """
3300
    instance = self.cfg.GetInstanceInfo(
3301
      self.cfg.ExpandInstanceName(self.op.instance_name))
3302
    if instance is None:
3303
      raise errors.OpPrereqError("Instance '%s' not known" %
3304
                                 self.op.instance_name)
3305
    self.instance = instance
3306

    
3307
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3308
      raise errors.OpPrereqError("Instance's disk layout is not"
3309
                                 " remote_raid1.")
3310
    for disk in instance.disks:
3311
      if disk.iv_name == self.op.disk_name:
3312
        break
3313
    else:
3314
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3315
                                 " instance." % self.op.disk_name)
3316
    for child in disk.children:
3317
      if (child.dev_type == constants.LD_DRBD7 and
3318
          child.logical_id[2] == self.op.disk_id):
3319
        break
3320
    else:
3321
      raise errors.OpPrereqError("Can't find the device with this port.")
3322

    
3323
    if len(disk.children) < 2:
3324
      raise errors.OpPrereqError("Cannot remove the last component from"
3325
                                 " a mirror.")
3326
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]
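    # At this point self.old_secondary names the node holding the other
    # half of the mirror: for a child with logical_id
    # ("node1.example.com", "node2.example.com", 11000) and primary
    # node1.example.com, the old secondary is node2.example.com
    # (hypothetical names and port).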
3333

    
3334
  def Exec(self, feedback_fn):
3335
    """Remove the mirror component
3336

3337
    """
3338
    instance = self.instance
3339
    disk = self.disk
3340
    child = self.child
3341
    logger.Info("remove mirror component")
3342
    self.cfg.SetDiskID(disk, instance.primary_node)
3343
    if not rpc.call_blockdev_removechildren(instance.primary_node,
3344
                                            disk, [child]):
3345
      raise errors.OpExecError("Can't remove child from mirror.")
3346

    
3347
    for node in child.logical_id[:2]:
3348
      self.cfg.SetDiskID(child, node)
3349
      if not rpc.call_blockdev_remove(node, child):
3350
        logger.Error("Warning: failed to remove device from node %s,"
3351
                     " continuing operation." % node)
3352

    
3353
    disk.children.remove(child)
3354
    self.cfg.AddInstance(instance)
3355

    
3356

    
3357
class LUReplaceDisks(LogicalUnit):
3358
  """Replace the disks of an instance.
3359

3360
  """
3361
  HPATH = "mirrors-replace"
3362
  HTYPE = constants.HTYPE_INSTANCE
3363
  _OP_REQP = ["instance_name", "mode", "disks"]
3364

    
3365
  def BuildHooksEnv(self):
3366
    """Build hooks env.
3367

3368
    This runs on the master, the primary and all the secondaries.
3369

3370
    """
3371
    env = {
3372
      "MODE": self.op.mode,
3373
      "NEW_SECONDARY": self.op.remote_node,
3374
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3375
      }
3376
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3377
    nl = [
3378
      self.sstore.GetMasterNode(),
3379
      self.instance.primary_node,
3380
      ]
3381
    if self.op.remote_node is not None:
3382
      nl.append(self.op.remote_node)
3383
    return env, nl, nl
3384

    
3385
  def CheckPrereq(self):
3386
    """Check prerequisites.
3387

3388
    This checks that the instance is in the cluster.
3389

3390
    """
3391
    instance = self.cfg.GetInstanceInfo(
3392
      self.cfg.ExpandInstanceName(self.op.instance_name))
3393
    if instance is None:
3394
      raise errors.OpPrereqError("Instance '%s' not known" %
3395
                                 self.op.instance_name)
3396
    self.instance = instance
3397
    self.op.instance_name = instance.name
3398

    
3399
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3400
      raise errors.OpPrereqError("Instance's disk layout is not"
3401
                                 " network mirrored.")
3402

    
3403
    if len(instance.secondary_nodes) != 1:
3404
      raise errors.OpPrereqError("The instance has a strange layout,"
3405
                                 " expected one secondary but found %d" %
3406
                                 len(instance.secondary_nodes))
3407

    
3408
    self.sec_node = instance.secondary_nodes[0]
3409

    
3410
    remote_node = getattr(self.op, "remote_node", None)
3411
    if remote_node is not None:
3412
      remote_node = self.cfg.ExpandNodeName(remote_node)
3413
      if remote_node is None:
3414
        raise errors.OpPrereqError("Node '%s' not known" %
3415
                                   self.op.remote_node)
3416
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3417
    else:
3418
      self.remote_node_info = None
3419
    if remote_node == instance.primary_node:
3420
      raise errors.OpPrereqError("The specified node is the primary node of"
3421
                                 " the instance.")
3422
    elif remote_node == self.sec_node:
3423
      if self.op.mode == constants.REPLACE_DISK_SEC:
3424
        # this is for DRBD8, where we can't execute the same mode of
3425
        # replacement as for drbd7 (no different port allocated)
3426
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3427
                                   " replacement")
3428
      # the user gave the current secondary, switch to
3429
      # 'no-replace-secondary' mode for drbd7
3430
      remote_node = None
3431
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3432
        self.op.mode != constants.REPLACE_DISK_ALL):
3433
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3434
                                 " disks replacement, not individual ones")
3435
    if instance.disk_template == constants.DT_DRBD8:
3436
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3437
          remote_node is not None):
3438
        # switch to replace secondary mode
3439
        self.op.mode = constants.REPLACE_DISK_SEC
3440

    
3441
      if self.op.mode == constants.REPLACE_DISK_ALL:
3442
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3443
                                   " secondary disk replacement, not"
3444
                                   " both at once")
3445
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3446
        if remote_node is not None:
3447
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3448
                                     " the secondary while doing a primary"
3449
                                     " node disk replacement")
3450
        self.tgt_node = instance.primary_node
3451
        self.oth_node = instance.secondary_nodes[0]
3452
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3453
        self.new_node = remote_node # this can be None, in which case
3454
                                    # we don't change the secondary
3455
        self.tgt_node = instance.secondary_nodes[0]
3456
        self.oth_node = instance.primary_node
3457
      else:
3458
        raise errors.ProgrammerError("Unhandled disk replace mode")
3459

    
3460
    for name in self.op.disks:
3461
      if instance.FindDisk(name) is None:
3462
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3463
                                   (name, instance.name))
3464
    self.op.remote_node = remote_node
3465

    
3466
  def _ExecRR1(self, feedback_fn):
3467
    """Replace the disks of an instance.
3468

3469
    """
3470
    instance = self.instance
3471
    iv_names = {}
3472
    # start of work
3473
    if self.op.remote_node is None:
3474
      remote_node = self.sec_node
3475
    else:
3476
      remote_node = self.op.remote_node
3477
    cfg = self.cfg
3478
    for dev in instance.disks:
3479
      size = dev.size
3480
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3481
      names = _GenerateUniqueNames(cfg, lv_names)
3482
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3483
                                       remote_node, size, names)
3484
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3485
      logger.Info("adding new mirror component on secondary for %s" %
3486
                  dev.iv_name)
3487
      #HARDCODE
3488
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3489
                                        new_drbd, False,
3490
                                        _GetInstanceInfoText(instance)):
3491
        raise errors.OpExecError("Failed to create new component on secondary"
3492
                                 " node %s. Full abort, cleanup manually!" %
3493
                                 remote_node)
3494

    
3495
      logger.Info("adding new mirror component on primary")
3496
      #HARDCODE
3497
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3498
                                      instance, new_drbd,
3499
                                      _GetInstanceInfoText(instance)):
3500
        # remove secondary dev
3501
        cfg.SetDiskID(new_drbd, remote_node)
3502
        rpc.call_blockdev_remove(remote_node, new_drbd)
3503
        raise errors.OpExecError("Failed to create volume on primary!"
3504
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")
3519

    
3520
      dev.children.append(new_drbd)
3521
      cfg.AddInstance(instance)
3522

    
3523
    # this can fail as the old devices are degraded and _WaitForSync
3524
    # does a combined result over all disks, so we don't check its
3525
    # return value
3526
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3527

    
3528
    # so check manually all the devices
3529
    for name in iv_names:
3530
      dev, child, new_drbd = iv_names[name]
3531
      cfg.SetDiskID(dev, instance.primary_node)
3532
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3533
      if is_degr:
3534
        raise errors.OpExecError("MD device %s is degraded!" % name)
3535
      cfg.SetDiskID(new_drbd, instance.primary_node)
3536
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3537
      if is_degr:
3538
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3539

    
3540
    for name in iv_names:
3541
      dev, child, new_drbd = iv_names[name]
3542
      logger.Info("remove mirror %s component" % name)
3543
      cfg.SetDiskID(dev, instance.primary_node)
3544
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3545
                                              dev, [child]):
3546
        logger.Error("Can't remove child from mirror, aborting"
3547
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3548
        continue
3549

    
3550
      for node in child.logical_id[:2]:
3551
        logger.Info("remove child device on %s" % node)
3552
        cfg.SetDiskID(child, node)
3553
        if not rpc.call_blockdev_remove(node, child):
3554
          logger.Error("Warning: failed to remove device from node %s,"
3555
                       " continuing operation." % node)
3556

    
3557
      dev.children.remove(child)
3558

    
3559
      cfg.AddInstance(instance)
3560

    
3561
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
3578
    steps_total = 6
3579
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3580
    instance = self.instance
3581
    iv_names = {}
3582
    vgname = self.cfg.GetVGName()
3583
    # start of work
3584
    cfg = self.cfg
3585
    tgt_node = self.tgt_node
3586
    oth_node = self.oth_node
3587

    
3588
    # Step: check device activation
3589
    self.proc.LogStep(1, steps_total, "check device existence")
3590
    info("checking volume groups")
3591
    my_vg = cfg.GetVGName()
3592
    results = rpc.call_vg_list([oth_node, tgt_node])
3593
    if not results:
3594
      raise errors.OpExecError("Can't list volume groups on the nodes")
3595
    for node in oth_node, tgt_node:
3596
      res = results.get(node, False)
3597
      if not res or my_vg not in res:
3598
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3599
                                 (my_vg, node))
3600
    for dev in instance.disks:
3601
      if not dev.iv_name in self.op.disks:
3602
        continue
3603
      for node in tgt_node, oth_node:
3604
        info("checking %s on %s" % (dev.iv_name, node))
3605
        cfg.SetDiskID(dev, node)
3606
        if not rpc.call_blockdev_find(node, dev):
3607
          raise errors.OpExecError("Can't find device %s on node %s" %
3608
                                   (dev.iv_name, node))
3609

    
3610
    # Step: check other node consistency
3611
    self.proc.LogStep(2, steps_total, "check peer consistency")
3612
    for dev in instance.disks:
3613
      if not dev.iv_name in self.op.disks:
3614
        continue
3615
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3616
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3617
                                   oth_node==instance.primary_node):
3618
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3619
                                 " to replace disks on this node (%s)" %
3620
                                 (oth_node, tgt_node))
3621

    
3622
    # Step: create new storage
3623
    self.proc.LogStep(3, steps_total, "allocate new storage")
3624
    for dev in instance.disks:
3625
      if not dev.iv_name in self.op.disks:
3626
        continue
3627
      size = dev.size
3628
      cfg.SetDiskID(dev, tgt_node)
3629
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3630
      names = _GenerateUniqueNames(cfg, lv_names)
3631
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3632
                             logical_id=(vgname, names[0]))
3633
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3634
                             logical_id=(vgname, names[1]))
3635
      new_lvs = [lv_data, lv_meta]
3636
      old_lvs = dev.children
3637
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3638
      info("creating new local storage on %s for %s" %
3639
           (tgt_node, dev.iv_name))
3640
      # since we *always* want to create this LV, we use the
3641
      # _Create...OnPrimary (which forces the creation), even if we
3642
      # are talking about the secondary node
3643
      for new_lv in new_lvs:
3644
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3645
                                        _GetInstanceInfoText(instance)):
3646
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3647
                                   " node '%s'" %
3648
                                   (new_lv.logical_id[1], tgt_node))
3649

    
3650
    # Step: for each lv, detach+rename*2+attach
3651
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3652
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3653
      info("detaching %s drbd from local storage" % dev.iv_name)
3654
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3655
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3656
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3657
      #dev.children = []
3658
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)
3665

    
3666
      # FIXME(iustin): use a better name for the replaced LVs
3667
      temp_suffix = int(time.time())
3668
      ren_fn = lambda d, suff: (d.physical_id[0],
3669
                                d.physical_id[1] + "_replaced-%s" % suff)
3670
      # build the rename list based on what LVs exist on the node
3671
      rlist = []
3672
      for to_ren in old_lvs:
3673
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3674
        if find_res is not None: # device exists
3675
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3676

    
3677
      info("renaming the old LVs on the target node")
3678
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3679
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3680
      # now we rename the new LVs to the old LVs
3681
      info("renaming the new LVs on the target node")
3682
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3683
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3684
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3685

    
3686
      for old, new in zip(old_lvs, new_lvs):
3687
        new.logical_id = old.logical_id
3688
        cfg.SetDiskID(new, tgt_node)
3689

    
3690
      for disk in old_lvs:
3691
        disk.logical_id = ren_fn(disk, temp_suffix)
3692
        cfg.SetDiskID(disk, tgt_node)
3693

    
3694
      # now that the new lvs have the old name, we can add them to the device
3695
      info("adding new mirror component on %s" % tgt_node)
3696
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3697
        for new_lv in new_lvs:
3698
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3699
            warning("Can't rollback device %s", hint="manually cleanup unused"
3700
                    " logical volumes")
3701
        raise errors.OpExecError("Can't add local storage to drbd")
3702

    
3703
      dev.children = new_lvs
3704
      cfg.Update(instance)
3705

    
3706
    # Step: wait for sync
3707

    
3708
    # this can fail as the old devices are degraded and _WaitForSync
3709
    # does a combined result over all disks, so we don't check its
3710
    # return value
3711
    self.proc.LogStep(5, steps_total, "sync devices")
3712
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3713

    
3714
    # so check manually all the devices
3715
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3716
      cfg.SetDiskID(dev, instance.primary_node)
3717
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3718
      if is_degr:
3719
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3720

    
3721
    # Step: remove old storage
3722
    self.proc.LogStep(6, steps_total, "removing old storage")
3723
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3724
      info("remove logical volumes for %s" % name)
3725
      for lv in old_lvs:
3726
        cfg.SetDiskID(lv, tgt_node)
3727
        if not rpc.call_blockdev_remove(tgt_node, lv):
3728
          warning("Can't remove old LV", hint="manually remove unused LVs")
3729
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
3750
    steps_total = 6
3751
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3752
    instance = self.instance
3753
    iv_names = {}
3754
    vgname = self.cfg.GetVGName()
3755
    # start of work
3756
    cfg = self.cfg
3757
    old_node = self.tgt_node
3758
    new_node = self.new_node
3759
    pri_node = instance.primary_node
3760

    
3761
    # Step: check device activation
3762
    self.proc.LogStep(1, steps_total, "check device existence")
3763
    info("checking volume groups")
3764
    my_vg = cfg.GetVGName()
3765
    results = rpc.call_vg_list([pri_node, new_node])
3766
    if not results:
3767
      raise errors.OpExecError("Can't list volume groups on the nodes")
3768
    for node in pri_node, new_node:
3769
      res = results.get(node, False)
3770
      if not res or my_vg not in res:
3771
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3772
                                 (my_vg, node))
3773
    for dev in instance.disks:
3774
      if not dev.iv_name in self.op.disks:
3775
        continue
3776
      info("checking %s on %s" % (dev.iv_name, pri_node))
3777
      cfg.SetDiskID(dev, pri_node)
3778
      if not rpc.call_blockdev_find(pri_node, dev):
3779
        raise errors.OpExecError("Can't find device %s on node %s" %
3780
                                 (dev.iv_name, pri_node))
3781

    
3782
    # Step: check other node consistency
3783
    self.proc.LogStep(2, steps_total, "check peer consistency")
3784
    for dev in instance.disks:
3785
      if not dev.iv_name in self.op.disks:
3786
        continue
3787
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3788
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3789
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3790
                                 " unsafe to replace the secondary" %
3791
                                 pri_node)
3792

    
3793
    # Step: create new storage
3794
    self.proc.LogStep(3, steps_total, "allocate new storage")
3795
    for dev in instance.disks:
3796
      size = dev.size
3797
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3798
      # since we *always* want to create this LV, we use the
3799
      # _Create...OnPrimary (which forces the creation), even if we
3800
      # are talking about the secondary node
3801
      for new_lv in dev.children:
3802
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3803
                                        _GetInstanceInfoText(instance)):
3804
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3805
                                   " node '%s'" %
3806
                                   (new_lv.logical_id[1], new_node))
3807

    
3808
      iv_names[dev.iv_name] = (dev, dev.children)
3809

    
3810
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3811
    for dev in instance.disks:
3812
      size = dev.size
3813
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3814
      # create new devices on new_node
3815
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3816
                              logical_id=(pri_node, new_node,
3817
                                          dev.logical_id[2]),
3818
                              children=dev.children)
3819
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3820
                                        new_drbd, False,
3821
                                      _GetInstanceInfoText(instance)):
3822
        raise errors.OpExecError("Failed to create new DRBD on"
3823
                                 " node '%s'" % new_node)
3824

    
3825
    for dev in instance.disks:
3826
      # we have new devices, shutdown the drbd on the old secondary
3827
      info("shutting down drbd for %s on old node" % dev.iv_name)
3828
      cfg.SetDiskID(dev, old_node)
3829
      if not rpc.call_blockdev_shutdown(old_node, dev):
3830
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3831
                hint="Please cleanup this device manually as soon as possible")
3832

    
3833
    info("detaching primary drbds from the network (=> standalone)")
3834
    done = 0
3835
    for dev in instance.disks:
3836
      cfg.SetDiskID(dev, pri_node)
3837
      # set the physical (unique in bdev terms) id to None, meaning
3838
      # detach from network
3839
      dev.physical_id = (None,) * len(dev.physical_id)
3840
      # and 'find' the device, which will 'fix' it to match the
3841
      # standalone state
3842
      if rpc.call_blockdev_find(pri_node, dev):
3843
        done += 1
3844
      else:
3845
        warning("Failed to detach drbd %s from network, unusual case" %
3846
                dev.iv_name)
3847

    
3848
    if not done:
3849
      # no detaches succeeded (very unlikely)
3850
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3851

    
3852
    # if we managed to detach at least one, we update all the disks of
3853
    # the instance to point to the new secondary
3854
    info("updating instance configuration")
3855
    for dev in instance.disks:
3856
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3857
      cfg.SetDiskID(dev, pri_node)
3858
    cfg.Update(instance)
3859

    
3860
    # and now perform the drbd attach
3861
    info("attaching primary drbds to new secondary (standalone => connected)")
3862
    failures = []
3863
    for dev in instance.disks:
3864
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3865
      # since the attach is smart, it's enough to 'find' the device,
3866
      # it will automatically activate the network, if the physical_id
3867
      # is correct
3868
      cfg.SetDiskID(dev, pri_node)
3869
      if not rpc.call_blockdev_find(pri_node, dev):
3870
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3871
                "please do a gnt-instance info to see the status of disks")
3872

    
3873
    # this can fail as the old devices are degraded and _WaitForSync
3874
    # does a combined result over all disks, so we don't check its
3875
    # return value
3876
    self.proc.LogStep(5, steps_total, "sync devices")
3877
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3878

    
3879
    # so check manually all the devices
3880
    for name, (dev, old_lvs) in iv_names.iteritems():
3881
      cfg.SetDiskID(dev, pri_node)
3882
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3883
      if is_degr:
3884
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3885

    
3886
    self.proc.LogStep(6, steps_total, "removing old storage")
3887
    for name, (dev, old_lvs) in iv_names.iteritems():
3888
      info("remove logical volumes for %s" % name)
3889
      for lv in old_lvs:
3890
        cfg.SetDiskID(lv, old_node)
3891
        if not rpc.call_blockdev_remove(old_node, lv):
3892
          warning("Can't remove LV on old secondary",
3893
                  hint="Cleanup stale volumes by hand")
3894

    
3895
  def Exec(self, feedback_fn):
3896
    """Execute disk replacement.
3897

3898
    This dispatches the disk replacement to the appropriate handler.
3899

3900
    """
3901
    instance = self.instance
3902
    if instance.disk_template == constants.DT_REMOTE_RAID1:
3903
      fn = self._ExecRR1
3904
    elif instance.disk_template == constants.DT_DRBD8:
3905
      if self.op.remote_node is None:
3906
        fn = self._ExecD8DiskOnly
3907
      else:
3908
        fn = self._ExecD8Secondary
3909
    else:
3910
      raise errors.ProgrammerError("Unhandled disk replacement case")
3911
    return fn(feedback_fn)
3912

    
3913

    
3914
class LUQueryInstanceData(NoHooksLU):
3915
  """Query runtime instance data.
3916

3917
  """
3918
  _OP_REQP = ["instances"]
3919

    
3920
  def CheckPrereq(self):
3921
    """Check prerequisites.
3922

3923
    This only checks the optional instance list against the existing names.
3924

3925
    """
3926
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return
3940

    
3941

    
3942
  def _ComputeDiskStatus(self, instance, snode, dev):
3943
    """Compute block device status.
3944

3945
    """
3946
    self.cfg.SetDiskID(dev, instance.primary_node)
3947
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3948
    if dev.dev_type in constants.LDS_DRBD:
3949
      # we change the snode then (otherwise we use the one passed in)
3950
      if dev.logical_id[0] == instance.primary_node:
3951
        snode = dev.logical_id[1]
3952
      else:
3953
        snode = dev.logical_id[0]
3954

    
3955
    if snode:
3956
      self.cfg.SetDiskID(dev, snode)
3957
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3958
    else:
3959
      dev_sstatus = None
3960

    
3961
    if dev.children:
3962
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3963
                      for child in dev.children]
3964
    else:
3965
      dev_children = []
3966

    
3967
    data = {
3968
      "iv_name": dev.iv_name,
3969
      "dev_type": dev.dev_type,
3970
      "logical_id": dev.logical_id,
3971
      "physical_id": dev.physical_id,
3972
      "pstatus": dev_pstatus,
3973
      "sstatus": dev_sstatus,
3974
      "children": dev_children,
3975
      }
3976

    
3977
    return data
3978

    
3979
  def Exec(self, feedback_fn):
3980
    """Gather and return data"""
3981
    result = {}
3982
    for instance in self.wanted_instances:
3983
      remote_info = rpc.call_instance_info(instance.primary_node,
3984
                                                instance.name)
3985
      if remote_info and "state" in remote_info:
3986
        remote_state = "up"
3987
      else:
3988
        remote_state = "down"
3989
      if instance.status == "down":
3990
        config_state = "down"
3991
      else:
3992
        config_state = "up"
3993

    
3994
      disks = [self._ComputeDiskStatus(instance, None, device)
3995
               for device in instance.disks]
3996

    
3997
      idict = {
3998
        "name": instance.name,
3999
        "config_state": config_state,
4000
        "run_state": remote_state,
4001
        "pnode": instance.primary_node,
4002
        "snodes": instance.secondary_nodes,
4003
        "os": instance.os,
4004
        "memory": instance.memory,
4005
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4006
        "disks": disks,
4007
        "network_port": instance.network_port,
4008
        "vcpus": instance.vcpus,
4009
        }
4010

    
4011
      result[instance.name] = idict
4012

    
4013
    return result
4014

    
4015

    
4016
class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
4023

    
4024
  def BuildHooksEnv(self):
4025
    """Build hooks env.
4026

4027
    This runs on the master, primary and secondaries.
4028

4029
    """
4030
    args = dict()
4031
    if self.mem:
4032
      args['memory'] = self.mem
4033
    if self.vcpus:
4034
      args['vcpus'] = self.vcpus
4035
    if self.do_ip or self.do_bridge:
4036
      if self.do_ip:
4037
        ip = self.ip
4038
      else:
4039
        ip = self.instance.nics[0].ip
4040
      if self.bridge:
4041
        bridge = self.bridge
4042
      else:
4043
        bridge = self.instance.nics[0].bridge
4044
      args['nics'] = [(ip, bridge)]
4045
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4046
    nl = [self.sstore.GetMasterNode(),
4047
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4048
    return env, nl, nl
4049

    
4050
  def CheckPrereq(self):
4051
    """Check prerequisites.
4052

4053
    This only checks the instance list against the existing names.
4054

4055
    """
4056
    self.mem = getattr(self.op, "mem", None)
4057
    self.vcpus = getattr(self.op, "vcpus", None)
4058
    self.ip = getattr(self.op, "ip", None)
4059
    self.bridge = getattr(self.op, "bridge", None)
4060
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
4061
      raise errors.OpPrereqError("No changes submitted")
4062
    if self.mem is not None:
4063
      try:
4064
        self.mem = int(self.mem)
4065
      except ValueError, err:
4066
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4067
    if self.vcpus is not None:
4068
      try:
4069
        self.vcpus = int(self.vcpus)
4070
      except ValueError, err:
4071
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4072
    if self.ip is not None:
4073
      self.do_ip = True
4074
      if self.ip.lower() == "none":
4075
        self.ip = None
4076
      else:
4077
        if not utils.IsValidIP(self.ip):
4078
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4079
    else:
4080
      self.do_ip = False
4081
    self.do_bridge = (self.bridge is not None)
4082

    
4083
    instance = self.cfg.GetInstanceInfo(
4084
      self.cfg.ExpandInstanceName(self.op.instance_name))
4085
    if instance is None:
4086
      raise errors.OpPrereqError("No such instance name '%s'" %
4087
                                 self.op.instance_name)
4088
    self.op.instance_name = instance.name
4089
    self.instance = instance
4090
    return
4091

    
4092
  def Exec(self, feedback_fn):
4093
    """Modifies an instance.
4094

4095
    All parameters take effect only at the next restart of the instance.
4096
    """
4097
    result = []
4098
    instance = self.instance
4099
    if self.mem:
4100
      instance.memory = self.mem
4101
      result.append(("mem", self.mem))
4102
    if self.vcpus:
4103
      instance.vcpus = self.vcpus
4104
      result.append(("vcpus",  self.vcpus))
4105
    if self.do_ip:
4106
      instance.nics[0].ip = self.ip
4107
      result.append(("ip", self.ip))
4108
    if self.bridge:
4109
      instance.nics[0].bridge = self.bridge
4110
      result.append(("bridge", self.bridge))
4111

    
4112
    self.cfg.AddInstance(instance)
4113

    
4114
    return result
4115

    
4116

    
4117
class LUQueryExports(NoHooksLU):
4118
  """Query the exports list
4119

4120
  """
4121
  _OP_REQP = []
4122

    
4123
  def CheckPrereq(self):
4124
    """Check that the nodelist contains only existing nodes.
4125

4126
    """
4127
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4128

    
4129
  def Exec(self, feedback_fn):
4130
    """Compute the list of all the exported system images.
4131

4132
    Returns:
4133
      a dictionary with the structure node->(export-list)
4134
      where export-list is a list of the instances exported on
4135
      that node.
4136

4137
    """
4138
    return rpc.call_export_list(self.nodes)
4139

    
4140

    
4141
class LUExportInstance(LogicalUnit):
4142
  """Export an instance to an image in the cluster.
4143

4144
  """
4145
  HPATH = "instance-export"
4146
  HTYPE = constants.HTYPE_INSTANCE
4147
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4148

    
4149
  def BuildHooksEnv(self):
4150
    """Build hooks env.
4151

4152
    This will run on the master, primary node and target node.
4153

4154
    """
4155
    env = {
4156
      "EXPORT_NODE": self.op.target_node,
4157
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4158
      }
4159
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4160
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4161
          self.op.target_node]
4162
    return env, nl, nl
4163

    
4164
  def CheckPrereq(self):
4165
    """Check prerequisites.
4166

4167
    This checks that the instance name is a valid one.
4168

4169
    """
4170
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4171
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4172
    if self.instance is None:
4173
      raise errors.OpPrereqError("Instance '%s' not found" %
4174
                                 self.op.instance_name)
4175

    
4176
    # node verification
4177
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4178
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4179

    
4180
    if self.dst_node is None:
4181
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
4182
                                 self.op.target_node)
4183
    self.op.target_node = self.dst_node.name
4184

    
4185
  def Exec(self, feedback_fn):
4186
    """Export an instance to an image in the cluster.
4187

4188
    """
4189
    instance = self.instance
4190
    dst_node = self.dst_node
4191
    src_node = instance.primary_node
4192
    # shutdown the instance, unless requested not to do so
4193
    if self.op.shutdown:
4194
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
4195
      self.proc.ChainOpCode(op)
4196

    
4197
    vgname = self.cfg.GetVGName()
4198

    
4199
    snap_disks = []
4200

    
4201
    try:
4202
      for disk in instance.disks:
4203
        if disk.iv_name == "sda":
4204
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4205
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4206

    
4207
          if not new_dev_name:
4208
            logger.Error("could not snapshot block device %s on node %s" %
4209
                         (disk.logical_id[1], src_node))
4210
          else:
4211
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4212
                                      logical_id=(vgname, new_dev_name),
4213
                                      physical_id=(vgname, new_dev_name),
4214
                                      iv_name=disk.iv_name)
4215
            snap_disks.append(new_dev)
4216

    
4217
    finally:
4218
      if self.op.shutdown:
4219
        op = opcodes.OpStartupInstance(instance_name=instance.name,
4220
                                       force=False)
4221
        self.proc.ChainOpCode(op)
4222

    
4223
    # TODO: check for size
4224

    
4225
    for dev in snap_disks:
4226
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4227
                                           instance):
4228
        logger.Error("could not export block device %s from node"
4229
                     " %s to node %s" %
4230
                     (dev.logical_id[1], src_node, dst_node.name))
4231
      if not rpc.call_blockdev_remove(src_node, dev):
4232
        logger.Error("could not remove snapshot block device %s from"
4233
                     " node %s" % (dev.logical_id[1], src_node))
4234

    
4235
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4236
      logger.Error("could not finalize export for instance %s on node %s" %
4237
                   (instance.name, dst_node.name))
4238

    
4239
    nodelist = self.cfg.GetNodeList()
4240
    nodelist.remove(dst_node.name)
4241

    
4242
    # on one-node clusters nodelist will be empty after the removal
4243
    # if we proceed the backup would be removed because OpQueryExports
4244
    # substitutes an empty list with the full cluster node list.
4245
    if nodelist:
4246
      op = opcodes.OpQueryExports(nodes=nodelist)
4247
      exportlist = self.proc.ChainOpCode(op)
4248
      for node in exportlist:
4249
        if instance.name in exportlist[node]:
4250
          if not rpc.call_export_remove(node, instance.name):
4251
            logger.Error("could not remove older export for instance %s"
4252
                         " on node %s" % (instance.name, node))
4253

    
4254

    
4255
class TagsLU(NoHooksLU):
4256
  """Generic tags LU.
4257

4258
  This is an abstract class which is the parent of all the other tags LUs.
4259

4260
  """
4261
  def CheckPrereq(self):
4262
    """Check prerequisites.
4263

4264
    """
4265
    if self.op.kind == constants.TAG_CLUSTER:
4266
      self.target = self.cfg.GetClusterInfo()
4267
    elif self.op.kind == constants.TAG_NODE:
4268
      name = self.cfg.ExpandNodeName(self.op.name)
4269
      if name is None:
4270
        raise errors.OpPrereqError("Invalid node name (%s)" %
4271
                                   (self.op.name,))
4272
      self.op.name = name
4273
      self.target = self.cfg.GetNodeInfo(name)
4274
    elif self.op.kind == constants.TAG_INSTANCE:
4275
      name = self.cfg.ExpandInstanceName(self.op.name)
4276
      if name is None:
4277
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4278
                                   (self.op.name,))
4279
      self.op.name = name
4280
      self.target = self.cfg.GetInstanceInfo(name)
4281
    else:
4282
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4283
                                 str(self.op.kind))
4284

    
4285

    
4286
class LUGetTags(TagsLU):
4287
  """Returns the tags of a given object.
4288

4289
  """
4290
  _OP_REQP = ["kind", "name"]
4291

    
4292
  def Exec(self, feedback_fn):
4293
    """Returns the tag list.
4294

4295
    """
4296
    return self.target.GetTags()
4297

    
4298

    
4299
class LUSearchTags(NoHooksLU):
4300
  """Searches the tags for a given pattern.
4301

4302
  """
4303
  _OP_REQP = ["pattern"]
4304

    
4305
  def CheckPrereq(self):
4306
    """Check prerequisites.
4307

4308
    This checks the pattern passed for validity by compiling it.
4309

4310
    """
4311
    try:
4312
      self.re = re.compile(self.op.pattern)
4313
    except re.error, err:
4314
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4315
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) pairs matching the search pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
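    # Example result (hypothetical tag and instance name):
    #   [("/cluster", "production"),
    #    ("/instances/instance1.example.com", "production")]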
4333

    
4334

    
4335
class LUAddTags(TagsLU):
4336
  """Sets a tag on a given object.
4337

4338
  """
4339
  _OP_REQP = ["kind", "name", "tags"]
4340

    
4341
  def CheckPrereq(self):
4342
    """Check prerequisites.
4343

4344
    This checks the type and length of the tag name and value.
4345

4346
    """
4347
    TagsLU.CheckPrereq(self)
4348
    for tag in self.op.tags:
4349
      objects.TaggableObject.ValidateTag(tag)
4350

    
4351
  def Exec(self, feedback_fn):
4352
    """Sets the tag.
4353

4354
    """
4355
    try:
4356
      for tag in self.op.tags:
4357
        self.target.AddTag(tag)
4358
    except errors.TagError, err:
4359
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
4360
    try:
4361
      self.cfg.Update(self.target)
4362
    except errors.ConfigurationError:
4363
      raise errors.OpRetryError("There has been a modification to the"
4364
                                " config file and the operation has been"
4365
                                " aborted. Please retry.")
4366

    
4367

    
4368
class LUDelTags(TagsLU):
4369
  """Delete a list of tags from a given object.
4370

4371
  """
4372
  _OP_REQP = ["kind", "name", "tags"]
4373

    
4374
  def CheckPrereq(self):
4375
    """Check prerequisites.
4376

4377
    This checks that we have the given tag.
4378

4379
    """
4380
    TagsLU.CheckPrereq(self)
4381
    for tag in self.op.tags:
4382
      objects.TaggableObject.ValidateTag(tag)
4383
    del_tags = frozenset(self.op.tags)
4384
    cur_tags = self.target.GetTags()
4385
    if not del_tags <= cur_tags:
4386
      diff_tags = del_tags - cur_tags
4387
      diff_names = ["'%s'" % tag for tag in diff_tags]
4388
      diff_names.sort()
4389
      raise errors.OpPrereqError("Tag(s) %s not found" %
4390
                                 (",".join(diff_names)))
4391

    
4392
  def Exec(self, feedback_fn):
4393
    """Remove the tag from the object.
4394

4395
    """
4396
    for tag in self.op.tags:
4397
      self.target.RemoveTag(tag)
4398
    try:
4399
      self.cfg.Update(self.target)
4400
    except errors.ConfigurationError:
4401
      raise errors.OpRetryError("There has been a modification to the"
4402
                                " config file and the operation has been"
4403
                                " aborted. Please retry.")