
root / lib / cmdlib.py @ 2a6469d5

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46
class LogicalUnit(object):
47
  """Logical Unit base class.
48

49
  Subclasses must follow these rules:
50
    - implement CheckPrereq which also fills in the opcode instance
51
      with all the fields (even if as None)
52
    - implement Exec
53
    - implement BuildHooksEnv
54
    - redefine HPATH and HTYPE
55
    - optionally redefine their run requirements (REQ_CLUSTER,
56
      REQ_MASTER); note that all commands require root permissions
57

58
  """
59
  HPATH = None
60
  HTYPE = None
61
  _OP_REQP = []
62
  REQ_CLUSTER = True
63
  REQ_MASTER = True
64

    
65
  def __init__(self, processor, op, cfg, sstore):
66
    """Constructor for LogicalUnit.
67

68
    This needs to be overridden in derived classes in order to check op
69
    validity.
70

71
    """
72
    self.proc = processor
73
    self.op = op
74
    self.cfg = cfg
75
    self.sstore = sstore
76
    for attr_name in self._OP_REQP:
77
      attr_val = getattr(op, attr_name, None)
78
      if attr_val is None:
79
        raise errors.OpPrereqError("Required parameter '%s' missing" %
80
                                   attr_name)
81
    if self.REQ_CLUSTER:
82
      if not cfg.IsCluster():
83
        raise errors.OpPrereqError("Cluster not initialized yet,"
84
                                   " use 'gnt-cluster init' first.")
85
      if self.REQ_MASTER:
86
        master = sstore.GetMasterNode()
87
        if master != utils.HostInfo().name:
88
          raise errors.OpPrereqError("Commands must be run on the master"
89
                                     " node %s" % master)
90

    
91
  def CheckPrereq(self):
92
    """Check prerequisites for this LU.
93

94
    This method should check that the prerequisites for the execution
95
    of this LU are fulfilled. It can do internode communication, but
96
    it should be idempotent - no cluster or system changes are
97
    allowed.
98

99
    The method should raise errors.OpPrereqError in case something is
100
    not fulfilled. Its return value is ignored.
101

102
    This method should also update all the parameters of the opcode to
103
    their canonical form; e.g. a short node name must be fully
104
    expanded after this method has successfully completed (so that
105
    hooks, logging, etc. work correctly).
106

107
    """
108
    raise NotImplementedError
109

    
110
  def Exec(self, feedback_fn):
111
    """Execute the LU.
112

113
    This method should implement the actual work. It should raise
114
    errors.OpExecError for failures that are somewhat dealt with in
115
    code, or expected.
116

117
    """
118
    raise NotImplementedError
119

    
120
  def BuildHooksEnv(self):
121
    """Build hooks environment for this LU.
122

123
    This method should return a three-element tuple consisting of: a dict
124
    containing the environment that will be used for running the
125
    specific hook for this LU, a list of node names on which the hook
126
    should run before the execution, and a list of node names on which
127
    the hook should run after the execution.
128

129
    The keys of the dict must not have 'GANETI_' prefixed as this will
130
    be handled in the hooks runner. Also note additional keys will be
131
    added by the hooks runner. If the LU doesn't define any
132
    environment, an empty dict (and not None) should be returned.
133

134
    As for the node lists, the master should not be included in
135
    them, as it will be added by the hooks runner in case this LU
136
    requires a cluster to run on (otherwise we don't have a node
137
    list). If there are no nodes, an empty list (and not None)
138
    should be returned.
139

140
    Note that if the HPATH for a LU class is None, this function will
141
    not be called.
142

143
    """
144
    raise NotImplementedError
145

    
146

    
147
class NoHooksLU(LogicalUnit):
148
  """Simple LU which runs no hooks.
149

150
  This LU is intended as a parent for other LogicalUnits which will
151
  run no hooks, in order to reduce duplicate code.
152

153
  """
154
  HPATH = None
155
  HTYPE = None
156

    
157
  def BuildHooksEnv(self):
158
    """Build hooks env.
159

160
    This is a no-op, since we don't run hooks.
161

162
    """
163
    return {}, [], []
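# Illustrative sketch only (hypothetical, not used elsewhere in this module):
# a minimal LU following the contract described in LogicalUnit. CheckPrereq
# validates and canonicalizes the opcode, Exec does the actual work:
#
#   class LUGetClusterName(NoHooksLU):
#     """Hypothetical query LU returning the cluster name."""
#     _OP_REQP = []
#
#     def CheckPrereq(self):
#       pass
#
#     def Exec(self, feedback_fn):
#       return self.sstore.GetClusterName()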
164

    
165

    
166
def _AddHostToEtcHosts(hostname):
167
  """Wrapper around utils.SetEtcHostsEntry.
168

169
  """
170
  hi = utils.HostInfo(name=hostname)
171
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172

    
173

    
174
def _RemoveHostFromEtcHosts(hostname):
175
  """Wrapper around utils.RemoveEtcHostsEntry.
176

177
  """
178
  hi = utils.HostInfo(name=hostname)
179
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181

    
182

    
183
def _GetWantedNodes(lu, nodes):
184
  """Returns list of checked and expanded node names.
185

186
  Args:
187
    nodes: List of nodes (strings) or None for all
188

189
  """
190
  if not isinstance(nodes, list):
191
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
192

    
193
  if nodes:
194
    wanted = []
195

    
196
    for name in nodes:
197
      node = lu.cfg.ExpandNodeName(name)
198
      if node is None:
199
        raise errors.OpPrereqError("No such node name '%s'" % name)
200
      wanted.append(node)
201

    
202
  else:
203
    wanted = lu.cfg.GetNodeList()
204
  return utils.NiceSort(wanted)
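# For illustration: _GetWantedNodes(lu, ["node1"]) expands the possibly
# abbreviated name ("node1" is a placeholder) through the config and returns
# the canonical, sorted name list; an empty list selects every node.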
205

    
206

    
207
def _GetWantedInstances(lu, instances):
208
  """Returns list of checked and expanded instance names.
209

210
  Args:
211
    instances: List of instances (strings) or None for all
212

213
  """
214
  if not isinstance(instances, list):
215
    raise errors.OpPrereqError("Invalid argument type 'instances'")
216

    
217
  if instances:
218
    wanted = []
219

    
220
    for name in instances:
221
      instance = lu.cfg.ExpandInstanceName(name)
222
      if instance is None:
223
        raise errors.OpPrereqError("No such instance name '%s'" % name)
224
      wanted.append(instance)
225

    
226
  else:
227
    wanted = lu.cfg.GetInstanceList()
228
  return utils.NiceSort(wanted)
229

    
230

    
231
def _CheckOutputFields(static, dynamic, selected):
232
  """Checks whether all selected fields are valid.
233

234
  Args:
235
    static: Static fields
236
    dynamic: Dynamic fields
    selected: Fields selected by the user
237

238
  """
239
  static_fields = frozenset(static)
240
  dynamic_fields = frozenset(dynamic)
241

    
242
  all_fields = static_fields | dynamic_fields
243

    
244
  if not all_fields.issuperset(selected):
245
    raise errors.OpPrereqError("Unknown output fields selected: %s"
246
                               % ",".join(frozenset(selected).
247
                                          difference(all_fields)))
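# Example of how the query LUs below use this helper: with static=["name"] and
# dynamic=["mfree"], selecting ["name", "mfree"] passes silently, while any
# unknown field raises OpPrereqError naming the offending fields.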
248

    
249

    
250
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251
                          memory, vcpus, nics):
252
  """Builds instance related env variables for hooks from single variables.
253

254
  Args:
255
    secondary_nodes: List of secondary nodes as strings
256
  """
257
  env = {
258
    "OP_TARGET": name,
259
    "INSTANCE_NAME": name,
260
    "INSTANCE_PRIMARY": primary_node,
261
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262
    "INSTANCE_OS_TYPE": os_type,
263
    "INSTANCE_STATUS": status,
264
    "INSTANCE_MEMORY": memory,
265
    "INSTANCE_VCPUS": vcpus,
266
  }
267

    
268
  if nics:
269
    nic_count = len(nics)
270
    for idx, (ip, bridge) in enumerate(nics):
271
      if ip is None:
272
        ip = ""
273
      env["INSTANCE_NIC%d_IP" % idx] = ip
274
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275
  else:
276
    nic_count = 0
277

    
278
  env["INSTANCE_NIC_COUNT"] = nic_count
279

    
280
  return env
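# For illustration, an instance with a single NIC yields the keys OP_TARGET,
# INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES, INSTANCE_OS_TYPE,
# INSTANCE_STATUS, INSTANCE_MEMORY, INSTANCE_VCPUS, INSTANCE_NIC_COUNT=1,
# INSTANCE_NIC0_IP and INSTANCE_NIC0_BRIDGE; the hooks runner adds the
# GANETI_ prefix later.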
281

    
282

    
283
def _BuildInstanceHookEnvByObject(instance, override=None):
284
  """Builds instance related env variables for hooks from an object.
285

286
  Args:
287
    instance: objects.Instance object of instance
288
    override: dict of values to override
289
  """
290
  args = {
291
    'name': instance.name,
292
    'primary_node': instance.primary_node,
293
    'secondary_nodes': instance.secondary_nodes,
294
    'os_type': instance.os,
295
    'status': instance.status,
296
    'memory': instance.memory,
297
    'vcpus': instance.vcpus,
298
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
299
  }
300
  if override:
301
    args.update(override)
302
  return _BuildInstanceHookEnv(**args)
303

    
304

    
305
def _UpdateKnownHosts(fullnode, ip, pubkey):
306
  """Ensure a node has a correct known_hosts entry.
307

308
  Args:
309
    fullnode - Fully qualified domain name of host. (str)
310
    ip       - IPv4 address of host (str)
311
    pubkey   - the public key of the cluster
312

313
  """
314
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
316
  else:
317
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
318

    
319
  inthere = False
320

    
321
  save_lines = []
322
  add_lines = []
323
  removed = False
324

    
325
  for rawline in f:
326
    logger.Debug('read %s' % (repr(rawline),))
327

    
328
    parts = rawline.rstrip('\r\n').split()
329

    
330
    # Ignore unwanted lines
331
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
332
      fields = parts[0].split(',')
333
      key = parts[2]
334

    
335
      haveall = True
336
      havesome = False
337
      for spec in [ ip, fullnode ]:
338
        if spec not in fields:
339
          haveall = False
340
        if spec in fields:
341
          havesome = True
342

    
343
      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344
      if haveall and key == pubkey:
345
        inthere = True
346
        save_lines.append(rawline)
347
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
348
        continue
349

    
350
      if havesome and (not haveall or key != pubkey):
351
        removed = True
352
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
353
        continue
354

    
355
    save_lines.append(rawline)
356

    
357
  if not inthere:
358
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
360

    
361
  if removed:
362
    save_lines = save_lines + add_lines
363

    
364
    # Write a new file and replace old.
365
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
366
                                   constants.DATA_DIR)
367
    newfile = os.fdopen(fd, 'w')
368
    try:
369
      newfile.write(''.join(save_lines))
370
    finally:
371
      newfile.close()
372
    logger.Debug("Wrote new known_hosts.")
373
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
374

    
375
  elif add_lines:
376
    # Simply appending a new line will do the trick.
377
    f.seek(0, 2)
378
    for add in add_lines:
379
      f.write(add)
380

    
381
  f.close()
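# The entries handled above use the standard known_hosts format, for example
# (placeholder values) "node1.example.com,192.0.2.1 ssh-rsa AAAA...".  Lines
# matching the node but carrying a stale key are discarded and the file is
# rewritten through a temporary file in DATA_DIR followed by os.rename.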
382

    
383

    
384
def _HasValidVG(vglist, vgname):
385
  """Checks if the volume group list is valid.
386

387
  A non-None return value means there's an error, and the return value
388
  is the error message.
389

390
  """
391
  vgsize = vglist.get(vgname, None)
392
  if vgsize is None:
393
    return "volume group '%s' missing" % vgname
394
  elif vgsize < 20480:
395
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
396
            (vgname, vgsize))
397
  return None
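# Illustrative behaviour (sizes in MiB): _HasValidVG({"xenvg": 40960}, "xenvg")
# returns None (valid), while a missing volume group or one smaller than
# 20480 MiB yields an error message string instead.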
398

    
399

    
400
def _InitSSHSetup(node):
401
  """Setup the SSH configuration for the cluster.
402

403

404
  This generates a dsa keypair for root, adds the pub key to the
405
  permitted hosts and adds the hostkey to its own known hosts.
406

407
  Args:
408
    node: the name of this host as a fqdn
409

410
  """
411
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
412

    
413
  for name in priv_key, pub_key:
414
    if os.path.exists(name):
415
      utils.CreateBackup(name)
416
    utils.RemoveFile(name)
417

    
418
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
419
                         "-f", priv_key,
420
                         "-q", "-N", ""])
421
  if result.failed:
422
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
423
                             result.output)
424

    
425
  f = open(pub_key, 'r')
426
  try:
427
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
428
  finally:
429
    f.close()
430

    
431

    
432
def _InitGanetiServerSetup(ss):
433
  """Setup the necessary configuration for the initial node daemon.
434

435
  This creates the nodepass file containing the shared password for
436
  the cluster and also generates the SSL certificate.
437

438
  """
439
  # Create pseudo random password
440
  randpass = sha.new(os.urandom(64)).hexdigest()
441
  # and write it into sstore
442
  ss.SetKey(ss.SS_NODED_PASS, randpass)
443

    
444
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445
                         "-days", str(365*5), "-nodes", "-x509",
446
                         "-keyout", constants.SSL_CERT_FILE,
447
                         "-out", constants.SSL_CERT_FILE, "-batch"])
448
  if result.failed:
449
    raise errors.OpExecError("could not generate server ssl cert, command"
450
                             " %s had exitcode %s and error message %s" %
451
                             (result.cmd, result.exit_code, result.output))
452

    
453
  os.chmod(constants.SSL_CERT_FILE, 0400)
454

    
455
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
456

    
457
  if result.failed:
458
    raise errors.OpExecError("Could not start the node daemon, command %s"
459
                             " had exitcode %s and error %s" %
460
                             (result.cmd, result.exit_code, result.output))
461

    
462

    
463
def _CheckInstanceBridgesExist(instance):
464
  """Check that the bridges needed by an instance exist.
465

466
  """
467
  # check bridges existence
468
  brlist = [nic.bridge for nic in instance.nics]
469
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
470
    raise errors.OpPrereqError("one or more target bridges %s do not"
471
                               " exist on destination node '%s'" %
472
                               (brlist, instance.primary_node))
473

    
474

    
475
class LUInitCluster(LogicalUnit):
476
  """Initialise the cluster.
477

478
  """
479
  HPATH = "cluster-init"
480
  HTYPE = constants.HTYPE_CLUSTER
481
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482
              "def_bridge", "master_netdev"]
483
  REQ_CLUSTER = False
484

    
485
  def BuildHooksEnv(self):
486
    """Build hooks env.
487

488
    Notes: Since we don't require a cluster, we must manually add
489
    ourselves in the post-run node list.
490

491
    """
492
    env = {"OP_TARGET": self.op.cluster_name}
493
    return env, [], [self.hostname.name]
494

    
495
  def CheckPrereq(self):
496
    """Verify that the passed name is a valid one.
497

498
    """
499
    if config.ConfigWriter.IsCluster():
500
      raise errors.OpPrereqError("Cluster is already initialised")
501

    
502
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
503
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
504
        raise errors.OpPrereqError("Please prepare the cluster VNC"
505
                                   " password file %s" %
506
                                   constants.VNC_PASSWORD_FILE)
507

    
508
    self.hostname = hostname = utils.HostInfo()
509

    
510
    if hostname.ip.startswith("127."):
511
      raise errors.OpPrereqError("This host's IP resolves to the private"
512
                                 " range (%s). Please fix DNS or /etc/hosts." %
513
                                 (hostname.ip,))
514

    
515
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
516

    
517
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
518
                         constants.DEFAULT_NODED_PORT):
519
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
520
                                 " to %s,\nbut this ip address does not"
521
                                 " belong to this host."
522
                                 " Aborting." % hostname.ip)
523

    
524
    secondary_ip = getattr(self.op, "secondary_ip", None)
525
    if secondary_ip and not utils.IsValidIP(secondary_ip):
526
      raise errors.OpPrereqError("Invalid secondary ip given")
527
    if (secondary_ip and
528
        secondary_ip != hostname.ip and
529
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
530
                           constants.DEFAULT_NODED_PORT))):
531
      raise errors.OpPrereqError("You gave %s as secondary IP,"
532
                                 " but it does not belong to this host." %
533
                                 secondary_ip)
534
    self.secondary_ip = secondary_ip
535

    
536
    # checks presence of the volume group given
537
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
538

    
539
    if vgstatus:
540
      raise errors.OpPrereqError("Error: %s" % vgstatus)
541

    
542
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
543
                    self.op.mac_prefix):
544
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
545
                                 self.op.mac_prefix)
546

    
547
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
548
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
549
                                 self.op.hypervisor_type)
550

    
551
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
552
    if result.failed:
553
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
554
                                 (self.op.master_netdev,
555
                                  result.output.strip()))
556

    
557
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
558
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
559
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
560
                                 " executable." % constants.NODE_INITD_SCRIPT)
561

    
562
  def Exec(self, feedback_fn):
563
    """Initialize the cluster.
564

565
    """
566
    clustername = self.clustername
567
    hostname = self.hostname
568

    
569
    # set up the simple store
570
    self.sstore = ss = ssconf.SimpleStore()
571
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
572
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
573
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
574
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
575
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
576

    
577
    # set up the inter-node password and certificate
578
    _InitGanetiServerSetup(ss)
579

    
580
    # start the master ip
581
    rpc.call_node_start_master(hostname.name)
582

    
583
    # set up ssh config and /etc/hosts
584
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
585
    try:
586
      sshline = f.read()
587
    finally:
588
      f.close()
589
    sshkey = sshline.split(" ")[1]
590

    
591
    _AddHostToEtcHosts(hostname.name)
592

    
593
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
594

    
595
    _InitSSHSetup(hostname.name)
596

    
597
    # init of cluster config file
598
    self.cfg = cfgw = config.ConfigWriter()
599
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
600
                    sshkey, self.op.mac_prefix,
601
                    self.op.vg_name, self.op.def_bridge)
602

    
603

    
604
class LUDestroyCluster(NoHooksLU):
605
  """Logical unit for destroying the cluster.
606

607
  """
608
  _OP_REQP = []
609

    
610
  def CheckPrereq(self):
611
    """Check prerequisites.
612

613
    This checks whether the cluster is empty.
614

615
    Any errors are signalled by raising errors.OpPrereqError.
616

617
    """
618
    master = self.sstore.GetMasterNode()
619

    
620
    nodelist = self.cfg.GetNodeList()
621
    if len(nodelist) != 1 or nodelist[0] != master:
622
      raise errors.OpPrereqError("There are still %d node(s) in"
623
                                 " this cluster." % (len(nodelist) - 1))
624
    instancelist = self.cfg.GetInstanceList()
625
    if instancelist:
626
      raise errors.OpPrereqError("There are still %d instance(s) in"
627
                                 " this cluster." % len(instancelist))
628

    
629
  def Exec(self, feedback_fn):
630
    """Destroys the cluster.
631

632
    """
633
    master = self.sstore.GetMasterNode()
634
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
635
    utils.CreateBackup(priv_key)
636
    utils.CreateBackup(pub_key)
637
    rpc.call_node_leave_cluster(master)
638

    
639

    
640
class LUVerifyCluster(NoHooksLU):
641
  """Verifies the cluster status.
642

643
  """
644
  _OP_REQP = []
645

    
646
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
647
                  remote_version, feedback_fn):
648
    """Run multiple tests against a node.
649

650
    Test list:
651
      - compares ganeti version
652
      - checks vg existence and size > 20G
653
      - checks config file checksum
654
      - checks ssh to other nodes
655

656
    Args:
657
      node: name of the node to check
658
      file_list: required list of files
659
      local_cksum: dictionary of local files and their checksums
660

661
    """
662
    # compares ganeti version
663
    local_version = constants.PROTOCOL_VERSION
664
    if not remote_version:
665
      feedback_fn(" - ERROR: connection to %s failed" % (node))
666
      return True
667

    
668
    if local_version != remote_version:
669
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
670
                      (local_version, node, remote_version))
671
      return True
672

    
673
    # checks vg existence and size > 20G
674

    
675
    bad = False
676
    if not vglist:
677
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
678
                      (node,))
679
      bad = True
680
    else:
681
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
682
      if vgstatus:
683
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
684
        bad = True
685

    
686
    # checks config file checksum
687
    # checks ssh to any
688

    
689
    if 'filelist' not in node_result:
690
      bad = True
691
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
692
    else:
693
      remote_cksum = node_result['filelist']
694
      for file_name in file_list:
695
        if file_name not in remote_cksum:
696
          bad = True
697
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
698
        elif remote_cksum[file_name] != local_cksum[file_name]:
699
          bad = True
700
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
701

    
702
    if 'nodelist' not in node_result:
703
      bad = True
704
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
705
    else:
706
      if node_result['nodelist']:
707
        bad = True
708
        for node in node_result['nodelist']:
709
          feedback_fn("  - ERROR: communication with node '%s': %s" %
710
                          (node, node_result['nodelist'][node]))
711
    hyp_result = node_result.get('hypervisor', None)
712
    if hyp_result is not None:
713
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
714
    return bad
715

    
716
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
717
    """Verify an instance.
718

719
    This function checks to see if the required block devices are
720
    available on the instance's node.
721

722
    """
723
    bad = False
724

    
725
    instancelist = self.cfg.GetInstanceList()
726
    if not instance in instancelist:
727
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
728
                      (instance, instancelist))
729
      bad = True
730

    
731
    instanceconfig = self.cfg.GetInstanceInfo(instance)
732
    node_current = instanceconfig.primary_node
733

    
734
    node_vol_should = {}
735
    instanceconfig.MapLVsByNode(node_vol_should)
736

    
737
    for node in node_vol_should:
738
      for volume in node_vol_should[node]:
739
        if node not in node_vol_is or volume not in node_vol_is[node]:
740
          feedback_fn("  - ERROR: volume %s missing on node %s" %
741
                          (volume, node))
742
          bad = True
743

    
744
    if not instanceconfig.status == 'down':
745
      if not instance in node_instance[node_current]:
746
        feedback_fn("  - ERROR: instance %s not running on node %s" %
747
                        (instance, node_current))
748
        bad = True
749

    
750
    for node in node_instance:
751
      if (not node == node_current):
752
        if instance in node_instance[node]:
753
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
754
                          (instance, node))
755
          bad = True
756

    
757
    return bad
758

    
759
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
760
    """Verify if there are any unknown volumes in the cluster.
761

762
    The .os, .swap and backup volumes are ignored. All other volumes are
763
    reported as unknown.
764

765
    """
766
    bad = False
767

    
768
    for node in node_vol_is:
769
      for volume in node_vol_is[node]:
770
        if node not in node_vol_should or volume not in node_vol_should[node]:
771
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
772
                      (volume, node))
773
          bad = True
774
    return bad
775

    
776
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
777
    """Verify the list of running instances.
778

779
    This checks what instances are running but unknown to the cluster.
780

781
    """
782
    bad = False
783
    for node in node_instance:
784
      for runninginstance in node_instance[node]:
785
        if runninginstance not in instancelist:
786
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
787
                          (runninginstance, node))
788
          bad = True
789
    return bad
790

    
791
  def CheckPrereq(self):
792
    """Check prerequisites.
793

794
    This has no prerequisites.
795

796
    """
797
    pass
798

    
799
  def Exec(self, feedback_fn):
800
    """Verify integrity of cluster, performing various tests on nodes.
801

802
    """
803
    bad = False
804
    feedback_fn("* Verifying global settings")
805
    for msg in self.cfg.VerifyConfig():
806
      feedback_fn("  - ERROR: %s" % msg)
807

    
808
    vg_name = self.cfg.GetVGName()
809
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
810
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
811
    node_volume = {}
812
    node_instance = {}
813

    
814
    # FIXME: verify OS list
815
    # do local checksums
816
    file_names = list(self.sstore.GetFileList())
817
    file_names.append(constants.SSL_CERT_FILE)
818
    file_names.append(constants.CLUSTER_CONF_FILE)
819
    local_checksums = utils.FingerprintFiles(file_names)
820

    
821
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
822
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
823
    all_instanceinfo = rpc.call_instance_list(nodelist)
824
    all_vglist = rpc.call_vg_list(nodelist)
825
    node_verify_param = {
826
      'filelist': file_names,
827
      'nodelist': nodelist,
828
      'hypervisor': None,
829
      }
830
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
831
    all_rversion = rpc.call_version(nodelist)
832

    
833
    for node in nodelist:
834
      feedback_fn("* Verifying node %s" % node)
835
      result = self._VerifyNode(node, file_names, local_checksums,
836
                                all_vglist[node], all_nvinfo[node],
837
                                all_rversion[node], feedback_fn)
838
      bad = bad or result
839

    
840
      # node_volume
841
      volumeinfo = all_volumeinfo[node]
842

    
843
      if type(volumeinfo) != dict:
844
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
845
        bad = True
846
        continue
847

    
848
      node_volume[node] = volumeinfo
849

    
850
      # node_instance
851
      nodeinstance = all_instanceinfo[node]
852
      if type(nodeinstance) != list:
853
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
854
        bad = True
855
        continue
856

    
857
      node_instance[node] = nodeinstance
858

    
859
    node_vol_should = {}
860

    
861
    for instance in instancelist:
862
      feedback_fn("* Verifying instance %s" % instance)
863
      result =  self._VerifyInstance(instance, node_volume, node_instance,
864
                                     feedback_fn)
865
      bad = bad or result
866

    
867
      inst_config = self.cfg.GetInstanceInfo(instance)
868

    
869
      inst_config.MapLVsByNode(node_vol_should)
870

    
871
    feedback_fn("* Verifying orphan volumes")
872
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
873
                                       feedback_fn)
874
    bad = bad or result
875

    
876
    feedback_fn("* Verifying remaining instances")
877
    result = self._VerifyOrphanInstances(instancelist, node_instance,
878
                                         feedback_fn)
879
    bad = bad or result
880

    
881
    return int(bad)
882

    
883

    
884
class LUVerifyDisks(NoHooksLU):
885
  """Verifies the cluster disks status.
886

887
  """
888
  _OP_REQP = []
889

    
890
  def CheckPrereq(self):
891
    """Check prerequisites.
892

893
    This has no prerequisites.
894

895
    """
896
    pass
897

    
898
  def Exec(self, feedback_fn):
899
    """Verify integrity of cluster disks.
900

901
    """
902
    result = res_nodes, res_instances = [], []
903

    
904
    vg_name = self.cfg.GetVGName()
905
    nodes = utils.NiceSort(self.cfg.GetNodeList())
906
    instances = [self.cfg.GetInstanceInfo(name)
907
                 for name in self.cfg.GetInstanceList()]
908

    
909
    nv_dict = {}
910
    for inst in instances:
911
      inst_lvs = {}
912
      if (inst.status != "up" or
913
          inst.disk_template not in constants.DTS_NET_MIRROR):
914
        continue
915
      inst.MapLVsByNode(inst_lvs)
916
      # transform { iname: {node: [vol,],},} to {(node, vol): inst}
917
      for node, vol_list in inst_lvs.iteritems():
918
        for vol in vol_list:
919
          nv_dict[(node, vol)] = inst
920

    
921
    if not nv_dict:
922
      return result
923

    
924
    node_lvs = rpc.call_volume_list(nodes, vg_name)
925

    
926
    to_act = set()
927
    for node in nodes:
928
      # node_volume
929
      lvs = node_lvs[node]
930

    
931
      if not isinstance(lvs, dict):
932
        logger.Info("connection to node %s failed or invalid data returned" %
933
                    (node,))
934
        res_nodes.append(node)
935
        continue
936

    
937
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
938
        if not lv_online:
939
          inst = nv_dict.get((node, lv_name), None)
940
          if inst is not None and inst.name not in res_instances:
941
            res_instances.append(inst.name)
942

    
943
    return result
944

    
945

    
946
class LURenameCluster(LogicalUnit):
947
  """Rename the cluster.
948

949
  """
950
  HPATH = "cluster-rename"
951
  HTYPE = constants.HTYPE_CLUSTER
952
  _OP_REQP = ["name"]
953

    
954
  def BuildHooksEnv(self):
955
    """Build hooks env.
956

957
    """
958
    env = {
959
      "OP_TARGET": self.sstore.GetClusterName(),
960
      "NEW_NAME": self.op.name,
961
      }
962
    mn = self.sstore.GetMasterNode()
963
    return env, [mn], [mn]
964

    
965
  def CheckPrereq(self):
966
    """Verify that the passed name is a valid one.
967

968
    """
969
    hostname = utils.HostInfo(self.op.name)
970

    
971
    new_name = hostname.name
972
    self.ip = new_ip = hostname.ip
973
    old_name = self.sstore.GetClusterName()
974
    old_ip = self.sstore.GetMasterIP()
975
    if new_name == old_name and new_ip == old_ip:
976
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
977
                                 " cluster has changed")
978
    if new_ip != old_ip:
979
      result = utils.RunCmd(["fping", "-q", new_ip])
980
      if not result.failed:
981
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
982
                                   " reachable on the network. Aborting." %
983
                                   new_ip)
984

    
985
    self.op.name = new_name
986

    
987
  def Exec(self, feedback_fn):
988
    """Rename the cluster.
989

990
    """
991
    clustername = self.op.name
992
    ip = self.ip
993
    ss = self.sstore
994

    
995
    # shutdown the master IP
996
    master = ss.GetMasterNode()
997
    if not rpc.call_node_stop_master(master):
998
      raise errors.OpExecError("Could not disable the master role")
999

    
1000
    try:
1001
      # modify the sstore
1002
      ss.SetKey(ss.SS_MASTER_IP, ip)
1003
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1004

    
1005
      # Distribute updated ss config to all nodes
1006
      myself = self.cfg.GetNodeInfo(master)
1007
      dist_nodes = self.cfg.GetNodeList()
1008
      if myself.name in dist_nodes:
1009
        dist_nodes.remove(myself.name)
1010

    
1011
      logger.Debug("Copying updated ssconf data to all nodes")
1012
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1013
        fname = ss.KeyToFilename(keyname)
1014
        result = rpc.call_upload_file(dist_nodes, fname)
1015
        for to_node in dist_nodes:
1016
          if not result[to_node]:
1017
            logger.Error("copy of file %s to node %s failed" %
1018
                         (fname, to_node))
1019
    finally:
1020
      if not rpc.call_node_start_master(master):
1021
        logger.Error("Could not re-enable the master role on the master,"
1022
                     " please restart manually.")
1023

    
1024

    
1025
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1026
  """Sleep and poll for an instance's disk to sync.
1027

1028
  """
1029
  if not instance.disks:
1030
    return True
1031

    
1032
  if not oneshot:
1033
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1034

    
1035
  node = instance.primary_node
1036

    
1037
  for dev in instance.disks:
1038
    cfgw.SetDiskID(dev, node)
1039

    
1040
  retries = 0
1041
  while True:
1042
    max_time = 0
1043
    done = True
1044
    cumul_degraded = False
1045
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1046
    if not rstats:
1047
      proc.LogWarning("Can't get any data from node %s" % node)
1048
      retries += 1
1049
      if retries >= 10:
1050
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1051
                                 " aborting." % node)
1052
      time.sleep(6)
1053
      continue
1054
    retries = 0
1055
    for i in range(len(rstats)):
1056
      mstat = rstats[i]
1057
      if mstat is None:
1058
        proc.LogWarning("Can't compute data for node %s/%s" %
1059
                        (node, instance.disks[i].iv_name))
1060
        continue
1061
      # we ignore the ldisk parameter
1062
      perc_done, est_time, is_degraded, _ = mstat
1063
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1064
      if perc_done is not None:
1065
        done = False
1066
        if est_time is not None:
1067
          rem_time = "%d estimated seconds remaining" % est_time
1068
          max_time = est_time
1069
        else:
1070
          rem_time = "no time estimate"
1071
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1072
                     (instance.disks[i].iv_name, perc_done, rem_time))
1073
    if done or oneshot:
1074
      break
1075

    
1076
    if unlock:
1077
      utils.Unlock('cmd')
1078
    try:
1079
      time.sleep(min(60, max_time))
1080
    finally:
1081
      if unlock:
1082
        utils.Lock('cmd')
1083

    
1084
  if done:
1085
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1086
  return not cumul_degraded
1087

    
1088

    
1089
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1090
  """Check that mirrors are not degraded.
1091

1092
  The ldisk parameter, if True, will change the test from the
1093
  is_degraded attribute (which represents overall non-ok status for
1094
  the device(s)) to the ldisk (representing the local storage status).
1095

1096
  """
1097
  cfgw.SetDiskID(dev, node)
1098
  if ldisk:
1099
    idx = 6
1100
  else:
1101
    idx = 5
1102

    
1103
  result = True
1104
  if on_primary or dev.AssembleOnSecondary():
1105
    rstats = rpc.call_blockdev_find(node, dev)
1106
    if not rstats:
1107
      logger.ToStderr("Can't get any data from node %s" % node)
1108
      result = False
1109
    else:
1110
      result = result and (not rstats[idx])
1111
  if dev.children:
1112
    for child in dev.children:
1113
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1114

    
1115
  return result
1116

    
1117

    
1118
class LUDiagnoseOS(NoHooksLU):
1119
  """Logical unit for OS diagnose/query.
1120

1121
  """
1122
  _OP_REQP = []
1123

    
1124
  def CheckPrereq(self):
1125
    """Check prerequisites.
1126

1127
    This always succeeds, since this is a pure query LU.
1128

1129
    """
1130
    return
1131

    
1132
  def Exec(self, feedback_fn):
1133
    """Compute the list of OSes.
1134

1135
    """
1136
    node_list = self.cfg.GetNodeList()
1137
    node_data = rpc.call_os_diagnose(node_list)
1138
    if node_data == False:
1139
      raise errors.OpExecError("Can't gather the list of OSes")
1140
    return node_data
1141

    
1142

    
1143
class LURemoveNode(LogicalUnit):
1144
  """Logical unit for removing a node.
1145

1146
  """
1147
  HPATH = "node-remove"
1148
  HTYPE = constants.HTYPE_NODE
1149
  _OP_REQP = ["node_name"]
1150

    
1151
  def BuildHooksEnv(self):
1152
    """Build hooks env.
1153

1154
    This doesn't run on the target node in the pre phase as a failed
1155
    node would not allow itself to run.
1156

1157
    """
1158
    env = {
1159
      "OP_TARGET": self.op.node_name,
1160
      "NODE_NAME": self.op.node_name,
1161
      }
1162
    all_nodes = self.cfg.GetNodeList()
1163
    all_nodes.remove(self.op.node_name)
1164
    return env, all_nodes, all_nodes
1165

    
1166
  def CheckPrereq(self):
1167
    """Check prerequisites.
1168

1169
    This checks:
1170
     - the node exists in the configuration
1171
     - it does not have primary or secondary instances
1172
     - it's not the master
1173

1174
    Any errors are signalled by raising errors.OpPrereqError.
1175

1176
    """
1177
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1178
    if node is None:
1179
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1180

    
1181
    instance_list = self.cfg.GetInstanceList()
1182

    
1183
    masternode = self.sstore.GetMasterNode()
1184
    if node.name == masternode:
1185
      raise errors.OpPrereqError("Node is the master node,"
1186
                                 " you need to failover first.")
1187

    
1188
    for instance_name in instance_list:
1189
      instance = self.cfg.GetInstanceInfo(instance_name)
1190
      if node.name == instance.primary_node:
1191
        raise errors.OpPrereqError("Instance %s still running on the node,"
1192
                                   " please remove first." % instance_name)
1193
      if node.name in instance.secondary_nodes:
1194
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1195
                                   " please remove first." % instance_name)
1196
    self.op.node_name = node.name
1197
    self.node = node
1198

    
1199
  def Exec(self, feedback_fn):
1200
    """Removes the node from the cluster.
1201

1202
    """
1203
    node = self.node
1204
    logger.Info("stopping the node daemon and removing configs from node %s" %
1205
                node.name)
1206

    
1207
    rpc.call_node_leave_cluster(node.name)
1208

    
1209
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1210

    
1211
    logger.Info("Removing node %s from config" % node.name)
1212

    
1213
    self.cfg.RemoveNode(node.name)
1214

    
1215
    _RemoveHostFromEtcHosts(node.name)
1216

    
1217

    
1218
class LUQueryNodes(NoHooksLU):
1219
  """Logical unit for querying nodes.
1220

1221
  """
1222
  _OP_REQP = ["output_fields", "names"]
1223

    
1224
  def CheckPrereq(self):
1225
    """Check prerequisites.
1226

1227
    This checks that the fields required are valid output fields.
1228

1229
    """
1230
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1231
                                     "mtotal", "mnode", "mfree",
1232
                                     "bootid"])
1233

    
1234
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1235
                               "pinst_list", "sinst_list",
1236
                               "pip", "sip"],
1237
                       dynamic=self.dynamic_fields,
1238
                       selected=self.op.output_fields)
1239

    
1240
    self.wanted = _GetWantedNodes(self, self.op.names)
1241

    
1242
  def Exec(self, feedback_fn):
1243
    """Computes the list of nodes and their attributes.
1244

1245
    """
1246
    nodenames = self.wanted
1247
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1248

    
1249
    # begin data gathering
1250

    
1251
    if self.dynamic_fields.intersection(self.op.output_fields):
1252
      live_data = {}
1253
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1254
      for name in nodenames:
1255
        nodeinfo = node_data.get(name, None)
1256
        if nodeinfo:
1257
          live_data[name] = {
1258
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1259
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1260
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1261
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1262
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1263
            "bootid": nodeinfo['bootid'],
1264
            }
1265
        else:
1266
          live_data[name] = {}
1267
    else:
1268
      live_data = dict.fromkeys(nodenames, {})
1269

    
1270
    node_to_primary = dict([(name, set()) for name in nodenames])
1271
    node_to_secondary = dict([(name, set()) for name in nodenames])
1272

    
1273
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1274
                             "sinst_cnt", "sinst_list"))
1275
    if inst_fields & frozenset(self.op.output_fields):
1276
      instancelist = self.cfg.GetInstanceList()
1277

    
1278
      for instance_name in instancelist:
1279
        inst = self.cfg.GetInstanceInfo(instance_name)
1280
        if inst.primary_node in node_to_primary:
1281
          node_to_primary[inst.primary_node].add(inst.name)
1282
        for secnode in inst.secondary_nodes:
1283
          if secnode in node_to_secondary:
1284
            node_to_secondary[secnode].add(inst.name)
1285

    
1286
    # end data gathering
1287

    
1288
    output = []
1289
    for node in nodelist:
1290
      node_output = []
1291
      for field in self.op.output_fields:
1292
        if field == "name":
1293
          val = node.name
1294
        elif field == "pinst_list":
1295
          val = list(node_to_primary[node.name])
1296
        elif field == "sinst_list":
1297
          val = list(node_to_secondary[node.name])
1298
        elif field == "pinst_cnt":
1299
          val = len(node_to_primary[node.name])
1300
        elif field == "sinst_cnt":
1301
          val = len(node_to_secondary[node.name])
1302
        elif field == "pip":
1303
          val = node.primary_ip
1304
        elif field == "sip":
1305
          val = node.secondary_ip
1306
        elif field in self.dynamic_fields:
1307
          val = live_data[node.name].get(field, None)
1308
        else:
1309
          raise errors.ParameterError(field)
1310
        node_output.append(val)
1311
      output.append(node_output)
1312

    
1313
    return output
1314

    
1315

    
1316
class LUQueryNodeVolumes(NoHooksLU):
1317
  """Logical unit for getting volumes on node(s).
1318

1319
  """
1320
  _OP_REQP = ["nodes", "output_fields"]
1321

    
1322
  def CheckPrereq(self):
1323
    """Check prerequisites.
1324

1325
    This checks that the fields required are valid output fields.
1326

1327
    """
1328
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1329

    
1330
    _CheckOutputFields(static=["node"],
1331
                       dynamic=["phys", "vg", "name", "size", "instance"],
1332
                       selected=self.op.output_fields)
1333

    
1334

    
1335
  def Exec(self, feedback_fn):
1336
    """Computes the list of nodes and their attributes.
1337

1338
    """
1339
    nodenames = self.nodes
1340
    volumes = rpc.call_node_volumes(nodenames)
1341

    
1342
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1343
             in self.cfg.GetInstanceList()]
1344

    
1345
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1346

    
1347
    output = []
1348
    for node in nodenames:
1349
      if node not in volumes or not volumes[node]:
1350
        continue
1351

    
1352
      node_vols = volumes[node][:]
1353
      node_vols.sort(key=lambda vol: vol['dev'])
1354

    
1355
      for vol in node_vols:
1356
        node_output = []
1357
        for field in self.op.output_fields:
1358
          if field == "node":
1359
            val = node
1360
          elif field == "phys":
1361
            val = vol['dev']
1362
          elif field == "vg":
1363
            val = vol['vg']
1364
          elif field == "name":
1365
            val = vol['name']
1366
          elif field == "size":
1367
            val = int(float(vol['size']))
1368
          elif field == "instance":
1369
            for inst in ilist:
1370
              if node not in lv_by_node[inst]:
1371
                continue
1372
              if vol['name'] in lv_by_node[inst][node]:
1373
                val = inst.name
1374
                break
1375
            else:
1376
              val = '-'
1377
          else:
1378
            raise errors.ParameterError(field)
1379
          node_output.append(str(val))
1380

    
1381
        output.append(node_output)
1382

    
1383
    return output
1384

    
1385

    
1386
class LUAddNode(LogicalUnit):
1387
  """Logical unit for adding node to the cluster.
1388

1389
  """
1390
  HPATH = "node-add"
1391
  HTYPE = constants.HTYPE_NODE
1392
  _OP_REQP = ["node_name"]
1393

    
1394
  def BuildHooksEnv(self):
1395
    """Build hooks env.
1396

1397
    This will run on all nodes before, and on all nodes + the new node after.
1398

1399
    """
1400
    env = {
1401
      "OP_TARGET": self.op.node_name,
1402
      "NODE_NAME": self.op.node_name,
1403
      "NODE_PIP": self.op.primary_ip,
1404
      "NODE_SIP": self.op.secondary_ip,
1405
      }
1406
    nodes_0 = self.cfg.GetNodeList()
1407
    nodes_1 = nodes_0 + [self.op.node_name, ]
1408
    return env, nodes_0, nodes_1
1409

    
1410
  def CheckPrereq(self):
1411
    """Check prerequisites.
1412

1413
    This checks:
1414
     - the new node is not already in the config
1415
     - it is resolvable
1416
     - its parameters (single/dual homed) match the cluster
1417

1418
    Any errors are signalled by raising errors.OpPrereqError.
1419

1420
    """
1421
    node_name = self.op.node_name
1422
    cfg = self.cfg
1423

    
1424
    dns_data = utils.HostInfo(node_name)
1425

    
1426
    node = dns_data.name
1427
    primary_ip = self.op.primary_ip = dns_data.ip
1428
    secondary_ip = getattr(self.op, "secondary_ip", None)
1429
    if secondary_ip is None:
1430
      secondary_ip = primary_ip
1431
    if not utils.IsValidIP(secondary_ip):
1432
      raise errors.OpPrereqError("Invalid secondary IP given")
1433
    self.op.secondary_ip = secondary_ip
1434
    node_list = cfg.GetNodeList()
1435
    if node in node_list:
1436
      raise errors.OpPrereqError("Node %s is already in the configuration"
1437
                                 % node)
1438

    
1439
    for existing_node_name in node_list:
1440
      existing_node = cfg.GetNodeInfo(existing_node_name)
1441
      if (existing_node.primary_ip == primary_ip or
1442
          existing_node.secondary_ip == primary_ip or
1443
          existing_node.primary_ip == secondary_ip or
1444
          existing_node.secondary_ip == secondary_ip):
1445
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1446
                                   " existing node %s" % existing_node.name)
1447

    
1448
    # check that the type of the node (single versus dual homed) is the
1449
    # same as for the master
1450
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1451
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1452
    newbie_singlehomed = secondary_ip == primary_ip
1453
    if master_singlehomed != newbie_singlehomed:
1454
      if master_singlehomed:
1455
        raise errors.OpPrereqError("The master has no private ip but the"
1456
                                   " new node has one")
1457
      else:
1458
        raise errors.OpPrereqError("The master has a private ip but the"
1459
                                   " new node doesn't have one")
1460

    
1461
    # checks reachability
1462
    if not utils.TcpPing(utils.HostInfo().name,
1463
                         primary_ip,
1464
                         constants.DEFAULT_NODED_PORT):
1465
      raise errors.OpPrereqError("Node not reachable by ping")
1466

    
1467
    if not newbie_singlehomed:
1468
      # check reachability from my secondary ip to newbie's secondary ip
1469
      if not utils.TcpPing(myself.secondary_ip,
1470
                           secondary_ip,
1471
                           constants.DEFAULT_NODED_PORT):
1472
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1473
                                   " based ping to noded port")
1474

    
1475
    self.new_node = objects.Node(name=node,
1476
                                 primary_ip=primary_ip,
1477
                                 secondary_ip=secondary_ip)
1478

    
1479
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1480
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1481
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1482
                                   constants.VNC_PASSWORD_FILE)
1483

    
1484
  def Exec(self, feedback_fn):
1485
    """Adds the new node to the cluster.
1486

1487
    """
1488
    new_node = self.new_node
1489
    node = new_node.name
1490

    
1491
    # set up inter-node password and certificate and restarts the node daemon
1492
    gntpass = self.sstore.GetNodeDaemonPassword()
1493
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1494
      raise errors.OpExecError("ganeti password corruption detected")
1495
    f = open(constants.SSL_CERT_FILE)
1496
    try:
1497
      gntpem = f.read(8192)
1498
    finally:
1499
      f.close()
1500
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1501
    # so we use this to detect an invalid certificate; as long as the
1502
    # cert doesn't contain this, the here-document will be correctly
1503
    # parsed by the shell sequence below
1504
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1505
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1506
    if not gntpem.endswith("\n"):
1507
      raise errors.OpExecError("PEM must end with newline")
1508
    logger.Info("copy cluster pass to %s and start the node daemon" % node)
1509

    
1510
    # and then connect with ssh to set password and start ganeti-noded
1511
    # note that all the below variables are sanitized at this point,
1512
    # either by being constants or by the checks above
1513
    ss = self.sstore
1514
    mycommand = ("umask 077 && "
1515
                 "echo '%s' > '%s' && "
1516
                 "cat > '%s' << '!EOF.' && \n"
1517
                 "%s!EOF.\n%s restart" %
1518
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1519
                  constants.SSL_CERT_FILE, gntpem,
1520
                  constants.NODE_INITD_SCRIPT))
1521

    
1522
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1523
    if result.failed:
1524
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1525
                               " output: %s" %
1526
                               (node, result.fail_reason, result.output))
1527

    
1528
    # check connectivity
1529
    time.sleep(4)
1530

    
1531
    result = rpc.call_version([node])[node]
1532
    if result:
1533
      if constants.PROTOCOL_VERSION == result:
1534
        logger.Info("communication to node %s fine, sw version %s match" %
1535
                    (node, result))
1536
      else:
1537
        raise errors.OpExecError("Version mismatch master version %s,"
1538
                                 " node version %s" %
1539
                                 (constants.PROTOCOL_VERSION, result))
1540
    else:
1541
      raise errors.OpExecError("Cannot get version from the new node")
1542

    
1543
    # setup ssh on node
1544
    logger.Info("copy ssh key to node %s" % node)
1545
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1546
    keyarray = []
1547
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1548
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1549
                priv_key, pub_key]
1550

    
1551
    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become the new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from the master to some nodes.

    The file to copy is given by self.op.filename; self.op.nodes holds
    the names of the target nodes (if empty, all nodes are used).

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

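    # push the file to every requested node except the one we are running
    # on (the master), which already has it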
    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple of (status, device_info): status is False if the operation
    failed, and device_info is a list of (host, instance_visible_name,
    node_visible_name) tuples mapping node devices to instance devices
  """
  device_info = []
  disks_ok = True
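  # assemble every disk on each node of its node tree; a failure on the
  # primary node always marks the assembly as failed, a failure on a
  # secondary node only does so when ignore_secondaries is not set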
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk,
                                          instance.name, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=%s)" %
                     (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node make the
  function return failure; if it is true, such errors are ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

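    # make sure the primary node has enough free memory for this instance
    # before assembling its disks and starting it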
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError("Could not contact node %s for info" %
                               (node_current))

    freememory = nodeinfo[node_current]['memory_free']
    memory = instance.memory
    if memory > freememory:
      raise errors.OpExecError("Not enough memory to start instance"
                               " %s on node %s"
                               " needed %s MiB, available %s MiB" %
                               (instance.name, node_current, memory,
                                freememory))

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

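    # soft and hard reboots are delegated to the hypervisor on the primary
    # node; a full reboot is emulated by shutting the instance down,
    # restarting its disks and starting it again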
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    if not getattr(self.op, "ignore_ip", False):
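      # the new name resolves to an IP; refuse the rename if something
      # already answers on that address (a successful fping means the IP
      # is in use)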
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
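    # live data (oper_state, oper_ram) is only fetched from the primary
    # nodes when a dynamic field was requested; nodes that cannot be
    # contacted end up in bad_nodes and their instances report None below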
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    # check memory requirements on the secondary node
    target_node = secondary_nodes[0]
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
    info = nodeinfo.get(target_node, None)
    if not info:
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s'" % target_node)
    if instance.memory > info['memory_free']:
      raise errors.OpPrereqError("Not enough memory on target node %s."
                                 " %d MB available, %d MB required" %
                                 (target_node, info['memory_free'],
                                  instance.memory))

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more of the target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

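    # failover proper: verify that the disks on the target are in sync and
    # that the target has enough memory, shut the instance and its disks
    # down on the source, flip the primary node in the configuration, then
    # bring the disks up and start the instance on the target node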
    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* checking target node resource availability")
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())

    if not nodeinfo:
      raise errors.OpExecError("Could not contact target node %s." %
                               target_node)

    free_memory = int(nodeinfo[target_node]['memory_free'])
    memory = instance.memory
    if memory > free_memory:
      raise errors.OpExecError("Not enough memory to create instance %s on"
                               " node %s. needed %s MiB, available %s MiB" %
                               (instance.name, target_node, memory,
                                free_memory))

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate one unique logical volume name for each of the
  given extensions.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
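  # each template maps to a fixed two-disk layout (sda = data, sdb = swap):
  # plain uses two plain LVs, local_raid1 puts an MD RAID1 over two LVs per
  # disk, remote_raid1 puts MD over a DRBD7 device, and drbd8 uses DRBD8
  # devices directly (each with a data LV plus a 128 MB metadata LV)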
  if template_name == "diskless":
    disks = []
  elif template_name == "plain":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == "local_raid1":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
                                       ".sdb_m1", ".sdb_m2"])
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[0]))
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[1]))
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
                              size=disk_sz,
                              children = [sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[2]))
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[3]))
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
                              size=swap_sz,
                              children = [sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_REMOTE_RAID1:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2])
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              children = [drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4])
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              children = [drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

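  # 'info' is the owner text stored in each device's metadata (see
  # _GetInstanceInfoText), presumably so the owning instance can later be
  # identified from the block device itself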
  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
              (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False
  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    # Check lv size requirements
    nodenames = [pnode.name] + self.secondaries
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: 0,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
    }

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError("Disk template '%s' size requirement"
                                   " is unknown" %  self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

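    # every node that will hold part of the disks (the primary plus any
    # mirror node) must have at least req_size MB free in its volume group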
    for node in nodenames:
2970
      info = nodeinfo.get(node, None)
2971
      if not info:
2972
        raise errors.OpPrereqError("Cannot get current information"
2973
                                   " from node '%s'" % nodeinfo)
2974
      if req_size > info['vg_free']:
2975
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2976
                                   " %d MB available, %d MB required" %
2977
                                   (node, info['vg_free'], req_size))
2978

    
2979
    # os verification
2980
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2981
    if not os_obj:
2982
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2983
                                 " primary node"  % self.op.os_type)
2984

    
2985
    # instance verification
2986
    hostname1 = utils.HostInfo(self.op.instance_name)
2987

    
2988
    self.op.instance_name = instance_name = hostname1.name
2989
    instance_list = self.cfg.GetInstanceList()
2990
    if instance_name in instance_list:
2991
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2992
                                 instance_name)
2993

    
2994
    ip = getattr(self.op, "ip", None)
2995
    if ip is None or ip.lower() == "none":
2996
      inst_ip = None
2997
    elif ip.lower() == "auto":
2998
      inst_ip = hostname1.ip
2999
    else:
3000
      if not utils.IsValidIP(ip):
3001
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3002
                                   " like a valid IP" % ip)
3003
      inst_ip = ip
3004
    self.inst_ip = inst_ip
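    # Summary of the accepted values (addresses are illustrative):
    #   ip missing or "none" -> the instance gets no IP in the config
    #   ip == "auto"         -> use the resolved address of the instance
    #                           name, e.g. 192.0.2.10
    #   ip == "192.0.2.10"   -> use the literal address, after validation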
3005

    
3006
    if self.op.start and not self.op.ip_check:
3007
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3008
                                 " adding an instance in start mode")
3009

    
3010
    if self.op.ip_check:
3011
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3012
                       constants.DEFAULT_NODED_PORT):
3013
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3014
                                   (hostname1.ip, instance_name))
3015

    
3016
    # bridge verification
3017
    bridge = getattr(self.op, "bridge", None)
3018
    if bridge is None:
3019
      self.op.bridge = self.cfg.GetDefBridge()
3020
    else:
3021
      self.op.bridge = bridge
3022

    
3023
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3024
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3025
                                 " destination node '%s'" %
3026
                                 (self.op.bridge, pnode.name))
3027

    
3028
    if self.op.start:
3029
      self.instance_status = 'up'
3030
    else:
3031
      self.instance_status = 'down'
3032

    
3033
  def Exec(self, feedback_fn):
3034
    """Create and add the instance to the cluster.
3035

3036
    """
3037
    instance = self.op.instance_name
3038
    pnode_name = self.pnode.name
3039

    
3040
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3041
    if self.inst_ip is not None:
3042
      nic.ip = self.inst_ip
3043

    
3044
    ht_kind = self.sstore.GetHypervisorType()
3045
    if ht_kind in constants.HTS_REQ_PORT:
3046
      network_port = self.cfg.AllocatePort()
3047
    else:
3048
      network_port = None
3049

    
3050
    disks = _GenerateDiskTemplate(self.cfg,
3051
                                  self.op.disk_template,
3052
                                  instance, pnode_name,
3053
                                  self.secondaries, self.op.disk_size,
3054
                                  self.op.swap_size)
3055

    
3056
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3057
                            primary_node=pnode_name,
3058
                            memory=self.op.mem_size,
3059
                            vcpus=self.op.vcpus,
3060
                            nics=[nic], disks=disks,
3061
                            disk_template=self.op.disk_template,
3062
                            status=self.instance_status,
3063
                            network_port=network_port,
3064
                            )
3065

    
3066
    feedback_fn("* creating instance disks...")
3067
    if not _CreateDisks(self.cfg, iobj):
3068
      _RemoveDisks(iobj, self.cfg)
3069
      raise errors.OpExecError("Device creation failed, reverting...")
3070

    
3071
    feedback_fn("adding instance %s to cluster config" % instance)
3072

    
3073
    self.cfg.AddInstance(iobj)
3074

    
3075
    if self.op.wait_for_sync:
3076
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3077
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3078
      # make sure the disks are not degraded (still sync-ing is ok)
3079
      time.sleep(15)
3080
      feedback_fn("* checking mirrors status")
3081
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3082
    else:
3083
      disk_abort = False
3084

    
3085
    if disk_abort:
3086
      _RemoveDisks(iobj, self.cfg)
3087
      self.cfg.RemoveInstance(iobj.name)
3088
      raise errors.OpExecError("There are some degraded disks for"
3089
                               " this instance")
3090

    
3091
    feedback_fn("creating os for instance %s on node %s" %
3092
                (instance, pnode_name))
3093

    
3094
    if iobj.disk_template != constants.DT_DISKLESS:
3095
      if self.op.mode == constants.INSTANCE_CREATE:
3096
        feedback_fn("* running the instance OS create scripts...")
3097
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3098
          raise errors.OpExecError("could not add os for instance %s"
3099
                                   " on node %s" %
3100
                                   (instance, pnode_name))
3101

    
3102
      elif self.op.mode == constants.INSTANCE_IMPORT:
3103
        feedback_fn("* running the instance OS import scripts...")
3104
        src_node = self.op.src_node
3105
        src_image = self.src_image
3106
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3107
                                                src_node, src_image):
3108
          raise errors.OpExecError("Could not import os for instance"
3109
                                   " %s on node %s" %
3110
                                   (instance, pnode_name))
3111
      else:
3112
        # also checked in the prereq part
3113
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3114
                                     % self.op.mode)
3115

    
3116
    if self.op.start:
3117
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3118
      feedback_fn("* starting instance...")
3119
      if not rpc.call_instance_start(pnode_name, iobj, None):
3120
        raise errors.OpExecError("Could not start instance")
3121

    
3122

    
3123
class LUConnectConsole(NoHooksLU):
3124
  """Connect to an instance's console.
3125

3126
  This is somewhat special in that it returns the command line that
3127
  you need to run on the master node in order to connect to the
3128
  console.
3129

3130
  """
3131
  _OP_REQP = ["instance_name"]
3132

    
3133
  def CheckPrereq(self):
3134
    """Check prerequisites.
3135

3136
    This checks that the instance is in the cluster.
3137

3138
    """
3139
    instance = self.cfg.GetInstanceInfo(
3140
      self.cfg.ExpandInstanceName(self.op.instance_name))
3141
    if instance is None:
3142
      raise errors.OpPrereqError("Instance '%s' not known" %
3143
                                 self.op.instance_name)
3144
    self.instance = instance
3145

    
3146
  def Exec(self, feedback_fn):
3147
    """Connect to the console of an instance
3148

3149
    """
3150
    instance = self.instance
3151
    node = instance.primary_node
3152

    
3153
    node_insts = rpc.call_instance_list([node])[node]
3154
    if node_insts is False:
3155
      raise errors.OpExecError("Can't connect to node %s." % node)
3156

    
3157
    if instance.name not in node_insts:
3158
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3159

    
3160
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3161

    
3162
    hyper = hypervisor.GetHypervisor()
3163
    console_cmd = hyper.GetShellCommandForConsole(instance)
3164
    # build ssh cmdline
3165
    argv = ["ssh", "-q", "-t"]
3166
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
3167
    argv.extend(ssh.BATCH_MODE_OPTS)
3168
    argv.append(node)
3169
    argv.append(console_cmd)
3170
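    # The caller is expected to exec the returned argv on the master node;
    # a hypothetical result (node name and console command are illustrative
    # only) would look like:
    #   ("ssh", ["ssh", "-q", "-t", <known-hosts opts>, <batch-mode opts>,
    #            "node1.example.com", "<hypervisor console command>"])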
    return "ssh", argv
3171

    
3172

    
3173
class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.
3175

3176
  """
3177
  HPATH = "mirror-add"
3178
  HTYPE = constants.HTYPE_INSTANCE
3179
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3180

    
3181
  def BuildHooksEnv(self):
3182
    """Build hooks env.
3183

3184
    This runs on the master, the primary and all the secondaries.
3185

3186
    """
3187
    env = {
3188
      "NEW_SECONDARY": self.op.remote_node,
3189
      "DISK_NAME": self.op.disk_name,
3190
      }
3191
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3192
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3193
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3194
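    # These key/value pairs end up in the environment of the hook scripts
    # (e.g. NEW_SECONDARY=node2.example.com and DISK_NAME=sda, both
    # illustrative values), on top of the standard instance variables added
    # by _BuildInstanceHookEnvByObject; nl lists the nodes the hooks run on.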
    return env, nl, nl
3195

    
3196
  def CheckPrereq(self):
3197
    """Check prerequisites.
3198

3199
    This checks that the instance is in the cluster.
3200

3201
    """
3202
    instance = self.cfg.GetInstanceInfo(
3203
      self.cfg.ExpandInstanceName(self.op.instance_name))
3204
    if instance is None:
3205
      raise errors.OpPrereqError("Instance '%s' not known" %
3206
                                 self.op.instance_name)
3207
    self.instance = instance
3208

    
3209
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3210
    if remote_node is None:
3211
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3212
    self.remote_node = remote_node
3213

    
3214
    if remote_node == instance.primary_node:
3215
      raise errors.OpPrereqError("The specified node is the primary node of"
3216
                                 " the instance.")
3217

    
3218
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3219
      raise errors.OpPrereqError("Instance's disk layout is not"
3220
                                 " remote_raid1.")
3221
    for disk in instance.disks:
3222
      if disk.iv_name == self.op.disk_name:
3223
        break
3224
    else:
3225
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3226
                                 " instance." % self.op.disk_name)
3227
    if len(disk.children) > 1:
3228
      raise errors.OpPrereqError("The device already has two slave devices."
3229
                                 " This would create a 3-disk raid1 which we"
3230
                                 " don't allow.")
3231
    self.disk = disk
3232

    
3233
  def Exec(self, feedback_fn):
3234
    """Add the mirror component
3235

3236
    """
3237
    disk = self.disk
3238
    instance = self.instance
3239

    
3240
    remote_node = self.remote_node
3241
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3242
    names = _GenerateUniqueNames(self.cfg, lv_names)
3243
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3244
                                     remote_node, disk.size, names)
3245

    
3246
    logger.Info("adding new mirror component on secondary")
3247
    #HARDCODE
3248
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3249
                                      new_drbd, False,
3250
                                      _GetInstanceInfoText(instance)):
3251
      raise errors.OpExecError("Failed to create new component on secondary"
3252
                               " node %s" % remote_node)
3253

    
3254
    logger.Info("adding new mirror component on primary")
3255
    #HARDCODE
3256
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3257
                                    instance, new_drbd,
3258
                                    _GetInstanceInfoText(instance)):
3259
      # remove secondary dev
3260
      self.cfg.SetDiskID(new_drbd, remote_node)
3261
      rpc.call_blockdev_remove(remote_node, new_drbd)
3262
      raise errors.OpExecError("Failed to create volume on primary")
3263

    
3264
    # the device exists now
3265
    # call the primary node to add the mirror to md
3266
    logger.Info("adding new mirror component to md")
3267
    if not rpc.call_blockdev_addchildren(instance.primary_node,
3268
                                         disk, [new_drbd]):
3269
      logger.Error("Can't add mirror component to md!")
3270
      self.cfg.SetDiskID(new_drbd, remote_node)
3271
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
3272
        logger.Error("Can't rollback on secondary")
3273
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
3274
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3275
        logger.Error("Can't rollback on primary")
3276
      raise errors.OpExecError("Can't add mirror component to md array")
3277

    
3278
    disk.children.append(new_drbd)
3279

    
3280
    self.cfg.AddInstance(instance)
3281

    
3282
    _WaitForSync(self.cfg, instance, self.proc)
3283

    
3284
    return 0
3285

    
3286

    
3287
class LURemoveMDDRBDComponent(LogicalUnit):
3288
  """Remove a component from a remote_raid1 disk.
3289

3290
  """
3291
  HPATH = "mirror-remove"
3292
  HTYPE = constants.HTYPE_INSTANCE
3293
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3294

    
3295
  def BuildHooksEnv(self):
3296
    """Build hooks env.
3297

3298
    This runs on the master, the primary and all the secondaries.
3299

3300
    """
3301
    env = {
3302
      "DISK_NAME": self.op.disk_name,
3303
      "DISK_ID": self.op.disk_id,
3304
      "OLD_SECONDARY": self.old_secondary,
3305
      }
3306
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3307
    nl = [self.sstore.GetMasterNode(),
3308
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3309
    return env, nl, nl
3310

    
3311
  def CheckPrereq(self):
3312
    """Check prerequisites.
3313

3314
    This checks that the instance is in the cluster.
3315

3316
    """
3317
    instance = self.cfg.GetInstanceInfo(
3318
      self.cfg.ExpandInstanceName(self.op.instance_name))
3319
    if instance is None:
3320
      raise errors.OpPrereqError("Instance '%s' not known" %
3321
                                 self.op.instance_name)
3322
    self.instance = instance
3323

    
3324
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3325
      raise errors.OpPrereqError("Instance's disk layout is not"
3326
                                 " remote_raid1.")
3327
    for disk in instance.disks:
3328
      if disk.iv_name == self.op.disk_name:
3329
        break
3330
    else:
3331
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3332
                                 " instance." % self.op.disk_name)
3333
    for child in disk.children:
3334
      if (child.dev_type == constants.LD_DRBD7 and
3335
          child.logical_id[2] == self.op.disk_id):
3336
        break
3337
    else:
3338
      raise errors.OpPrereqError("Can't find the device with this port.")
3339

    
3340
    if len(disk.children) < 2:
3341
      raise errors.OpPrereqError("Cannot remove the last component from"
3342
                                 " a mirror.")
3343
    self.disk = disk
3344
    self.child = child
3345
    if self.child.logical_id[0] == instance.primary_node:
3346
      oid = 1
3347
    else:
3348
      oid = 0
3349
    self.old_secondary = self.child.logical_id[oid]
3350

    
3351
  def Exec(self, feedback_fn):
3352
    """Remove the mirror component
3353

3354
    """
3355
    instance = self.instance
3356
    disk = self.disk
3357
    child = self.child
3358
    logger.Info("remove mirror component")
3359
    self.cfg.SetDiskID(disk, instance.primary_node)
3360
    if not rpc.call_blockdev_removechildren(instance.primary_node,
3361
                                            disk, [child]):
3362
      raise errors.OpExecError("Can't remove child from mirror.")
3363

    
3364
    for node in child.logical_id[:2]:
3365
      self.cfg.SetDiskID(child, node)
3366
      if not rpc.call_blockdev_remove(node, child):
3367
        logger.Error("Warning: failed to remove device from node %s,"
3368
                     " continuing operation." % node)
3369

    
3370
    disk.children.remove(child)
3371
    self.cfg.AddInstance(instance)
3372

    
3373

    
3374
class LUReplaceDisks(LogicalUnit):
3375
  """Replace the disks of an instance.
3376

3377
  """
3378
  HPATH = "mirrors-replace"
3379
  HTYPE = constants.HTYPE_INSTANCE
3380
  _OP_REQP = ["instance_name", "mode", "disks"]
3381

    
3382
  def BuildHooksEnv(self):
3383
    """Build hooks env.
3384

3385
    This runs on the master, the primary and all the secondaries.
3386

3387
    """
3388
    env = {
3389
      "MODE": self.op.mode,
3390
      "NEW_SECONDARY": self.op.remote_node,
3391
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3392
      }
3393
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3394
    nl = [
3395
      self.sstore.GetMasterNode(),
3396
      self.instance.primary_node,
3397
      ]
3398
    if self.op.remote_node is not None:
3399
      nl.append(self.op.remote_node)
3400
    return env, nl, nl
3401

    
3402
  def CheckPrereq(self):
3403
    """Check prerequisites.
3404

3405
    This checks that the instance is in the cluster.
3406

3407
    """
3408
    instance = self.cfg.GetInstanceInfo(
3409
      self.cfg.ExpandInstanceName(self.op.instance_name))
3410
    if instance is None:
3411
      raise errors.OpPrereqError("Instance '%s' not known" %
3412
                                 self.op.instance_name)
3413
    self.instance = instance
3414
    self.op.instance_name = instance.name
3415

    
3416
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3417
      raise errors.OpPrereqError("Instance's disk layout is not"
3418
                                 " network mirrored.")
3419

    
3420
    if len(instance.secondary_nodes) != 1:
3421
      raise errors.OpPrereqError("The instance has a strange layout,"
3422
                                 " expected one secondary but found %d" %
3423
                                 len(instance.secondary_nodes))
3424

    
3425
    self.sec_node = instance.secondary_nodes[0]
3426

    
3427
    remote_node = getattr(self.op, "remote_node", None)
3428
    if remote_node is not None:
3429
      remote_node = self.cfg.ExpandNodeName(remote_node)
3430
      if remote_node is None:
3431
        raise errors.OpPrereqError("Node '%s' not known" %
3432
                                   self.op.remote_node)
3433
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3434
    else:
3435
      self.remote_node_info = None
3436
    if remote_node == instance.primary_node:
3437
      raise errors.OpPrereqError("The specified node is the primary node of"
3438
                                 " the instance.")
3439
    elif remote_node == self.sec_node:
3440
      if self.op.mode == constants.REPLACE_DISK_SEC:
3441
        # this is for DRBD8, where we can't execute the same mode of
3442
        # replacement as for drbd7 (no different port allocated)
3443
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3444
                                   " replacement")
3445
      # the user gave the current secondary, switch to
3446
      # 'no-replace-secondary' mode for drbd7
3447
      remote_node = None
3448
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3449
        self.op.mode != constants.REPLACE_DISK_ALL):
3450
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3451
                                 " disks replacement, not individual ones")
3452
    if instance.disk_template == constants.DT_DRBD8:
3453
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3454
          remote_node is not None):
3455
        # switch to replace secondary mode
3456
        self.op.mode = constants.REPLACE_DISK_SEC
3457

    
3458
      if self.op.mode == constants.REPLACE_DISK_ALL:
3459
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3460
                                   " secondary disk replacement, not"
3461
                                   " both at once")
3462
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3463
        if remote_node is not None:
3464
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3465
                                     " the secondary while doing a primary"
3466
                                     " node disk replacement")
3467
        self.tgt_node = instance.primary_node
3468
        self.oth_node = instance.secondary_nodes[0]
3469
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3470
        self.new_node = remote_node # this can be None, in which case
3471
                                    # we don't change the secondary
3472
        self.tgt_node = instance.secondary_nodes[0]
3473
        self.oth_node = instance.primary_node
3474
      else:
3475
        raise errors.ProgrammerError("Unhandled disk replace mode")
3476

    
3477
    for name in self.op.disks:
3478
      if instance.FindDisk(name) is None:
3479
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3480
                                   (name, instance.name))
3481
    self.op.remote_node = remote_node
3482

    
3483
  def _ExecRR1(self, feedback_fn):
3484
    """Replace the disks of an instance.
3485

3486
    """
3487
    instance = self.instance
3488
    iv_names = {}
3489
    # start of work
3490
    if self.op.remote_node is None:
3491
      remote_node = self.sec_node
3492
    else:
3493
      remote_node = self.op.remote_node
3494
    cfg = self.cfg
3495
    for dev in instance.disks:
3496
      size = dev.size
3497
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3498
      names = _GenerateUniqueNames(cfg, lv_names)
3499
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3500
                                       remote_node, size, names)
3501
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3502
      logger.Info("adding new mirror component on secondary for %s" %
3503
                  dev.iv_name)
3504
      #HARDCODE
3505
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3506
                                        new_drbd, False,
3507
                                        _GetInstanceInfoText(instance)):
3508
        raise errors.OpExecError("Failed to create new component on secondary"
3509
                                 " node %s. Full abort, cleanup manually!" %
3510
                                 remote_node)
3511

    
3512
      logger.Info("adding new mirror component on primary")
3513
      #HARDCODE
3514
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3515
                                      instance, new_drbd,
3516
                                      _GetInstanceInfoText(instance)):
3517
        # remove secondary dev
3518
        cfg.SetDiskID(new_drbd, remote_node)
3519
        rpc.call_blockdev_remove(remote_node, new_drbd)
3520
        raise errors.OpExecError("Failed to create volume on primary!"
3521
                                 " Full abort, cleanup manually!!")
3522

    
3523
      # the device exists now
3524
      # call the primary node to add the mirror to md
3525
      logger.Info("adding new mirror component to md")
3526
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3527
                                           [new_drbd]):
3528
        logger.Error("Can't add mirror component to md!")
3529
        cfg.SetDiskID(new_drbd, remote_node)
3530
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3531
          logger.Error("Can't rollback on secondary")
3532
        cfg.SetDiskID(new_drbd, instance.primary_node)
3533
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3534
          logger.Error("Can't rollback on primary")
3535
        raise errors.OpExecError("Full abort, cleanup manually!!")
3536

    
3537
      dev.children.append(new_drbd)
3538
      cfg.AddInstance(instance)
3539

    
3540
    # this can fail as the old devices are degraded and _WaitForSync
3541
    # does a combined result over all disks, so we don't check its
3542
    # return value
3543
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3544

    
3545
    # so check manually all the devices
3546
    for name in iv_names:
3547
      dev, child, new_drbd = iv_names[name]
3548
      cfg.SetDiskID(dev, instance.primary_node)
3549
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3550
      if is_degr:
3551
        raise errors.OpExecError("MD device %s is degraded!" % name)
3552
      cfg.SetDiskID(new_drbd, instance.primary_node)
3553
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3554
      if is_degr:
3555
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3556

    
3557
    for name in iv_names:
3558
      dev, child, new_drbd = iv_names[name]
3559
      logger.Info("remove mirror %s component" % name)
3560
      cfg.SetDiskID(dev, instance.primary_node)
3561
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3562
                                              dev, [child]):
3563
        logger.Error("Can't remove child from mirror, aborting"
3564
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3565
        continue
3566

    
3567
      for node in child.logical_id[:2]:
3568
        logger.Info("remove child device on %s" % node)
3569
        cfg.SetDiskID(child, node)
3570
        if not rpc.call_blockdev_remove(node, child):
3571
          logger.Error("Warning: failed to remove device from node %s,"
3572
                       " continuing operation." % node)
3573

    
3574
      dev.children.remove(child)
3575

    
3576
      cfg.AddInstance(instance)
3577

    
3578
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.
3580

3581
    The algorithm for replace is quite complicated:
3582
      - for each disk to be replaced:
3583
        - create new LVs on the target node with unique names
3584
        - detach old LVs from the drbd device
3585
        - rename old LVs to name_replaced.<time_t>
3586
        - rename new LVs to old LVs
3587
        - attach the new LVs (with the old names now) to the drbd device
3588
      - wait for sync across all devices
3589
      - for each modified disk:
3590
        - remove old LVs (which have the name name_replaced.<time_t>)
3591

3592
    Failures are not very well handled.
3593

3594
    """
3595
    steps_total = 6
3596
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3597
    instance = self.instance
3598
    iv_names = {}
3599
    vgname = self.cfg.GetVGName()
3600
    # start of work
3601
    cfg = self.cfg
3602
    tgt_node = self.tgt_node
3603
    oth_node = self.oth_node
3604

    
3605
    # Step: check device activation
3606
    self.proc.LogStep(1, steps_total, "check device existence")
3607
    info("checking volume groups")
3608
    my_vg = cfg.GetVGName()
3609
    results = rpc.call_vg_list([oth_node, tgt_node])
3610
    if not results:
3611
      raise errors.OpExecError("Can't list volume groups on the nodes")
3612
    for node in oth_node, tgt_node:
3613
      res = results.get(node, False)
3614
      if not res or my_vg not in res:
3615
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3616
                                 (my_vg, node))
3617
    for dev in instance.disks:
3618
      if not dev.iv_name in self.op.disks:
3619
        continue
3620
      for node in tgt_node, oth_node:
3621
        info("checking %s on %s" % (dev.iv_name, node))
3622
        cfg.SetDiskID(dev, node)
3623
        if not rpc.call_blockdev_find(node, dev):
3624
          raise errors.OpExecError("Can't find device %s on node %s" %
3625
                                   (dev.iv_name, node))
3626

    
3627
    # Step: check other node consistency
3628
    self.proc.LogStep(2, steps_total, "check peer consistency")
3629
    for dev in instance.disks:
3630
      if not dev.iv_name in self.op.disks:
3631
        continue
3632
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3633
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3634
                                   oth_node==instance.primary_node):
3635
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3636
                                 " to replace disks on this node (%s)" %
3637
                                 (oth_node, tgt_node))
3638

    
3639
    # Step: create new storage
3640
    self.proc.LogStep(3, steps_total, "allocate new storage")
3641
    for dev in instance.disks:
3642
      if not dev.iv_name in self.op.disks:
3643
        continue
3644
      size = dev.size
3645
      cfg.SetDiskID(dev, tgt_node)
3646
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3647
      names = _GenerateUniqueNames(cfg, lv_names)
3648
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3649
                             logical_id=(vgname, names[0]))
3650
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3651
                             logical_id=(vgname, names[1]))
3652
      new_lvs = [lv_data, lv_meta]
3653
      old_lvs = dev.children
3654
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3655
      info("creating new local storage on %s for %s" %
3656
           (tgt_node, dev.iv_name))
3657
      # since we *always* want to create this LV, we use the
3658
      # _Create...OnPrimary (which forces the creation), even if we
3659
      # are talking about the secondary node
3660
      for new_lv in new_lvs:
3661
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3662
                                        _GetInstanceInfoText(instance)):
3663
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3664
                                   " node '%s'" %
3665
                                   (new_lv.logical_id[1], tgt_node))
3666

    
3667
    # Step: for each lv, detach+rename*2+attach
3668
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3669
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3670
      info("detaching %s drbd from local storage" % dev.iv_name)
3671
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3672
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3673
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3674
      #dev.children = []
3675
      #cfg.Update(instance)
3676

    
3677
      # ok, we created the new LVs, so now we know we have the needed
3678
      # storage; as such, we proceed on the target node to rename
3679
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3680
      # using the assumption than logical_id == physical_id (which in
3681
      # turn is the unique_id on that node)
3682

    
3683
      # FIXME(iustin): use a better name for the replaced LVs
3684
      temp_suffix = int(time.time())
3685
      ren_fn = lambda d, suff: (d.physical_id[0],
3686
                                d.physical_id[1] + "_replaced-%s" % suff)
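      # Illustrative rename (LV names invented for the example): an old LV
      # with physical_id ('xenvg', 'lv_sda_data') becomes
      # ('xenvg', 'lv_sda_data_replaced-1199145600') when temp_suffix is
      # the timestamp 1199145600.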
3687
      # build the rename list based on what LVs exist on the node
3688
      rlist = []
3689
      for to_ren in old_lvs:
3690
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3691
        if find_res is not None: # device exists
3692
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3693

    
3694
      info("renaming the old LVs on the target node")
3695
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3696
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3697
      # now we rename the new LVs to the old LVs
3698
      info("renaming the new LVs on the target node")
3699
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3700
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3701
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3702

    
3703
      for old, new in zip(old_lvs, new_lvs):
3704
        new.logical_id = old.logical_id
3705
        cfg.SetDiskID(new, tgt_node)
3706

    
3707
      for disk in old_lvs:
3708
        disk.logical_id = ren_fn(disk, temp_suffix)
3709
        cfg.SetDiskID(disk, tgt_node)
3710

    
3711
      # now that the new lvs have the old name, we can add them to the device
3712
      info("adding new mirror component on %s" % tgt_node)
3713
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3714
        for new_lv in new_lvs:
3715
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3716
            warning("Can't rollback device %s", hint="manually cleanup unused"
3717
                    " logical volumes")
3718
        raise errors.OpExecError("Can't add local storage to drbd")
3719

    
3720
      dev.children = new_lvs
3721
      cfg.Update(instance)
3722

    
3723
    # Step: wait for sync
3724

    
3725
    # this can fail as the old devices are degraded and _WaitForSync
3726
    # does a combined result over all disks, so we don't check its
3727
    # return value
3728
    self.proc.LogStep(5, steps_total, "sync devices")
3729
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3730

    
3731
    # so check manually all the devices
3732
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3733
      cfg.SetDiskID(dev, instance.primary_node)
3734
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3735
      if is_degr:
3736
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3737

    
3738
    # Step: remove old storage
3739
    self.proc.LogStep(6, steps_total, "removing old storage")
3740
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3741
      info("remove logical volumes for %s" % name)
3742
      for lv in old_lvs:
3743
        cfg.SetDiskID(lv, tgt_node)
3744
        if not rpc.call_blockdev_remove(tgt_node, lv):
3745
          warning("Can't remove old LV", hint="manually remove unused LVs")
3746
          continue
3747

    
3748
  def _ExecD8Secondary(self, feedback_fn):
3749
    """Replace the secondary node for drbd8.
3750

3751
    The algorithm for replace is quite complicated:
3752
      - for all disks of the instance:
3753
        - create new LVs on the new node with same names
3754
        - shutdown the drbd device on the old secondary
3755
        - disconnect the drbd network on the primary
3756
        - create the drbd device on the new secondary
3757
        - network attach the drbd on the primary, using an artifice:
3758
          the drbd code for Attach() will connect to the network if it
3759
          finds a device which is connected to the good local disks but
3760
          not network enabled
3761
      - wait for sync across all devices
3762
      - remove all disks from the old secondary
3763

3764
    Failures are not very well handled.
3765

3766
    """
3767
    steps_total = 6
3768
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3769
    instance = self.instance
3770
    iv_names = {}
3771
    vgname = self.cfg.GetVGName()
3772
    # start of work
3773
    cfg = self.cfg
3774
    old_node = self.tgt_node
3775
    new_node = self.new_node
3776
    pri_node = instance.primary_node
3777

    
3778
    # Step: check device activation
3779
    self.proc.LogStep(1, steps_total, "check device existence")
3780
    info("checking volume groups")
3781
    my_vg = cfg.GetVGName()
3782
    results = rpc.call_vg_list([pri_node, new_node])
3783
    if not results:
3784
      raise errors.OpExecError("Can't list volume groups on the nodes")
3785
    for node in pri_node, new_node:
3786
      res = results.get(node, False)
3787
      if not res or my_vg not in res:
3788
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3789
                                 (my_vg, node))
3790
    for dev in instance.disks:
3791
      if not dev.iv_name in self.op.disks:
3792
        continue
3793
      info("checking %s on %s" % (dev.iv_name, pri_node))
3794
      cfg.SetDiskID(dev, pri_node)
3795
      if not rpc.call_blockdev_find(pri_node, dev):
3796
        raise errors.OpExecError("Can't find device %s on node %s" %
3797
                                 (dev.iv_name, pri_node))
3798

    
3799
    # Step: check other node consistency
3800
    self.proc.LogStep(2, steps_total, "check peer consistency")
3801
    for dev in instance.disks:
3802
      if not dev.iv_name in self.op.disks:
3803
        continue
3804
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3805
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3806
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3807
                                 " unsafe to replace the secondary" %
3808
                                 pri_node)
3809

    
3810
    # Step: create new storage
3811
    self.proc.LogStep(3, steps_total, "allocate new storage")
3812
    for dev in instance.disks:
3813
      size = dev.size
3814
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3815
      # since we *always* want to create this LV, we use the
3816
      # _Create...OnPrimary (which forces the creation), even if we
3817
      # are talking about the secondary node
3818
      for new_lv in dev.children:
3819
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3820
                                        _GetInstanceInfoText(instance)):
3821
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3822
                                   " node '%s'" %
3823
                                   (new_lv.logical_id[1], new_node))
3824

    
3825
      iv_names[dev.iv_name] = (dev, dev.children)
3826

    
3827
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3828
    for dev in instance.disks:
3829
      size = dev.size
3830
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3831
      # create new devices on new_node
3832
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3833
                              logical_id=(pri_node, new_node,
3834
                                          dev.logical_id[2]),
3835
                              children=dev.children)
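      # Note: only the two node names in the logical_id change here; the
      # third element (used as the drbd port/identifier elsewhere in this
      # file) is carried over unchanged from the existing device.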
3836
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3837
                                        new_drbd, False,
3838
                                      _GetInstanceInfoText(instance)):
3839
        raise errors.OpExecError("Failed to create new DRBD on"
3840
                                 " node '%s'" % new_node)
3841

    
3842
    for dev in instance.disks:
3843
      # we have new devices, shutdown the drbd on the old secondary
3844
      info("shutting down drbd for %s on old node" % dev.iv_name)
3845
      cfg.SetDiskID(dev, old_node)
3846
      if not rpc.call_blockdev_shutdown(old_node, dev):
3847
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3848
                hint="Please cleanup this device manually as soon as possible")
3849

    
3850
    info("detaching primary drbds from the network (=> standalone)")
3851
    done = 0
3852
    for dev in instance.disks:
3853
      cfg.SetDiskID(dev, pri_node)
3854
      # set the physical (unique in bdev terms) id to None, meaning
3855
      # detach from network
3856
      dev.physical_id = (None,) * len(dev.physical_id)
3857
      # and 'find' the device, which will 'fix' it to match the
3858
      # standalone state
3859
      if rpc.call_blockdev_find(pri_node, dev):
3860
        done += 1
3861
      else:
3862
        warning("Failed to detach drbd %s from network, unusual case" %
3863
                dev.iv_name)
3864

    
3865
    if not done:
3866
      # no detaches succeeded (very unlikely)
3867
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3868

    
3869
    # if we managed to detach at least one, we update all the disks of
3870
    # the instance to point to the new secondary
3871
    info("updating instance configuration")
3872
    for dev in instance.disks:
3873
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3874
      cfg.SetDiskID(dev, pri_node)
3875
    cfg.Update(instance)
3876

    
3877
    # and now perform the drbd attach
3878
    info("attaching primary drbds to new secondary (standalone => connected)")
3879
    failures = []
3880
    for dev in instance.disks:
3881
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3882
      # since the attach is smart, it's enough to 'find' the device,
3883
      # it will automatically activate the network, if the physical_id
3884
      # is correct
3885
      cfg.SetDiskID(dev, pri_node)
3886
      if not rpc.call_blockdev_find(pri_node, dev):
3887
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3888
                "please do a gnt-instance info to see the status of disks")
3889

    
3890
    # this can fail as the old devices are degraded and _WaitForSync
3891
    # does a combined result over all disks, so we don't check its
3892
    # return value
3893
    self.proc.LogStep(5, steps_total, "sync devices")
3894
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3895

    
3896
    # so check manually all the devices
3897
    for name, (dev, old_lvs) in iv_names.iteritems():
3898
      cfg.SetDiskID(dev, pri_node)
3899
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3900
      if is_degr:
3901
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3902

    
3903
    self.proc.LogStep(6, steps_total, "removing old storage")
3904
    for name, (dev, old_lvs) in iv_names.iteritems():
3905
      info("remove logical volumes for %s" % name)
3906
      for lv in old_lvs:
3907
        cfg.SetDiskID(lv, old_node)
3908
        if not rpc.call_blockdev_remove(old_node, lv):
3909
          warning("Can't remove LV on old secondary",
3910
                  hint="Cleanup stale volumes by hand")
3911

    
3912
  def Exec(self, feedback_fn):
3913
    """Execute disk replacement.
3914

3915
    This dispatches the disk replacement to the appropriate handler.
3916

3917
    """
3918
    instance = self.instance
3919
    if instance.disk_template == constants.DT_REMOTE_RAID1:
3920
      fn = self._ExecRR1
3921
    elif instance.disk_template == constants.DT_DRBD8:
3922
      if self.op.remote_node is None:
3923
        fn = self._ExecD8DiskOnly
3924
      else:
3925
        fn = self._ExecD8Secondary
3926
    else:
3927
      raise errors.ProgrammerError("Unhandled disk replacement case")
3928
    return fn(feedback_fn)
3929

    
3930

    
3931
class LUQueryInstanceData(NoHooksLU):
3932
  """Query runtime instance data.
3933

3934
  """
3935
  _OP_REQP = ["instances"]
3936

    
3937
  def CheckPrereq(self):
3938
    """Check prerequisites.
3939

3940
    This only checks the optional instance list against the existing names.
3941

3942
    """
3943
    if not isinstance(self.op.instances, list):
3944
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3945
    if self.op.instances:
3946
      self.wanted_instances = []
3947
      names = self.op.instances
3948
      for name in names:
3949
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3950
        if instance is None:
3951
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3952
        self.wanted_instances.append(instance)
3953
    else:
3954
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3955
                               in self.cfg.GetInstanceList()]
3956
    return
3957

    
3958

    
3959
  def _ComputeDiskStatus(self, instance, snode, dev):
3960
    """Compute block device status.
3961

3962
    """
3963
    self.cfg.SetDiskID(dev, instance.primary_node)
3964
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3965
    if dev.dev_type in constants.LDS_DRBD:
3966
      # we change the snode then (otherwise we use the one passed in)
3967
      if dev.logical_id[0] == instance.primary_node:
3968
        snode = dev.logical_id[1]
3969
      else:
3970
        snode = dev.logical_id[0]
3971

    
3972
    if snode:
3973
      self.cfg.SetDiskID(dev, snode)
3974
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3975
    else:
3976
      dev_sstatus = None
3977

    
3978
    if dev.children:
3979
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3980
                      for child in dev.children]
3981
    else:
3982
      dev_children = []
3983

    
3984
    data = {
3985
      "iv_name": dev.iv_name,
3986
      "dev_type": dev.dev_type,
3987
      "logical_id": dev.logical_id,
3988
      "physical_id": dev.physical_id,
3989
      "pstatus": dev_pstatus,
3990
      "sstatus": dev_sstatus,
3991
      "children": dev_children,
3992
      }
3993

    
3994
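    # The returned dict nests recursively through "children": a drbd disk,
    # for instance, carries its two backing LVs as child entries with the
    # same keys, while pstatus/sstatus hold whatever the blockdev_find
    # calls returned for the primary and secondary node respectively.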
    return data
3995

    
3996
  def Exec(self, feedback_fn):
3997
    """Gather and return data"""
3998
    result = {}
3999
    for instance in self.wanted_instances:
4000
      remote_info = rpc.call_instance_info(instance.primary_node,
4001
                                                instance.name)
4002
      if remote_info and "state" in remote_info:
4003
        remote_state = "up"
4004
      else:
4005
        remote_state = "down"
4006
      if instance.status == "down":
4007
        config_state = "down"
4008
      else:
4009
        config_state = "up"
4010

    
4011
      disks = [self._ComputeDiskStatus(instance, None, device)
4012
               for device in instance.disks]
4013

    
4014
      idict = {
4015
        "name": instance.name,
4016
        "config_state": config_state,
4017
        "run_state": remote_state,
4018
        "pnode": instance.primary_node,
4019
        "snodes": instance.secondary_nodes,
4020
        "os": instance.os,
4021
        "memory": instance.memory,
4022
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4023
        "disks": disks,
4024
        "network_port": instance.network_port,
4025
        "vcpus": instance.vcpus,
4026
        }
4027

    
4028
      result[instance.name] = idict
4029

    
4030
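    # The result maps each instance name to its info dict, e.g. (values
    # illustrative): {"instance1.example.com": {"config_state": "up",
    # "run_state": "down", "pnode": "node1.example.com", ...}}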
    return result
4031

    
4032

    
4033
class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.
4035

4036
  """
4037
  HPATH = "instance-modify"
4038
  HTYPE = constants.HTYPE_INSTANCE
4039
  _OP_REQP = ["instance_name"]
4040

    
4041
  def BuildHooksEnv(self):
4042
    """Build hooks env.
4043

4044
    This runs on the master, primary and secondaries.
4045

4046
    """
4047
    args = dict()
4048
    if self.mem:
4049
      args['memory'] = self.mem
4050
    if self.vcpus:
4051
      args['vcpus'] = self.vcpus
4052
    if self.do_ip or self.do_bridge:
4053
      if self.do_ip:
4054
        ip = self.ip
4055
      else:
4056
        ip = self.instance.nics[0].ip
4057
      if self.bridge:
4058
        bridge = self.bridge
4059
      else:
4060
        bridge = self.instance.nics[0].bridge
4061
      args['nics'] = [(ip, bridge)]
4062
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4063
    nl = [self.sstore.GetMasterNode(),
4064
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4065
    return env, nl, nl
4066

    
4067
  def CheckPrereq(self):
4068
    """Check prerequisites.
4069

4070
    This only checks the instance list against the existing names.
4071

4072
    """
4073
    self.mem = getattr(self.op, "mem", None)
4074
    self.vcpus = getattr(self.op, "vcpus", None)
4075
    self.ip = getattr(self.op, "ip", None)
4076
    self.bridge = getattr(self.op, "bridge", None)
4077
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
4078
      raise errors.OpPrereqError("No changes submitted")
4079
    if self.mem is not None:
4080
      try:
4081
        self.mem = int(self.mem)
4082
      except ValueError, err:
4083
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4084
    if self.vcpus is not None:
4085
      try:
4086
        self.vcpus = int(self.vcpus)
4087
      except ValueError, err:
4088
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4089
    if self.ip is not None:
4090
      self.do_ip = True
4091
      if self.ip.lower() == "none":
4092
        self.ip = None
4093
      else:
4094
        if not utils.IsValidIP(self.ip):
4095
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4096
    else:
4097
      self.do_ip = False
4098
    self.do_bridge = (self.bridge is not None)
4099

    
4100
    instance = self.cfg.GetInstanceInfo(
4101
      self.cfg.ExpandInstanceName(self.op.instance_name))
4102
    if instance is None:
4103
      raise errors.OpPrereqError("No such instance name '%s'" %
4104
                                 self.op.instance_name)
4105
    self.op.instance_name = instance.name
4106
    self.instance = instance
4107
    return
4108

    
4109
  def Exec(self, feedback_fn):
4110
    """Modifies an instance.
4111

4112
    All parameters take effect only at the next restart of the instance.
4113
    """
4114
    result = []
4115
    instance = self.instance
4116
    if self.mem:
4117
      instance.memory = self.mem
4118
      result.append(("mem", self.mem))
4119
    if self.vcpus:
4120
      instance.vcpus = self.vcpus
4121
      result.append(("vcpus",  self.vcpus))
4122
    if self.do_ip:
4123
      instance.nics[0].ip = self.ip
4124
      result.append(("ip", self.ip))
4125
    if self.bridge:
4126
      instance.nics[0].bridge = self.bridge
4127
      result.append(("bridge", self.bridge))
4128

    
4129
    self.cfg.AddInstance(instance)
4130

    
4131
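    # The result is a list of (parameter, new_value) pairs describing what
    # was changed, e.g. [("mem", 512), ("bridge", "xen-br1")] (values are
    # illustrative).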
    return result
4132

    
4133

    
4134
class LUQueryExports(NoHooksLU):
4135
  """Query the exports list
4136

4137
  """
4138
  _OP_REQP = []
4139

    
4140
  def CheckPrereq(self):
4141
    """Check that the nodelist contains only existing nodes.
4142

4143
    """
4144
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4145

    
4146
  def Exec(self, feedback_fn):
4147
    """Compute the list of all the exported system images.
4148

4149
    Returns:
4150
      a dictionary with the structure node->(export-list)
4151
      where export-list is a list of the instances exported on
4152
      that node.
4153

4154
    """
4155
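    # Example shape of the result (hostnames are illustrative):
    #   {"node1.example.com": ["instance1.example.com"],
    #    "node2.example.com": []}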
    return rpc.call_export_list(self.nodes)
4156

    
4157

    
4158
class LUExportInstance(LogicalUnit):
4159
  """Export an instance to an image in the cluster.
4160

4161
  """
4162
  HPATH = "instance-export"
4163
  HTYPE = constants.HTYPE_INSTANCE
4164
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4165

    
4166
  def BuildHooksEnv(self):
4167
    """Build hooks env.
4168

4169
    This will run on the master, primary node and target node.
4170

4171
    """
4172
    env = {
4173
      "EXPORT_NODE": self.op.target_node,
4174
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4175
      }
4176
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4177
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4178
          self.op.target_node]
4179
    return env, nl, nl
4180

    
4181
  def CheckPrereq(self):
4182
    """Check prerequisites.
4183

4184
    This checks that the instance name is a valid one.
4185

4186
    """
4187
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4188
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4189
    if self.instance is None:
4190
      raise errors.OpPrereqError("Instance '%s' not found" %
4191
                                 self.op.instance_name)
4192

    
4193
    # node verification
4194
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4195
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4196

    
4197
    if self.dst_node is None:
4198
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
4199
                                 self.op.target_node)
4200
    self.op.target_node = self.dst_node.name
4201

    
4202
  def Exec(self, feedback_fn):
4203
    """Export an instance to an image in the cluster.
4204

4205
    """
4206
    instance = self.instance
4207
    dst_node = self.dst_node
4208
    src_node = instance.primary_node
4209
    # shutdown the instance, unless requested not to do so
4210
    if self.op.shutdown:
4211
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
4212
      self.proc.ChainOpCode(op)
4213

    
4214
    vgname = self.cfg.GetVGName()
4215

    
4216
    snap_disks = []
4217

    
4218
    try:
4219
      for disk in instance.disks:
4220
        if disk.iv_name == "sda":
4221
          # new_dev_name will be the name of a snapshot of the LVM leaf of the disk we passed
4222
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4223

    
4224
          if not new_dev_name:
4225
            logger.Error("could not snapshot block device %s on node %s" %
4226
                         (disk.logical_id[1], src_node))
4227
          else:
4228
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4229
                                      logical_id=(vgname, new_dev_name),
4230
                                      physical_id=(vgname, new_dev_name),
4231
                                      iv_name=disk.iv_name)
4232
            snap_disks.append(new_dev)
4233

    
4234
    finally:
4235
      if self.op.shutdown:
4236
        op = opcodes.OpStartupInstance(instance_name=instance.name,
4237
                                       force=False)
4238
        self.proc.ChainOpCode(op)
4239

    
4240
    # TODO: check for size
4241

    
4242
    for dev in snap_disks:
4243
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4244
                                           instance):
4245
        logger.Error("could not export block device %s from node"
4246
                     " %s to node %s" %
4247
                     (dev.logical_id[1], src_node, dst_node.name))
4248
      if not rpc.call_blockdev_remove(src_node, dev):
4249
        logger.Error("could not remove snapshot block device %s from"
4250
                     " node %s" % (dev.logical_id[1], src_node))
4251

    
4252
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4253
      logger.Error("could not finalize export for instance %s on node %s" %
4254
                   (instance.name, dst_node.name))
4255

    
4256
    nodelist = self.cfg.GetNodeList()
4257
    nodelist.remove(dst_node.name)
4258

    
4259
    # on one-node clusters nodelist will be empty after the removal
4260
    # if we proceed the backup would be removed because OpQueryExports
4261
    # substitutes an empty list with the full cluster node list.
4262
    if nodelist:
4263
      op = opcodes.OpQueryExports(nodes=nodelist)
4264
      exportlist = self.proc.ChainOpCode(op)
4265
      for node in exportlist:
4266
        if instance.name in exportlist[node]:
4267
          if not rpc.call_export_remove(node, instance.name):
4268
            logger.Error("could not remove older export for instance %s"
4269
                         " on node %s" % (instance.name, node))
4270

    
4271

    
4272
class TagsLU(NoHooksLU):
4273
  """Generic tags LU.
4274

4275
  This is an abstract class which is the parent of all the other tags LUs.
4276

4277
  """
4278
  def CheckPrereq(self):
4279
    """Check prerequisites.
4280

4281
    """
4282
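    # Resolve self.target to the object whose tags the child LUs operate
    # on: the cluster config, a node or an instance; e.g. kind=TAG_NODE
    # with name="node1.example.com" (illustrative) yields that node's
    # configuration object.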
    if self.op.kind == constants.TAG_CLUSTER:
4283
      self.target = self.cfg.GetClusterInfo()
4284
    elif self.op.kind == constants.TAG_NODE:
4285
      name = self.cfg.ExpandNodeName(self.op.name)
4286
      if name is None:
4287
        raise errors.OpPrereqError("Invalid node name (%s)" %
4288
                                   (self.op.name,))
4289
      self.op.name = name
4290
      self.target = self.cfg.GetNodeInfo(name)
4291
    elif self.op.kind == constants.TAG_INSTANCE:
4292
      name = self.cfg.ExpandInstanceName(self.op.name)
4293
      if name is None:
4294
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4295
                                   (self.op.name,))
4296
      self.op.name = name
4297
      self.target = self.cfg.GetInstanceInfo(name)
4298
    else:
4299
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4300
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
4317
  """Searches the tags for a given pattern.
4318

4319
  """
4320
  _OP_REQP = ["pattern"]
4321

    
4322
  def CheckPrereq(self):
4323
    """Check prerequisites.
4324

4325
    This checks the pattern passed for validity by compiling it.
4326

4327
    """
4328
    try:
4329
      self.re = re.compile(self.op.pattern)
4330
    except re.error, err:
4331
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4332
                                 (self.op.pattern, err))
4333

    
4334
  def Exec(self, feedback_fn):
4335
    """Returns the tag list.
4336

4337
    """
4338
    cfg = self.cfg
4339
    tgts = [("/cluster", cfg.GetClusterInfo())]
4340
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4341
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4342
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4343
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4344
    results = []
4345
    for path, target in tgts:
4346
      for tag in target.GetTags():
4347
        if self.re.search(tag):
4348
          results.append((path, tag))
4349
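    # Example (tags are illustrative): searching for the pattern "^rack"
    # could return [("/nodes/node1.example.com", "rack1"),
    #               ("/instances/instance1.example.com", "rack1-critical")].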
    return results
4350

    
4351

    
4352
class LUAddTags(TagsLU):
4353
  """Sets a tag on a given object.
4354

4355
  """
4356
  _OP_REQP = ["kind", "name", "tags"]
4357

    
4358
  def CheckPrereq(self):
4359
    """Check prerequisites.
4360

4361
    This checks the type and length of the tag name and value.
4362

4363
    """
4364
    TagsLU.CheckPrereq(self)
4365
    for tag in self.op.tags:
4366
      objects.TaggableObject.ValidateTag(tag)
4367

    
4368
  def Exec(self, feedback_fn):
4369
    """Sets the tag.
4370

4371
    """
4372
    try:
4373
      for tag in self.op.tags:
4374
        self.target.AddTag(tag)
4375
    except errors.TagError, err:
4376
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
4377
    try:
4378
      self.cfg.Update(self.target)
4379
    except errors.ConfigurationError:
4380
      raise errors.OpRetryError("There has been a modification to the"
4381
                                " config file and the operation has been"
4382
                                " aborted. Please retry.")
4383

    
4384

    
4385
class LUDelTags(TagsLU):
4386
  """Delete a list of tags from a given object.
4387

4388
  """
4389
  _OP_REQP = ["kind", "name", "tags"]
4390

    
4391
  def CheckPrereq(self):
4392
    """Check prerequisites.
4393

4394
    This checks that we have the given tag.
4395

4396
    """
4397
    TagsLU.CheckPrereq(self)
4398
    for tag in self.op.tags:
4399
      objects.TaggableObject.ValidateTag(tag)
4400
    del_tags = frozenset(self.op.tags)
4401
    cur_tags = self.target.GetTags()
4402
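    # Example (tags are illustrative): with del_tags = frozenset(["web",
    # "db"]) and cur_tags containing only "web", the subset test below
    # fails and the error reports the missing tag 'db'.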
    if not del_tags <= cur_tags:
4403
      diff_tags = del_tags - cur_tags
4404
      diff_names = ["'%s'" % tag for tag in diff_tags]
4405
      diff_names.sort()
4406
      raise errors.OpPrereqError("Tag(s) %s not found" %
4407
                                 (",".join(diff_names)))
4408

    
4409
  def Exec(self, feedback_fn):
4410
    """Remove the tag from the object.
4411

4412
    """
4413
    for tag in self.op.tags:
4414
      self.target.RemoveTag(tag)
4415
    try:
4416
      self.cfg.Update(self.target)
4417
    except errors.ConfigurationError:
4418
      raise errors.OpRetryError("There has been a modification to the"
4419
                                " config file and the operation has been"
4420
                                " aborted. Please retry.")