root / lib / cmdlib.py @ 71aa8f73
#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). If no nodes are to be returned, an empty list (and not
    None) should be used.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
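
# The contract above is easiest to see in a minimal subclass.  The sketch
# below is illustrative only (LUExampleNoop is not part of this module);
# it shows the three methods a concrete LU must provide and how _OP_REQP
# drives the opcode validation performed in __init__:
#
#   class LUExampleNoop(LogicalUnit):
#     """Example LU exercising the base-class contract."""
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["message"]      # __init__ fails if op.message is missing
#
#     def CheckPrereq(self):
#       # canonicalize/validate opcode parameters; no cluster changes here
#       if not self.op.message:
#         raise errors.OpPrereqError("Empty message given")
#
#     def BuildHooksEnv(self):
#       env = {"OP_TARGET": self.sstore.GetClusterName()}
#       mn = self.sstore.GetMasterNode()
#       return env, [mn], [mn]
#
#     def Exec(self, feedback_fn):
#       feedback_fn("* noop: %s" % self.op.message)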


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []

166
def _AddHostToEtcHosts(hostname):
167
  """Wrapper around utils.SetEtcHostsEntry.
168

169
  """
170
  hi = utils.HostInfo(name=hostname)
171
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172

    
173

    
174
def _RemoveHostFromEtcHosts(hostname):
175
  """Wrapper around utils.RemoveEtcHostsEntry.
176

177
  """
178
  hi = utils.HostInfo(name=hostname)
179
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181

    
182

    
183
def _GetWantedNodes(lu, nodes):
184
  """Returns list of checked and expanded node names.
185

186
  Args:
187
    nodes: List of nodes (strings) or None for all
188

189
  """
190
  if not isinstance(nodes, list):
191
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
192

    
193
  if nodes:
194
    wanted = []
195

    
196
    for name in nodes:
197
      node = lu.cfg.ExpandNodeName(name)
198
      if node is None:
199
        raise errors.OpPrereqError("No such node name '%s'" % name)
200
      wanted.append(node)
201

    
202
  else:
203
    wanted = lu.cfg.GetNodeList()
204
  return utils.NiceSort(wanted)
205

    
206

    
207
def _GetWantedInstances(lu, instances):
208
  """Returns list of checked and expanded instance names.
209

210
  Args:
211
    instances: List of instances (strings) or None for all
212

213
  """
214
  if not isinstance(instances, list):
215
    raise errors.OpPrereqError("Invalid argument type 'instances'")
216

    
217
  if instances:
218
    wanted = []
219

    
220
    for name in instances:
221
      instance = lu.cfg.ExpandInstanceName(name)
222
      if instance is None:
223
        raise errors.OpPrereqError("No such instance name '%s'" % name)
224
      wanted.append(instance)
225

    
226
  else:
227
    wanted = lu.cfg.GetInstanceList()
228
  return utils.NiceSort(wanted)
229

    
230

    
231
def _CheckOutputFields(static, dynamic, selected):
232
  """Checks whether all selected fields are valid.
233

234
  Args:
235
    static: Static fields
236
    dynamic: Dynamic fields
237

238
  """
239
  static_fields = frozenset(static)
240
  dynamic_fields = frozenset(dynamic)
241

    
242
  all_fields = static_fields | dynamic_fields
243

    
244
  if not all_fields.issuperset(selected):
245
    raise errors.OpPrereqError("Unknown output fields selected: %s"
246
                               % ",".join(frozenset(selected).
247
                                          difference(all_fields)))
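# Example of how query LUs use the check above (values illustrative):
# selecting a field outside the union of static and dynamic fields raises
# OpPrereqError naming the offending fields.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree", "dfree"],
#                      selected=["name", "mfree"])    # passes
#   _CheckOutputFields(static=["name"], dynamic=["mfree", "dfree"],
#                      selected=["name", "bogus"])    # raises OpPrereqError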
248

    
249

    
250
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251
                          memory, vcpus, nics):
252
  """Builds instance related env variables for hooks from single variables.
253

254
  Args:
255
    secondary_nodes: List of secondary nodes as strings
256
  """
257
  env = {
258
    "OP_TARGET": name,
259
    "INSTANCE_NAME": name,
260
    "INSTANCE_PRIMARY": primary_node,
261
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262
    "INSTANCE_OS_TYPE": os_type,
263
    "INSTANCE_STATUS": status,
264
    "INSTANCE_MEMORY": memory,
265
    "INSTANCE_VCPUS": vcpus,
266
  }
267

    
268
  if nics:
269
    nic_count = len(nics)
270
    for idx, (ip, bridge) in enumerate(nics):
271
      if ip is None:
272
        ip = ""
273
      env["INSTANCE_NIC%d_IP" % idx] = ip
274
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275
  else:
276
    nic_count = 0
277

    
278
  env["INSTANCE_NIC_COUNT"] = nic_count
279

    
280
  return env
281

    
282

    
283
def _BuildInstanceHookEnvByObject(instance, override=None):
284
  """Builds instance related env variables for hooks from an object.
285

286
  Args:
287
    instance: objects.Instance object of instance
288
    override: dict of values to override
289
  """
290
  args = {
291
    'name': instance.name,
292
    'primary_node': instance.primary_node,
293
    'secondary_nodes': instance.secondary_nodes,
294
    'os_type': instance.os,
295
    'status': instance.status,
296
    'memory': instance.memory,
297
    'vcpus': instance.vcpus,
298
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
299
  }
300
  if override:
301
    args.update(override)
302
  return _BuildInstanceHookEnv(**args)
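# For an instance with a single NIC the two helpers above produce an
# environment along these lines (values illustrative):
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 512,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#   }
#
# The hooks runner adds the "GANETI_" prefix to these keys itself.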
303

    
304

    
305
def _UpdateKnownHosts(fullnode, ip, pubkey):
306
  """Ensure a node has a correct known_hosts entry.
307

308
  Args:
309
    fullnode - Fully qualified domain name of host. (str)
310
    ip       - IPv4 address of host (str)
311
    pubkey   - the public key of the cluster
312

313
  """
314
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
316
  else:
317
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
318

    
319
  inthere = False
320

    
321
  save_lines = []
322
  add_lines = []
323
  removed = False
324

    
325
  for rawline in f:
326
    logger.Debug('read %s' % (repr(rawline),))
327

    
328
    parts = rawline.rstrip('\r\n').split()
329

    
330
    # Ignore unwanted lines
331
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
332
      fields = parts[0].split(',')
333
      key = parts[2]
334

    
335
      haveall = True
336
      havesome = False
337
      for spec in [ ip, fullnode ]:
338
        if spec not in fields:
339
          haveall = False
340
        if spec in fields:
341
          havesome = True
342

    
343
      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344
      if haveall and key == pubkey:
345
        inthere = True
346
        save_lines.append(rawline)
347
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
348
        continue
349

    
350
      if havesome and (not haveall or key != pubkey):
351
        removed = True
352
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
353
        continue
354

    
355
    save_lines.append(rawline)
356

    
357
  if not inthere:
358
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
360

    
361
  if removed:
362
    save_lines = save_lines + add_lines
363

    
364
    # Write a new file and replace old.
365
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
366
                                   constants.DATA_DIR)
367
    newfile = os.fdopen(fd, 'w')
368
    try:
369
      newfile.write(''.join(save_lines))
370
    finally:
371
      newfile.close()
372
    logger.Debug("Wrote new known_hosts.")
373
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
374

    
375
  elif add_lines:
376
    # Simply appending a new line will do the trick.
377
    f.seek(0, 2)
378
    for add in add_lines:
379
      f.write(add)
380

    
381
  f.close()
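# The entries maintained above use the standard single-line known_hosts
# format; a line added for a node looks like (key shortened):
#
#   node1.example.com,192.0.2.10 ssh-rsa AAAAB3NzaC1yc2E...
#
# Both the FQDN and the IP appear in the first (comma-separated) field,
# which is what the haveall/havesome checks above rely on.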
382

    
383

    
384
def _HasValidVG(vglist, vgname):
385
  """Checks if the volume group list is valid.
386

387
  A non-None return value means there's an error, and the return value
388
  is the error message.
389

390
  """
391
  vgsize = vglist.get(vgname, None)
392
  if vgsize is None:
393
    return "volume group '%s' missing" % vgname
394
  elif vgsize < 20480:
395
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
396
            (vgname, vgsize))
397
  return None
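# Typical use of _HasValidVG, with vglist as returned by
# utils.ListVolumeGroups() -- a mapping of VG name to size in MiB
# (values illustrative):
#
#   err = _HasValidVG({"xenvg": 102400}, "xenvg")  # None, VG is fine
#   err = _HasValidVG({"xenvg": 10240}, "xenvg")   # "... too small ..."
#   err = _HasValidVG({}, "xenvg")                 # "... missing"
#   if err:
#     raise errors.OpPrereqError("Error: %s" % err)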
398

    
399

    
400
def _InitSSHSetup(node):
401
  """Setup the SSH configuration for the cluster.
402

403

404
  This generates a dsa keypair for root, adds the pub key to the
405
  permitted hosts and adds the hostkey to its own known hosts.
406

407
  Args:
408
    node: the name of this host as a fqdn
409

410
  """
411
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
412

    
413
  for name in priv_key, pub_key:
414
    if os.path.exists(name):
415
      utils.CreateBackup(name)
416
    utils.RemoveFile(name)
417

    
418
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
419
                         "-f", priv_key,
420
                         "-q", "-N", ""])
421
  if result.failed:
422
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
423
                             result.output)
424

    
425
  f = open(pub_key, 'r')
426
  try:
427
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
428
  finally:
429
    f.close()
430

    
431

    
432
def _InitGanetiServerSetup(ss):
433
  """Setup the necessary configuration for the initial node daemon.
434

435
  This creates the nodepass file containing the shared password for
436
  the cluster and also generates the SSL certificate.
437

438
  """
439
  # Create pseudo random password
440
  randpass = sha.new(os.urandom(64)).hexdigest()
441
  # and write it into sstore
442
  ss.SetKey(ss.SS_NODED_PASS, randpass)
443

    
444
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445
                         "-days", str(365*5), "-nodes", "-x509",
446
                         "-keyout", constants.SSL_CERT_FILE,
447
                         "-out", constants.SSL_CERT_FILE, "-batch"])
448
  if result.failed:
449
    raise errors.OpExecError("could not generate server ssl cert, command"
450
                             " %s had exitcode %s and error message %s" %
451
                             (result.cmd, result.exit_code, result.output))
452

    
453
  os.chmod(constants.SSL_CERT_FILE, 0400)
454

    
455
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
456

    
457
  if result.failed:
458
    raise errors.OpExecError("Could not start the node daemon, command %s"
459
                             " had exitcode %s and error %s" %
460
                             (result.cmd, result.exit_code, result.output))
461

    
462

    
463
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.
465

466
  """
467
  # check bridges existence
468
  brlist = [nic.bridge for nic in instance.nics]
469
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
470
    raise errors.OpPrereqError("one or more target bridges %s do not"
471
                               " exist on destination node '%s'" %
472
                               (brlist, instance.primary_node))
473

    
474

    
475
class LUInitCluster(LogicalUnit):
476
  """Initialise the cluster.
477

478
  """
479
  HPATH = "cluster-init"
480
  HTYPE = constants.HTYPE_CLUSTER
481
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482
              "def_bridge", "master_netdev"]
483
  REQ_CLUSTER = False
484

    
485
  def BuildHooksEnv(self):
486
    """Build hooks env.
487

488
    Notes: Since we don't require a cluster, we must manually add
489
    ourselves in the post-run node list.
490

491
    """
492
    env = {"OP_TARGET": self.op.cluster_name}
493
    return env, [], [self.hostname.name]
494

    
495
  def CheckPrereq(self):
496
    """Verify that the passed name is a valid one.
497

498
    """
499
    if config.ConfigWriter.IsCluster():
500
      raise errors.OpPrereqError("Cluster is already initialised")
501

    
502
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
503
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
504
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
506
                                   constants.VNC_PASSWORD_FILE)
507

    
508
    self.hostname = hostname = utils.HostInfo()
509

    
510
    if hostname.ip.startswith("127."):
511
      raise errors.OpPrereqError("This host's IP resolves to the loopback"
512
                                 " range (%s). Please fix DNS or /etc/hosts." %
513
                                 (hostname.ip,))
514

    
515
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
516

    
517
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
518
                         constants.DEFAULT_NODED_PORT):
519
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
520
                                 " to %s,\nbut this ip address does not"
521
                                 " belong to this host."
522
                                 " Aborting." % hostname.ip)
523

    
524
    secondary_ip = getattr(self.op, "secondary_ip", None)
525
    if secondary_ip and not utils.IsValidIP(secondary_ip):
526
      raise errors.OpPrereqError("Invalid secondary ip given")
527
    if (secondary_ip and
528
        secondary_ip != hostname.ip and
529
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
530
                           constants.DEFAULT_NODED_PORT))):
531
      raise errors.OpPrereqError("You gave %s as secondary IP,"
532
                                 " but it does not belong to this host." %
533
                                 secondary_ip)
534
    self.secondary_ip = secondary_ip
535

    
536
    # checks presence of the volume group given
537
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
538

    
539
    if vgstatus:
540
      raise errors.OpPrereqError("Error: %s" % vgstatus)
541

    
542
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
543
                    self.op.mac_prefix):
544
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
545
                                 self.op.mac_prefix)
546

    
547
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
548
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
549
                                 self.op.hypervisor_type)
550

    
551
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
552
    if result.failed:
553
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
554
                                 (self.op.master_netdev,
555
                                  result.output.strip()))
556

    
557
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
558
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
559
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
560
                                 " executable." % constants.NODE_INITD_SCRIPT)
561

    
562
  def Exec(self, feedback_fn):
563
    """Initialize the cluster.
564

565
    """
566
    clustername = self.clustername
567
    hostname = self.hostname
568

    
569
    # set up the simple store
570
    self.sstore = ss = ssconf.SimpleStore()
571
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
572
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
573
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
574
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
575
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
576

    
577
    # set up the inter-node password and certificate
578
    _InitGanetiServerSetup(ss)
579

    
580
    # start the master ip
581
    rpc.call_node_start_master(hostname.name)
582

    
583
    # set up ssh config and /etc/hosts
584
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
585
    try:
586
      sshline = f.read()
587
    finally:
588
      f.close()
589
    sshkey = sshline.split(" ")[1]
590

    
591
    _AddHostToEtcHosts(hostname.name)
592

    
593
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
594

    
595
    _InitSSHSetup(hostname.name)
596

    
597
    # init of cluster config file
598
    self.cfg = cfgw = config.ConfigWriter()
599
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
600
                    sshkey, self.op.mac_prefix,
601
                    self.op.vg_name, self.op.def_bridge)
602

    
603

    
604
class LUDestroyCluster(NoHooksLU):
605
  """Logical unit for destroying the cluster.
606

607
  """
608
  _OP_REQP = []
609

    
610
  def CheckPrereq(self):
611
    """Check prerequisites.
612

613
    This checks whether the cluster is empty.
614

615
    Any errors are signalled by raising errors.OpPrereqError.
616

617
    """
618
    master = self.sstore.GetMasterNode()
619

    
620
    nodelist = self.cfg.GetNodeList()
621
    if len(nodelist) != 1 or nodelist[0] != master:
622
      raise errors.OpPrereqError("There are still %d node(s) in"
623
                                 " this cluster." % (len(nodelist) - 1))
624
    instancelist = self.cfg.GetInstanceList()
625
    if instancelist:
626
      raise errors.OpPrereqError("There are still %d instance(s) in"
627
                                 " this cluster." % len(instancelist))
628

    
629
  def Exec(self, feedback_fn):
630
    """Destroys the cluster.
631

632
    """
633
    master = self.sstore.GetMasterNode()
634
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
635
    utils.CreateBackup(priv_key)
636
    utils.CreateBackup(pub_key)
637
    rpc.call_node_leave_cluster(master)
638

    
639

    
640
class LUVerifyCluster(NoHooksLU):
641
  """Verifies the cluster status.
642

643
  """
644
  _OP_REQP = []
645

    
646
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
647
                  remote_version, feedback_fn):
648
    """Run multiple tests against a node.
649

650
    Test list:
651
      - compares ganeti version
652
      - checks vg existence and size > 20G
653
      - checks config file checksum
654
      - checks ssh to other nodes
655

656
    Args:
657
      node: name of the node to check
658
      file_list: required list of files
659
      local_cksum: dictionary of local files and their checksums
660

661
    """
662
    # compares ganeti version
663
    local_version = constants.PROTOCOL_VERSION
664
    if not remote_version:
665
      feedback_fn(" - ERROR: connection to %s failed" % (node))
666
      return True
667

    
668
    if local_version != remote_version:
669
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
670
                      (local_version, node, remote_version))
671
      return True
672

    
673
    # checks vg existence and size > 20G
674

    
675
    bad = False
676
    if not vglist:
677
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
678
                      (node,))
679
      bad = True
680
    else:
681
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
682
      if vgstatus:
683
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
684
        bad = True
685

    
686
    # checks config file checksum
687
    # checks ssh to any
688

    
689
    if 'filelist' not in node_result:
690
      bad = True
691
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
692
    else:
693
      remote_cksum = node_result['filelist']
694
      for file_name in file_list:
695
        if file_name not in remote_cksum:
696
          bad = True
697
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
698
        elif remote_cksum[file_name] != local_cksum[file_name]:
699
          bad = True
700
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
701

    
702
    if 'nodelist' not in node_result:
703
      bad = True
704
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
705
    else:
706
      if node_result['nodelist']:
707
        bad = True
708
        for node in node_result['nodelist']:
709
          feedback_fn("  - ERROR: communication with node '%s': %s" %
710
                          (node, node_result['nodelist'][node]))
711
    hyp_result = node_result.get('hypervisor', None)
712
    if hyp_result is not None:
713
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
714
    return bad
715

    
716
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
717
    """Verify an instance.
718

719
    This function checks to see if the required block devices are
720
    available on the instance's node.
721

722
    """
723
    bad = False
724

    
725
    instancelist = self.cfg.GetInstanceList()
726
    if not instance in instancelist:
727
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
728
                      (instance, instancelist))
729
      bad = True
730

    
731
    instanceconfig = self.cfg.GetInstanceInfo(instance)
732
    node_current = instanceconfig.primary_node
733

    
734
    node_vol_should = {}
735
    instanceconfig.MapLVsByNode(node_vol_should)
736

    
737
    for node in node_vol_should:
738
      for volume in node_vol_should[node]:
739
        if node not in node_vol_is or volume not in node_vol_is[node]:
740
          feedback_fn("  - ERROR: volume %s missing on node %s" %
741
                          (volume, node))
742
          bad = True
743

    
744
    if not instanceconfig.status == 'down':
745
      if not instance in node_instance[node_current]:
746
        feedback_fn("  - ERROR: instance %s not running on node %s" %
747
                        (instance, node_current))
748
        bad = True
749

    
750
    for node in node_instance:
751
      if (not node == node_current):
752
        if instance in node_instance[node]:
753
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
754
                          (instance, node))
755
          bad = True
756

    
757
    return bad
758

    
759
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
760
    """Verify if there are any unknown volumes in the cluster.
761

762
    The .os, .swap and backup volumes are ignored. All other volumes are
763
    reported as unknown.
764

765
    """
766
    bad = False
767

    
768
    for node in node_vol_is:
769
      for volume in node_vol_is[node]:
770
        if node not in node_vol_should or volume not in node_vol_should[node]:
771
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
772
                      (volume, node))
773
          bad = True
774
    return bad
775

    
776
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
777
    """Verify the list of running instances.
778

779
    This checks what instances are running but unknown to the cluster.
780

781
    """
782
    bad = False
783
    for node in node_instance:
784
      for runninginstance in node_instance[node]:
785
        if runninginstance not in instancelist:
786
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
787
                          (runninginstance, node))
788
          bad = True
789
    return bad
790

    
791
  def CheckPrereq(self):
792
    """Check prerequisites.
793

794
    This has no prerequisites.
795

796
    """
797
    pass
798

    
799
  def Exec(self, feedback_fn):
800
    """Verify integrity of cluster, performing various test on nodes.
801

802
    """
803
    bad = False
804
    feedback_fn("* Verifying global settings")
805
    for msg in self.cfg.VerifyConfig():
806
      feedback_fn("  - ERROR: %s" % msg)
807

    
808
    vg_name = self.cfg.GetVGName()
809
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
810
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
811
    node_volume = {}
812
    node_instance = {}
813

    
814
    # FIXME: verify OS list
815
    # do local checksums
816
    file_names = list(self.sstore.GetFileList())
817
    file_names.append(constants.SSL_CERT_FILE)
818
    file_names.append(constants.CLUSTER_CONF_FILE)
819
    local_checksums = utils.FingerprintFiles(file_names)
820

    
821
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
822
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
823
    all_instanceinfo = rpc.call_instance_list(nodelist)
824
    all_vglist = rpc.call_vg_list(nodelist)
825
    node_verify_param = {
826
      'filelist': file_names,
827
      'nodelist': nodelist,
828
      'hypervisor': None,
829
      }
830
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
831
    all_rversion = rpc.call_version(nodelist)
832

    
833
    for node in nodelist:
834
      feedback_fn("* Verifying node %s" % node)
835
      result = self._VerifyNode(node, file_names, local_checksums,
836
                                all_vglist[node], all_nvinfo[node],
837
                                all_rversion[node], feedback_fn)
838
      bad = bad or result
839

    
840
      # node_volume
841
      volumeinfo = all_volumeinfo[node]
842

    
843
      if isinstance(volumeinfo, basestring):
844
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
845
                    (node, volumeinfo[-400:].encode('string_escape')))
846
        bad = True
847
        node_volume[node] = {}
848
      elif not isinstance(volumeinfo, dict):
849
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
850
        bad = True
851
        continue
852
      else:
853
        node_volume[node] = volumeinfo
854

    
855
      # node_instance
856
      nodeinstance = all_instanceinfo[node]
857
      if type(nodeinstance) != list:
858
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
859
        bad = True
860
        continue
861

    
862
      node_instance[node] = nodeinstance
863

    
864
    node_vol_should = {}
865

    
866
    for instance in instancelist:
867
      feedback_fn("* Verifying instance %s" % instance)
868
      result =  self._VerifyInstance(instance, node_volume, node_instance,
869
                                     feedback_fn)
870
      bad = bad or result
871

    
872
      inst_config = self.cfg.GetInstanceInfo(instance)
873

    
874
      inst_config.MapLVsByNode(node_vol_should)
875

    
876
    feedback_fn("* Verifying orphan volumes")
877
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
878
                                       feedback_fn)
879
    bad = bad or result
880

    
881
    feedback_fn("* Verifying remaining instances")
882
    result = self._VerifyOrphanInstances(instancelist, node_instance,
883
                                         feedback_fn)
884
    bad = bad or result
885

    
886
    return int(bad)
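# LUVerifyCluster reports problems through the feedback_fn callable passed
# to Exec() and only returns 1 ("problems found") or 0 ("clean").  A minimal
# stand-in for the callback normally supplied by the processor/CLI would be
# (illustrative, requires "import sys"):
#
#   def _PrintFeedback(msg):
#     sys.stdout.write("%s\n" % msg)
#
#   # problems_found = lu.Exec(_PrintFeedback)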
887

    
888

    
889
class LUVerifyDisks(NoHooksLU):
890
  """Verifies the cluster disks status.
891

892
  """
893
  _OP_REQP = []
894

    
895
  def CheckPrereq(self):
896
    """Check prerequisites.
897

898
    This has no prerequisites.
899

900
    """
901
    pass
902

    
903
  def Exec(self, feedback_fn):
904
    """Verify integrity of cluster disks.
905

906
    """
907
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
908

    
909
    vg_name = self.cfg.GetVGName()
910
    nodes = utils.NiceSort(self.cfg.GetNodeList())
911
    instances = [self.cfg.GetInstanceInfo(name)
912
                 for name in self.cfg.GetInstanceList()]
913

    
914
    nv_dict = {}
915
    for inst in instances:
916
      inst_lvs = {}
917
      if (inst.status != "up" or
918
          inst.disk_template not in constants.DTS_NET_MIRROR):
919
        continue
920
      inst.MapLVsByNode(inst_lvs)
921
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
922
      for node, vol_list in inst_lvs.iteritems():
923
        for vol in vol_list:
924
          nv_dict[(node, vol)] = inst
925

    
926
    if not nv_dict:
927
      return result
928

    
929
    node_lvs = rpc.call_volume_list(nodes, vg_name)
930

    
931
    to_act = set()
932
    for node in nodes:
933
      # node_volume
934
      lvs = node_lvs[node]
935

    
936
      if isinstance(lvs, basestring):
937
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
938
        res_nlvm[node] = lvs
939
      elif not isinstance(lvs, dict):
940
        logger.Info("connection to node %s failed or invalid data returned" %
941
                    (node,))
942
        res_nodes.append(node)
943
        continue
944

    
945
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
946
        inst = nv_dict.pop((node, lv_name), None)
947
        if (not lv_online and inst is not None
948
            and inst.name not in res_instances):
949
          res_instances.append(inst.name)
950

    
951
    # any leftover items in nv_dict are missing LVs, let's arrange the
952
    # data better
953
    for key, inst in nv_dict.iteritems():
954
      if inst.name not in res_missing:
955
        res_missing[inst.name] = []
956
      res_missing[inst.name].append(key)
957

    
958
    return result
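# Shape of the data handled above (values illustrative): a per-instance LV
# map such as
#
#   {"node1": ["xenvg/disk0", "xenvg/disk1"], "node2": ["xenvg/disk0"]}
#
# is flattened into nv_dict entries of the form
#
#   {("node1", "xenvg/disk0"): <instance>, ("node2", "xenvg/disk0"): ...}
#
# and Exec() returns the tuple (res_nodes, res_nlvm, res_instances,
# res_missing): unreachable nodes, per-node LVM errors, instances with
# inactive LVs, and instances whose LVs are missing entirely.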
959

    
960

    
961
class LURenameCluster(LogicalUnit):
962
  """Rename the cluster.
963

964
  """
965
  HPATH = "cluster-rename"
966
  HTYPE = constants.HTYPE_CLUSTER
967
  _OP_REQP = ["name"]
968

    
969
  def BuildHooksEnv(self):
970
    """Build hooks env.
971

972
    """
973
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
975
      "NEW_NAME": self.op.name,
976
      }
977
    mn = self.sstore.GetMasterNode()
978
    return env, [mn], [mn]
979

    
980
  def CheckPrereq(self):
981
    """Verify that the passed name is a valid one.
982

983
    """
984
    hostname = utils.HostInfo(self.op.name)
985

    
986
    new_name = hostname.name
987
    self.ip = new_ip = hostname.ip
988
    old_name = self.sstore.GetClusterName()
989
    old_ip = self.sstore.GetMasterIP()
990
    if new_name == old_name and new_ip == old_ip:
991
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
992
                                 " cluster has changed")
993
    if new_ip != old_ip:
994
      result = utils.RunCmd(["fping", "-q", new_ip])
995
      if not result.failed:
996
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
997
                                   " reachable on the network. Aborting." %
998
                                   new_ip)
999

    
1000
    self.op.name = new_name
1001

    
1002
  def Exec(self, feedback_fn):
1003
    """Rename the cluster.
1004

1005
    """
1006
    clustername = self.op.name
1007
    ip = self.ip
1008
    ss = self.sstore
1009

    
1010
    # shutdown the master IP
1011
    master = ss.GetMasterNode()
1012
    if not rpc.call_node_stop_master(master):
1013
      raise errors.OpExecError("Could not disable the master role")
1014

    
1015
    try:
1016
      # modify the sstore
1017
      ss.SetKey(ss.SS_MASTER_IP, ip)
1018
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1019

    
1020
      # Distribute updated ss config to all nodes
1021
      myself = self.cfg.GetNodeInfo(master)
1022
      dist_nodes = self.cfg.GetNodeList()
1023
      if myself.name in dist_nodes:
1024
        dist_nodes.remove(myself.name)
1025

    
1026
      logger.Debug("Copying updated ssconf data to all nodes")
1027
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1028
        fname = ss.KeyToFilename(keyname)
1029
        result = rpc.call_upload_file(dist_nodes, fname)
1030
        for to_node in dist_nodes:
1031
          if not result[to_node]:
1032
            logger.Error("copy of file %s to node %s failed" %
1033
                         (fname, to_node))
1034
    finally:
1035
      if not rpc.call_node_start_master(master):
1036
        logger.Error("Could not re-enable the master role on the master,"
1037
                     " please restart manually.")
1038

    
1039

    
1040
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1041
  """Sleep and poll for an instance's disk to sync.
1042

1043
  """
1044
  if not instance.disks:
1045
    return True
1046

    
1047
  if not oneshot:
1048
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1049

    
1050
  node = instance.primary_node
1051

    
1052
  for dev in instance.disks:
1053
    cfgw.SetDiskID(dev, node)
1054

    
1055
  retries = 0
1056
  while True:
1057
    max_time = 0
1058
    done = True
1059
    cumul_degraded = False
1060
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1061
    if not rstats:
1062
      proc.LogWarning("Can't get any data from node %s" % node)
1063
      retries += 1
1064
      if retries >= 10:
1065
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1066
                                 " aborting." % node)
1067
      time.sleep(6)
1068
      continue
1069
    retries = 0
1070
    for i in range(len(rstats)):
1071
      mstat = rstats[i]
1072
      if mstat is None:
1073
        proc.LogWarning("Can't compute data for node %s/%s" %
1074
                        (node, instance.disks[i].iv_name))
1075
        continue
1076
      # we ignore the ldisk parameter
1077
      perc_done, est_time, is_degraded, _ = mstat
1078
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1079
      if perc_done is not None:
1080
        done = False
1081
        if est_time is not None:
1082
          rem_time = "%d estimated seconds remaining" % est_time
1083
          max_time = est_time
1084
        else:
1085
          rem_time = "no time estimate"
1086
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1087
                     (instance.disks[i].iv_name, perc_done, rem_time))
1088
    if done or oneshot:
1089
      break
1090

    
1091
    if unlock:
1092
      utils.Unlock('cmd')
1093
    try:
1094
      time.sleep(min(60, max_time))
1095
    finally:
1096
      if unlock:
1097
        utils.Lock('cmd')
1098

    
1099
  if done:
1100
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1101
  return not cumul_degraded
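# Each entry returned by call_blockdev_getmirrorstatus is a 4-tuple
# (perc_done, est_time, is_degraded, ldisk); the last element is ignored
# here.  For example (illustrative values):
#
#   perc_done, est_time, is_degraded, _ = (87.5, 130, True, False)
#   # logged as "- device sda: 87.50% done, 130 estimated seconds remaining"
#
# A device that has finished syncing reports perc_done as None, which is
# what lets the polling loop above terminate.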
1102

    
1103

    
1104
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1105
  """Check that mirrors are not degraded.
1106

1107
  The ldisk parameter, if True, will change the test from the
1108
  is_degraded attribute (which represents overall non-ok status for
1109
  the device(s)) to the ldisk (representing the local storage status).
1110

1111
  """
1112
  cfgw.SetDiskID(dev, node)
1113
  if ldisk:
1114
    idx = 6
1115
  else:
1116
    idx = 5
1117

    
1118
  result = True
1119
  if on_primary or dev.AssembleOnSecondary():
1120
    rstats = rpc.call_blockdev_find(node, dev)
1121
    if not rstats:
1122
      logger.ToStderr("Can't get any data from node %s" % node)
1123
      result = False
1124
    else:
1125
      result = result and (not rstats[idx])
1126
  if dev.children:
1127
    for child in dev.children:
1128
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1129

    
1130
  return result
1131

    
1132

    
1133
class LUDiagnoseOS(NoHooksLU):
1134
  """Logical unit for OS diagnose/query.
1135

1136
  """
1137
  _OP_REQP = []
1138

    
1139
  def CheckPrereq(self):
1140
    """Check prerequisites.
1141

1142
    This always succeeds, since this is a pure query LU.
1143

1144
    """
1145
    return
1146

    
1147
  def Exec(self, feedback_fn):
1148
    """Compute the list of OSes.
1149

1150
    """
1151
    node_list = self.cfg.GetNodeList()
1152
    node_data = rpc.call_os_diagnose(node_list)
1153
    if node_data == False:
1154
      raise errors.OpExecError("Can't gather the list of OSes")
1155
    return node_data
1156

    
1157

    
1158
class LURemoveNode(LogicalUnit):
1159
  """Logical unit for removing a node.
1160

1161
  """
1162
  HPATH = "node-remove"
1163
  HTYPE = constants.HTYPE_NODE
1164
  _OP_REQP = ["node_name"]
1165

    
1166
  def BuildHooksEnv(self):
1167
    """Build hooks env.
1168

1169
    This doesn't run on the target node in the pre phase as a failed
1170
    node would not allow itself to run.
1171

1172
    """
1173
    env = {
1174
      "OP_TARGET": self.op.node_name,
1175
      "NODE_NAME": self.op.node_name,
1176
      }
1177
    all_nodes = self.cfg.GetNodeList()
1178
    all_nodes.remove(self.op.node_name)
1179
    return env, all_nodes, all_nodes
1180

    
1181
  def CheckPrereq(self):
1182
    """Check prerequisites.
1183

1184
    This checks:
1185
     - the node exists in the configuration
1186
     - it does not have primary or secondary instances
1187
     - it's not the master
1188

1189
    Any errors are signalled by raising errors.OpPrereqError.
1190

1191
    """
1192
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1193
    if node is None:
1194
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1195

    
1196
    instance_list = self.cfg.GetInstanceList()
1197

    
1198
    masternode = self.sstore.GetMasterNode()
1199
    if node.name == masternode:
1200
      raise errors.OpPrereqError("Node is the master node,"
1201
                                 " you need to failover first.")
1202

    
1203
    for instance_name in instance_list:
1204
      instance = self.cfg.GetInstanceInfo(instance_name)
1205
      if node.name == instance.primary_node:
1206
        raise errors.OpPrereqError("Instance %s still running on the node,"
1207
                                   " please remove first." % instance_name)
1208
      if node.name in instance.secondary_nodes:
1209
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1210
                                   " please remove first." % instance_name)
1211
    self.op.node_name = node.name
1212
    self.node = node
1213

    
1214
  def Exec(self, feedback_fn):
1215
    """Removes the node from the cluster.
1216

1217
    """
1218
    node = self.node
1219
    logger.Info("stopping the node daemon and removing configs from node %s" %
1220
                node.name)
1221

    
1222
    rpc.call_node_leave_cluster(node.name)
1223

    
1224
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1225

    
1226
    logger.Info("Removing node %s from config" % node.name)
1227

    
1228
    self.cfg.RemoveNode(node.name)
1229

    
1230
    _RemoveHostFromEtcHosts(node.name)
1231

    
1232

    
1233
class LUQueryNodes(NoHooksLU):
1234
  """Logical unit for querying nodes.
1235

1236
  """
1237
  _OP_REQP = ["output_fields", "names"]
1238

    
1239
  def CheckPrereq(self):
1240
    """Check prerequisites.
1241

1242
    This checks that the fields required are valid output fields.
1243

1244
    """
1245
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1246
                                     "mtotal", "mnode", "mfree",
1247
                                     "bootid"])
1248

    
1249
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1250
                               "pinst_list", "sinst_list",
1251
                               "pip", "sip"],
1252
                       dynamic=self.dynamic_fields,
1253
                       selected=self.op.output_fields)
1254

    
1255
    self.wanted = _GetWantedNodes(self, self.op.names)
1256

    
1257
  def Exec(self, feedback_fn):
1258
    """Computes the list of nodes and their attributes.
1259

1260
    """
1261
    nodenames = self.wanted
1262
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1263

    
1264
    # begin data gathering
1265

    
1266
    if self.dynamic_fields.intersection(self.op.output_fields):
1267
      live_data = {}
1268
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1269
      for name in nodenames:
1270
        nodeinfo = node_data.get(name, None)
1271
        if nodeinfo:
1272
          live_data[name] = {
1273
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1274
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1275
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1276
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1277
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1278
            "bootid": nodeinfo['bootid'],
1279
            }
1280
        else:
1281
          live_data[name] = {}
1282
    else:
1283
      live_data = dict.fromkeys(nodenames, {})
1284

    
1285
    node_to_primary = dict([(name, set()) for name in nodenames])
1286
    node_to_secondary = dict([(name, set()) for name in nodenames])
1287

    
1288
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1289
                             "sinst_cnt", "sinst_list"))
1290
    if inst_fields & frozenset(self.op.output_fields):
1291
      instancelist = self.cfg.GetInstanceList()
1292

    
1293
      for instance_name in instancelist:
1294
        inst = self.cfg.GetInstanceInfo(instance_name)
1295
        if inst.primary_node in node_to_primary:
1296
          node_to_primary[inst.primary_node].add(inst.name)
1297
        for secnode in inst.secondary_nodes:
1298
          if secnode in node_to_secondary:
1299
            node_to_secondary[secnode].add(inst.name)
1300

    
1301
    # end data gathering
1302

    
1303
    output = []
1304
    for node in nodelist:
1305
      node_output = []
1306
      for field in self.op.output_fields:
1307
        if field == "name":
1308
          val = node.name
1309
        elif field == "pinst_list":
1310
          val = list(node_to_primary[node.name])
1311
        elif field == "sinst_list":
1312
          val = list(node_to_secondary[node.name])
1313
        elif field == "pinst_cnt":
1314
          val = len(node_to_primary[node.name])
1315
        elif field == "sinst_cnt":
1316
          val = len(node_to_secondary[node.name])
1317
        elif field == "pip":
1318
          val = node.primary_ip
1319
        elif field == "sip":
1320
          val = node.secondary_ip
1321
        elif field in self.dynamic_fields:
1322
          val = live_data[node.name].get(field, None)
1323
        else:
1324
          raise errors.ParameterError(field)
1325
        node_output.append(val)
1326
      output.append(node_output)
1327

    
1328
    return output
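# The result is a list of rows, one per node, with the values ordered
# exactly like op.output_fields.  For example (illustrative):
#
#   op.output_fields = ["name", "pinst_cnt", "mfree"]
#   # output -> [["node1.example.com", 2, 1024],
#   #            ["node2.example.com", 0, 2048]]
#
# Dynamic fields ("mfree", "dtotal", ...) come back as None for nodes that
# did not answer the node_info RPC.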
1329

    
1330

    
1331
class LUQueryNodeVolumes(NoHooksLU):
1332
  """Logical unit for getting volumes on node(s).
1333

1334
  """
1335
  _OP_REQP = ["nodes", "output_fields"]
1336

    
1337
  def CheckPrereq(self):
1338
    """Check prerequisites.
1339

1340
    This checks that the fields required are valid output fields.
1341

1342
    """
1343
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1344

    
1345
    _CheckOutputFields(static=["node"],
1346
                       dynamic=["phys", "vg", "name", "size", "instance"],
1347
                       selected=self.op.output_fields)
1348

    
1349

    
1350
  def Exec(self, feedback_fn):
1351
    """Computes the list of nodes and their attributes.
1352

1353
    """
1354
    nodenames = self.nodes
1355
    volumes = rpc.call_node_volumes(nodenames)
1356

    
1357
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1358
             in self.cfg.GetInstanceList()]
1359

    
1360
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1361

    
1362
    output = []
1363
    for node in nodenames:
1364
      if node not in volumes or not volumes[node]:
1365
        continue
1366

    
1367
      node_vols = volumes[node][:]
1368
      node_vols.sort(key=lambda vol: vol['dev'])
1369

    
1370
      for vol in node_vols:
1371
        node_output = []
1372
        for field in self.op.output_fields:
1373
          if field == "node":
1374
            val = node
1375
          elif field == "phys":
1376
            val = vol['dev']
1377
          elif field == "vg":
1378
            val = vol['vg']
1379
          elif field == "name":
1380
            val = vol['name']
1381
          elif field == "size":
1382
            val = int(float(vol['size']))
1383
          elif field == "instance":
1384
            for inst in ilist:
1385
              if node not in lv_by_node[inst]:
1386
                continue
1387
              if vol['name'] in lv_by_node[inst][node]:
1388
                val = inst.name
1389
                break
1390
            else:
1391
              val = '-'
1392
          else:
1393
            raise errors.ParameterError(field)
1394
          node_output.append(str(val))
1395

    
1396
        output.append(node_output)
1397

    
1398
    return output
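# As with LUQueryNodes, the result is one row per volume with the values in
# op.output_fields order, all stringified.  For example (illustrative):
#
#   op.output_fields = ["node", "name", "size", "instance"]
#   # output -> [["node1", "disk0", "10240", "inst1"],
#   #            ["node1", "meta0", "128", "-"]]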
1399

    
1400

    
1401
class LUAddNode(LogicalUnit):
1402
  """Logical unit for adding node to the cluster.
1403

1404
  """
1405
  HPATH = "node-add"
1406
  HTYPE = constants.HTYPE_NODE
1407
  _OP_REQP = ["node_name"]
1408

    
1409
  def BuildHooksEnv(self):
1410
    """Build hooks env.
1411

1412
    This will run on all nodes before, and on all nodes + the new node after.
1413

1414
    """
1415
    env = {
1416
      "OP_TARGET": self.op.node_name,
1417
      "NODE_NAME": self.op.node_name,
1418
      "NODE_PIP": self.op.primary_ip,
1419
      "NODE_SIP": self.op.secondary_ip,
1420
      }
1421
    nodes_0 = self.cfg.GetNodeList()
1422
    nodes_1 = nodes_0 + [self.op.node_name, ]
1423
    return env, nodes_0, nodes_1
1424

    
1425
  def CheckPrereq(self):
1426
    """Check prerequisites.
1427

1428
    This checks:
1429
     - the new node is not already in the config
1430
     - it is resolvable
1431
     - its parameters (single/dual homed) matches the cluster
1432

1433
    Any errors are signalled by raising errors.OpPrereqError.
1434

1435
    """
1436
    node_name = self.op.node_name
1437
    cfg = self.cfg
1438

    
1439
    dns_data = utils.HostInfo(node_name)
1440

    
1441
    node = dns_data.name
1442
    primary_ip = self.op.primary_ip = dns_data.ip
1443
    secondary_ip = getattr(self.op, "secondary_ip", None)
1444
    if secondary_ip is None:
1445
      secondary_ip = primary_ip
1446
    if not utils.IsValidIP(secondary_ip):
1447
      raise errors.OpPrereqError("Invalid secondary IP given")
1448
    self.op.secondary_ip = secondary_ip
1449
    node_list = cfg.GetNodeList()
1450
    if node in node_list:
1451
      raise errors.OpPrereqError("Node %s is already in the configuration"
1452
                                 % node)
1453

    
1454
    for existing_node_name in node_list:
1455
      existing_node = cfg.GetNodeInfo(existing_node_name)
1456
      if (existing_node.primary_ip == primary_ip or
1457
          existing_node.secondary_ip == primary_ip or
1458
          existing_node.primary_ip == secondary_ip or
1459
          existing_node.secondary_ip == secondary_ip):
1460
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1461
                                   " existing node %s" % existing_node.name)
1462

    
1463
    # check that the type of the node (single versus dual homed) is the
1464
    # same as for the master
1465
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1466
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1467
    newbie_singlehomed = secondary_ip == primary_ip
1468
    if master_singlehomed != newbie_singlehomed:
1469
      if master_singlehomed:
1470
        raise errors.OpPrereqError("The master has no private ip but the"
1471
                                   " new node has one")
1472
      else:
1473
        raise errors.OpPrereqError("The master has a private ip but the"
1474
                                   " new node doesn't have one")
1475

    
1476
    # checks reachability
1477
    if not utils.TcpPing(utils.HostInfo().name,
1478
                         primary_ip,
1479
                         constants.DEFAULT_NODED_PORT):
1480
      raise errors.OpPrereqError("Node not reachable by ping")
1481

    
1482
    if not newbie_singlehomed:
1483
      # check reachability from my secondary ip to newbie's secondary ip
1484
      if not utils.TcpPing(myself.secondary_ip,
1485
                           secondary_ip,
1486
                           constants.DEFAULT_NODED_PORT):
1487
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1488
                                   " based ping to noded port")
1489

    
1490
    self.new_node = objects.Node(name=node,
1491
                                 primary_ip=primary_ip,
1492
                                 secondary_ip=secondary_ip)
1493

    
1494
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1495
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1496
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1497
                                   constants.VNC_PASSWORD_FILE)
1498

    
1499
  def Exec(self, feedback_fn):
1500
    """Adds the new node to the cluster.
1501

1502
    """
1503
    new_node = self.new_node
1504
    node = new_node.name
1505

    
1506
    # set up inter-node password and certificate and restarts the node daemon
1507
    gntpass = self.sstore.GetNodeDaemonPassword()
1508
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1509
      raise errors.OpExecError("ganeti password corruption detected")
1510
    f = open(constants.SSL_CERT_FILE)
1511
    try:
1512
      gntpem = f.read(8192)
1513
    finally:
1514
      f.close()
1515
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1516
    # so we use this to detect an invalid certificate; as long as the
1517
    # cert doesn't contain this, the here-document will be correctly
1518
    # parsed by the shell sequence below
1519
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1520
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1521
    if not gntpem.endswith("\n"):
1522
      raise errors.OpExecError("PEM must end with newline")
1523
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1524

    
1525
    # and then connect with ssh to set password and start ganeti-noded
1526
    # note that all the below variables are sanitized at this point,
1527
    # either by being constants or by the checks above
1528
    ss = self.sstore
1529
    mycommand = ("umask 077 && "
1530
                 "echo '%s' > '%s' && "
1531
                 "cat > '%s' << '!EOF.' && \n"
1532
                 "%s!EOF.\n%s restart" %
1533
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1534
                  constants.SSL_CERT_FILE, gntpem,
1535
                  constants.NODE_INITD_SCRIPT))
1536

    
1537
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1538
    if result.failed:
1539
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1540
                               " output: %s" %
1541
                               (node, result.fail_reason, result.output))
1542

    
1543
    # check connectivity
1544
    time.sleep(4)
1545

    
1546
    result = rpc.call_version([node])[node]
1547
    if result:
1548
      if constants.PROTOCOL_VERSION == result:
1549
        logger.Info("communication to node %s fine, sw version %s match" %
1550
                    (node, result))
1551
      else:
1552
        raise errors.OpExecError("Version mismatch master version %s,"
1553
                                 " node version %s" %
1554
                                 (constants.PROTOCOL_VERSION, result))
1555
    else:
1556
      raise errors.OpExecError("Cannot get version from the new node")
1557

    
1558
    # setup ssh on node
1559
    logger.Info("copy ssh key to node %s" % node)
1560
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1561
    keyarray = []
1562
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1563
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1564
                priv_key, pub_key]
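    # keyfiles lists six paths: the host DSA and RSA key pairs plus the ssh
    # key pair of the user Ganeti runs as; their contents are read below and
    # pushed to the new node in a single rpc.call_node_add call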
1565

    
1566
    for i in keyfiles:
1567
      f = open(i, 'r')
1568
      try:
1569
        keyarray.append(f.read())
1570
      finally:
1571
        f.close()
1572

    
1573
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1574
                               keyarray[3], keyarray[4], keyarray[5])
1575

    
1576
    if not result:
1577
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1578

    
1579
    # Add node to our /etc/hosts, and add key to known_hosts
1580
    _AddHostToEtcHosts(new_node.name)
1581

    
1582
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1583
                      self.cfg.GetHostKey())
1584

    
1585
    if new_node.secondary_ip != new_node.primary_ip:
1586
      if not rpc.call_node_tcp_ping(new_node.name,
1587
                                    constants.LOCALHOST_IP_ADDRESS,
1588
                                    new_node.secondary_ip,
1589
                                    constants.DEFAULT_NODED_PORT,
1590
                                    10, False):
1591
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1592
                                 " you gave (%s). Please fix and re-run this"
1593
                                 " command." % new_node.secondary_ip)
1594

    
1595
    success, msg = ssh.VerifyNodeHostname(node)
1596
    if not success:
1597
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1598
                               " than the one the resolver gives: %s."
1599
                               " Please fix and re-run this command." %
1600
                               (node, msg))
1601

    
1602
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1603
    # including the node just added
1604
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1605
    dist_nodes = self.cfg.GetNodeList() + [node]
1606
    if myself.name in dist_nodes:
1607
      dist_nodes.remove(myself.name)
1608

    
1609
    logger.Debug("Copying hosts and known_hosts to all nodes")
1610
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1611
      result = rpc.call_upload_file(dist_nodes, fname)
1612
      for to_node in dist_nodes:
1613
        if not result[to_node]:
1614
          logger.Error("copy of file %s to node %s failed" %
1615
                       (fname, to_node))
1616

    
1617
    to_copy = ss.GetFileList()
1618
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1619
      to_copy.append(constants.VNC_PASSWORD_FILE)
1620
    for fname in to_copy:
1621
      if not ssh.CopyFileToNode(node, fname):
1622
        logger.Error("could not copy file %s to node %s" % (fname, node))
1623

    
1624
    logger.Info("adding node %s to cluster.conf" % node)
1625
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
1629
  """Failover the master node to the current node.
1630

1631
  This is a special LU in that it must run on a non-master node.
1632

1633
  """
1634
  HPATH = "master-failover"
1635
  HTYPE = constants.HTYPE_CLUSTER
1636
  REQ_MASTER = False
1637
  _OP_REQP = []
1638

    
1639
  def BuildHooksEnv(self):
1640
    """Build hooks env.
1641

1642
    This will run on the new master only in the pre phase, and on all
1643
    the nodes in the post phase.
1644

1645
    """
1646
    env = {
1647
      "OP_TARGET": self.new_master,
1648
      "NEW_MASTER": self.new_master,
1649
      "OLD_MASTER": self.old_master,
1650
      }
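    # the hooks contract is (env, nodes for the pre phase, nodes for the
    # post phase): per the docstring above, pre runs only on the new master
    # while post runs on every node in the cluster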
1651
    return env, [self.new_master], self.cfg.GetNodeList()
1652

    
1653
  def CheckPrereq(self):
1654
    """Check prerequisites.
1655

1656
    This checks that we are not already the master.
1657

1658
    """
1659
    self.new_master = utils.HostInfo().name
1660
    self.old_master = self.sstore.GetMasterNode()
1661

    
1662
    if self.old_master == self.new_master:
1663
      raise errors.OpPrereqError("This commands must be run on the node"
1664
                                 " where you want the new master to be."
1665
                                 " %s is already the master" %
1666
                                 self.old_master)
1667

    
1668
  def Exec(self, feedback_fn):
1669
    """Failover the master node.
1670

1671
    This command, when run on a non-master node, will cause the current
1672
    master to cease being master, and the non-master to become new
1673
    master.
1674

1675
    """
1676
    #TODO: do not rely on gethostname returning the FQDN
1677
    logger.Info("setting master to %s, old master: %s" %
1678
                (self.new_master, self.old_master))
1679

    
1680
    if not rpc.call_node_stop_master(self.old_master):
1681
      logger.Error("could disable the master role on the old master"
1682
                   " %s, please disable manually" % self.old_master)
1683

    
1684
    ss = self.sstore
1685
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1686
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1687
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1688
      logger.Error("could not distribute the new simple store master file"
1689
                   " to the other nodes, please check.")
1690

    
1691
    if not rpc.call_node_start_master(self.new_master):
1692
      logger.Error("could not start the master role on the new master"
1693
                   " %s, please check" % self.new_master)
1694
      feedback_fn("Error in activating the master IP on the new master,"
1695
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
1700
  """Query cluster configuration.
1701

1702
  """
1703
  _OP_REQP = []
1704
  REQ_MASTER = False
1705

    
1706
  def CheckPrereq(self):
1707
    """No prerequsites needed for this LU.
1708

1709
    """
1710
    pass
1711

    
1712
  def Exec(self, feedback_fn):
1713
    """Return cluster config.
1714

1715
    """
1716
    result = {
1717
      "name": self.sstore.GetClusterName(),
1718
      "software_version": constants.RELEASE_VERSION,
1719
      "protocol_version": constants.PROTOCOL_VERSION,
1720
      "config_version": constants.CONFIG_VERSION,
1721
      "os_api_version": constants.OS_API_VERSION,
1722
      "export_version": constants.EXPORT_VERSION,
1723
      "master": self.sstore.GetMasterNode(),
1724
      "architecture": (platform.architecture()[0], platform.machine()),
1725
      }
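    # illustrative shape of the returned dict (hypothetical values):
    #   {"name": "cluster.example.com", "master": "node1.example.com",
    #    "architecture": ("64bit", "x86_64"), ...}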
1726

    
1727
    return result


class LUClusterCopyFile(NoHooksLU):
1731
  """Copy file to cluster.
1732

1733
  """
1734
  _OP_REQP = ["nodes", "filename"]
1735

    
1736
  def CheckPrereq(self):
1737
    """Check prerequisites.
1738

1739
    It should check that the named file exists and that the given list
1740
    of nodes is valid.
1741

1742
    """
1743
    if not os.path.exists(self.op.filename):
1744
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1745

    
1746
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1747

    
1748
  def Exec(self, feedback_fn):
1749
    """Copy a file from master to some nodes.
1750

1751
    Args:
1752
      opts - class with options as members
1753
      args - list containing a single element, the file name
1754
    Opts used:
1755
      nodes - list containing the name of target nodes; if empty, all nodes
1756

1757
    """
1758
    filename = self.op.filename
1759

    
1760
    myname = utils.HostInfo().name
1761

    
1762
    for node in self.nodes:
1763
      if node == myname:
1764
        continue
1765
      if not ssh.CopyFileToNode(node, filename):
1766
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
1770
  """Return a text-representation of the cluster-config.
1771

1772
  """
1773
  _OP_REQP = []
1774

    
1775
  def CheckPrereq(self):
1776
    """No prerequisites.
1777

1778
    """
1779
    pass
1780

    
1781
  def Exec(self, feedback_fn):
1782
    """Dump a representation of the cluster config to the standard output.
1783

1784
    """
1785
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
1789
  """Run a command on some nodes.
1790

1791
  """
1792
  _OP_REQP = ["command", "nodes"]
1793

    
1794
  def CheckPrereq(self):
1795
    """Check prerequisites.
1796

1797
    It checks that the given list of nodes is valid.
1798

1799
    """
1800
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1801

    
1802
  def Exec(self, feedback_fn):
1803
    """Run a command on some nodes.
1804

1805
    """
1806
    data = []
1807
    for node in self.nodes:
1808
      result = ssh.SSHCall(node, "root", self.op.command)
1809
      data.append((node, result.output, result.exit_code))
1810

    
1811
    return data


class LUActivateInstanceDisks(NoHooksLU):
1815
  """Bring up an instance's disks.
1816

1817
  """
1818
  _OP_REQP = ["instance_name"]
1819

    
1820
  def CheckPrereq(self):
1821
    """Check prerequisites.
1822

1823
    This checks that the instance is in the cluster.
1824

1825
    """
1826
    instance = self.cfg.GetInstanceInfo(
1827
      self.cfg.ExpandInstanceName(self.op.instance_name))
1828
    if instance is None:
1829
      raise errors.OpPrereqError("Instance '%s' not known" %
1830
                                 self.op.instance_name)
1831
    self.instance = instance
1832

    
1833

    
1834
  def Exec(self, feedback_fn):
1835
    """Activate the disks.
1836

1837
    """
1838
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1839
    if not disks_ok:
1840
      raise errors.OpExecError("Cannot activate block devices")
1841

    
1842
    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1846
  """Prepare the block devices for an instance.
1847

1848
  This sets up the block devices on all nodes.
1849

1850
  Args:
1851
    instance: a ganeti.objects.Instance object
1852
    ignore_secondaries: if true, errors on secondary nodes won't result
1853
                        in an error return from the function
1854

1855
  Returns:
1856
    a tuple (disks_ok, device_info): disks_ok is False if the operation
    failed (True otherwise), and device_info is the list of
    (host, instance_visible_name, node_visible_name) mappings from node
    devices to instance devices
1859
  """
1860
  device_info = []
1861
  disks_ok = True
1862
  for inst_disk in instance.disks:
1863
    master_result = None
1864
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1865
      cfg.SetDiskID(node_disk, node)
1866
      is_primary = node == instance.primary_node
1867
      result = rpc.call_blockdev_assemble(node, node_disk,
1868
                                          instance.name, is_primary)
1869
      if not result:
1870
        logger.Error("could not prepare block device %s on node %s"
1871
                     " (is_primary=%s)" %
1872
                     (inst_disk.iv_name, node, is_primary))
1873
        if is_primary or not ignore_secondaries:
1874
          disks_ok = False
1875
      if is_primary:
1876
        master_result = result
1877
    device_info.append((instance.primary_node, inst_disk.iv_name,
1878
                        master_result))
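    # each entry is (primary node, instance-visible device name, result of
    # the assemble call on the primary node), e.g. the hypothetical
    # ("node1.example.com", "sda", "/dev/drbd0")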
1879

    
1880
  # leave the disks configured for the primary node
1881
  # this is a workaround that would be fixed better by
1882
  # improving the logical/physical id handling
1883
  for disk in instance.disks:
1884
    cfg.SetDiskID(disk, instance.primary_node)
1885

    
1886
  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
1890
  """Start the disks of an instance.
1891

1892
  """
1893
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1894
                                           ignore_secondaries=force)
1895
  if not disks_ok:
1896
    _ShutdownInstanceDisks(instance, cfg)
1897
    if force is not None and not force:
1898
      logger.Error("If the message above refers to a secondary node,"
1899
                   " you can retry the operation using '--force'.")
1900
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
1904
  """Shutdown an instance's disks.
1905

1906
  """
1907
  _OP_REQP = ["instance_name"]
1908

    
1909
  def CheckPrereq(self):
1910
    """Check prerequisites.
1911

1912
    This checks that the instance is in the cluster.
1913

1914
    """
1915
    instance = self.cfg.GetInstanceInfo(
1916
      self.cfg.ExpandInstanceName(self.op.instance_name))
1917
    if instance is None:
1918
      raise errors.OpPrereqError("Instance '%s' not known" %
1919
                                 self.op.instance_name)
1920
    self.instance = instance
1921

    
1922
  def Exec(self, feedback_fn):
1923
    """Deactivate the disks
1924

1925
    """
1926
    instance = self.instance
1927
    ins_l = rpc.call_instance_list([instance.primary_node])
1928
    ins_l = ins_l[instance.primary_node]
1929
    if not isinstance(ins_l, list):
1930
      raise errors.OpExecError("Can't contact node '%s'" %
1931
                               instance.primary_node)
1932

    
1933
    if self.instance.name in ins_l:
1934
      raise errors.OpExecError("Instance is running, can't shutdown"
1935
                               " block devices.")
1936

    
1937
    _ShutdownInstanceDisks(instance, self.cfg)
1938

    
1939

    
1940
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1941
  """Shutdown block devices of an instance.
1942

1943
  This does the shutdown on all nodes of the instance.
1944

1945
  If ignore_primary is true, errors on the primary node are
1946
  ignored.
1947

1948
  """
1949
  result = True
1950
  for disk in instance.disks:
1951
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1952
      cfg.SetDiskID(top_disk, node)
1953
      if not rpc.call_blockdev_shutdown(node, top_disk):
1954
        logger.Error("could not shutdown block device %s on node %s" %
1955
                     (disk.iv_name, node))
1956
        if not ignore_primary or node != instance.primary_node:
1957
          result = False
1958
  return result


class LUStartupInstance(LogicalUnit):
1962
  """Starts an instance.
1963

1964
  """
1965
  HPATH = "instance-start"
1966
  HTYPE = constants.HTYPE_INSTANCE
1967
  _OP_REQP = ["instance_name", "force"]
1968

    
1969
  def BuildHooksEnv(self):
1970
    """Build hooks env.
1971

1972
    This runs on master, primary and secondary nodes of the instance.
1973

1974
    """
1975
    env = {
1976
      "FORCE": self.op.force,
1977
      }
1978
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1979
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1980
          list(self.instance.secondary_nodes))
1981
    return env, nl, nl
1982

    
1983
  def CheckPrereq(self):
1984
    """Check prerequisites.
1985

1986
    This checks that the instance is in the cluster.
1987

1988
    """
1989
    instance = self.cfg.GetInstanceInfo(
1990
      self.cfg.ExpandInstanceName(self.op.instance_name))
1991
    if instance is None:
1992
      raise errors.OpPrereqError("Instance '%s' not known" %
1993
                                 self.op.instance_name)
1994

    
1995
    # check bridges existence
1996
    _CheckInstanceBridgesExist(instance)
1997

    
1998
    self.instance = instance
1999
    self.op.instance_name = instance.name
2000

    
2001
  def Exec(self, feedback_fn):
2002
    """Start the instance.
2003

2004
    """
2005
    instance = self.instance
2006
    force = self.op.force
2007
    extra_args = getattr(self.op, "extra_args", "")
2008

    
2009
    node_current = instance.primary_node
2010

    
2011
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
2012
    if not nodeinfo:
2013
      raise errors.OpExecError("Could not contact node %s for infos" %
2014
                               (node_current))
2015

    
2016
    freememory = nodeinfo[node_current]['memory_free']
2017
    memory = instance.memory
2018
    if memory > freememory:
2019
      raise errors.OpExecError("Not enough memory to start instance"
2020
                               " %s on node %s"
2021
                               " needed %s MiB, available %s MiB" %
2022
                               (instance.name, node_current, memory,
2023
                                freememory))
2024

    
2025
    _StartInstanceDisks(self.cfg, instance, force)
2026

    
2027
    if not rpc.call_instance_start(node_current, instance, extra_args):
2028
      _ShutdownInstanceDisks(instance, self.cfg)
2029
      raise errors.OpExecError("Could not start instance")
2030

    
2031
    self.cfg.MarkInstanceUp(instance.name)
2032

    
2033

    
2034
class LURebootInstance(LogicalUnit):
2035
  """Reboot an instance.
2036

2037
  """
2038
  HPATH = "instance-reboot"
2039
  HTYPE = constants.HTYPE_INSTANCE
2040
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2041

    
2042
  def BuildHooksEnv(self):
2043
    """Build hooks env.
2044

2045
    This runs on master, primary and secondary nodes of the instance.
2046

2047
    """
2048
    env = {
2049
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2050
      }
2051
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2052
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2053
          list(self.instance.secondary_nodes))
2054
    return env, nl, nl
2055

    
2056
  def CheckPrereq(self):
2057
    """Check prerequisites.
2058

2059
    This checks that the instance is in the cluster.
2060

2061
    """
2062
    instance = self.cfg.GetInstanceInfo(
2063
      self.cfg.ExpandInstanceName(self.op.instance_name))
2064
    if instance is None:
2065
      raise errors.OpPrereqError("Instance '%s' not known" %
2066
                                 self.op.instance_name)
2067

    
2068
    # check bridges existence
2069
    _CheckInstanceBridgesExist(instance)
2070

    
2071
    self.instance = instance
2072
    self.op.instance_name = instance.name
2073

    
2074
  def Exec(self, feedback_fn):
2075
    """Reboot the instance.
2076

2077
    """
2078
    instance = self.instance
2079
    ignore_secondaries = self.op.ignore_secondaries
2080
    reboot_type = self.op.reboot_type
2081
    extra_args = getattr(self.op, "extra_args", "")
2082

    
2083
    node_current = instance.primary_node
2084

    
2085
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2086
                           constants.INSTANCE_REBOOT_HARD,
2087
                           constants.INSTANCE_REBOOT_FULL]:
2088
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2089
                                  (constants.INSTANCE_REBOOT_SOFT,
2090
                                   constants.INSTANCE_REBOOT_HARD,
2091
                                   constants.INSTANCE_REBOOT_FULL))
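    # soft and hard reboots are delegated to the node daemon via
    # call_instance_reboot; a full reboot is emulated below as a clean
    # shutdown, a disk deactivation/reactivation cycle and a fresh start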
2092

    
2093
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2094
                       constants.INSTANCE_REBOOT_HARD]:
2095
      if not rpc.call_instance_reboot(node_current, instance,
2096
                                      reboot_type, extra_args):
2097
        raise errors.OpExecError("Could not reboot instance")
2098
    else:
2099
      if not rpc.call_instance_shutdown(node_current, instance):
2100
        raise errors.OpExecError("could not shutdown instance for full reboot")
2101
      _ShutdownInstanceDisks(instance, self.cfg)
2102
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2103
      if not rpc.call_instance_start(node_current, instance, extra_args):
2104
        _ShutdownInstanceDisks(instance, self.cfg)
2105
        raise errors.OpExecError("Could not start instance for full reboot")
2106

    
2107
    self.cfg.MarkInstanceUp(instance.name)
2108

    
2109

    
2110
class LUShutdownInstance(LogicalUnit):
2111
  """Shutdown an instance.
2112

2113
  """
2114
  HPATH = "instance-stop"
2115
  HTYPE = constants.HTYPE_INSTANCE
2116
  _OP_REQP = ["instance_name"]
2117

    
2118
  def BuildHooksEnv(self):
2119
    """Build hooks env.
2120

2121
    This runs on master, primary and secondary nodes of the instance.
2122

2123
    """
2124
    env = _BuildInstanceHookEnvByObject(self.instance)
2125
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2126
          list(self.instance.secondary_nodes))
2127
    return env, nl, nl
2128

    
2129
  def CheckPrereq(self):
2130
    """Check prerequisites.
2131

2132
    This checks that the instance is in the cluster.
2133

2134
    """
2135
    instance = self.cfg.GetInstanceInfo(
2136
      self.cfg.ExpandInstanceName(self.op.instance_name))
2137
    if instance is None:
2138
      raise errors.OpPrereqError("Instance '%s' not known" %
2139
                                 self.op.instance_name)
2140
    self.instance = instance
2141

    
2142
  def Exec(self, feedback_fn):
2143
    """Shutdown the instance.
2144

2145
    """
2146
    instance = self.instance
2147
    node_current = instance.primary_node
2148
    if not rpc.call_instance_shutdown(node_current, instance):
2149
      logger.Error("could not shutdown instance")
2150

    
2151
    self.cfg.MarkInstanceDown(instance.name)
2152
    _ShutdownInstanceDisks(instance, self.cfg)
2153

    
2154

    
2155
class LUReinstallInstance(LogicalUnit):
2156
  """Reinstall an instance.
2157

2158
  """
2159
  HPATH = "instance-reinstall"
2160
  HTYPE = constants.HTYPE_INSTANCE
2161
  _OP_REQP = ["instance_name"]
2162

    
2163
  def BuildHooksEnv(self):
2164
    """Build hooks env.
2165

2166
    This runs on master, primary and secondary nodes of the instance.
2167

2168
    """
2169
    env = _BuildInstanceHookEnvByObject(self.instance)
2170
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2171
          list(self.instance.secondary_nodes))
2172
    return env, nl, nl
2173

    
2174
  def CheckPrereq(self):
2175
    """Check prerequisites.
2176

2177
    This checks that the instance is in the cluster and is not running.
2178

2179
    """
2180
    instance = self.cfg.GetInstanceInfo(
2181
      self.cfg.ExpandInstanceName(self.op.instance_name))
2182
    if instance is None:
2183
      raise errors.OpPrereqError("Instance '%s' not known" %
2184
                                 self.op.instance_name)
2185
    if instance.disk_template == constants.DT_DISKLESS:
2186
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2187
                                 self.op.instance_name)
2188
    if instance.status != "down":
2189
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2190
                                 self.op.instance_name)
2191
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2192
    if remote_info:
2193
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2194
                                 (self.op.instance_name,
2195
                                  instance.primary_node))
2196

    
2197
    self.op.os_type = getattr(self.op, "os_type", None)
2198
    if self.op.os_type is not None:
2199
      # OS verification
2200
      pnode = self.cfg.GetNodeInfo(
2201
        self.cfg.ExpandNodeName(instance.primary_node))
2202
      if pnode is None:
2203
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2204
                                   self.op.pnode)
2205
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2206
      if not os_obj:
2207
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2208
                                   " primary node"  % self.op.os_type)
2209

    
2210
    self.instance = instance
2211

    
2212
  def Exec(self, feedback_fn):
2213
    """Reinstall the instance.
2214

2215
    """
2216
    inst = self.instance
2217

    
2218
    if self.op.os_type is not None:
2219
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2220
      inst.os = self.op.os_type
2221
      self.cfg.AddInstance(inst)
2222

    
2223
    _StartInstanceDisks(self.cfg, inst, None)
2224
    try:
2225
      feedback_fn("Running the instance OS create scripts...")
2226
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2227
        raise errors.OpExecError("Could not install OS for instance %s"
2228
                                 " on node %s" %
2229
                                 (inst.name, inst.primary_node))
2230
    finally:
2231
      _ShutdownInstanceDisks(inst, self.cfg)
2232

    
2233

    
2234
class LURenameInstance(LogicalUnit):
2235
  """Rename an instance.
2236

2237
  """
2238
  HPATH = "instance-rename"
2239
  HTYPE = constants.HTYPE_INSTANCE
2240
  _OP_REQP = ["instance_name", "new_name"]
2241

    
2242
  def BuildHooksEnv(self):
2243
    """Build hooks env.
2244

2245
    This runs on master, primary and secondary nodes of the instance.
2246

2247
    """
2248
    env = _BuildInstanceHookEnvByObject(self.instance)
2249
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2250
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2251
          list(self.instance.secondary_nodes))
2252
    return env, nl, nl
2253

    
2254
  def CheckPrereq(self):
2255
    """Check prerequisites.
2256

2257
    This checks that the instance is in the cluster and is not running.
2258

2259
    """
2260
    instance = self.cfg.GetInstanceInfo(
2261
      self.cfg.ExpandInstanceName(self.op.instance_name))
2262
    if instance is None:
2263
      raise errors.OpPrereqError("Instance '%s' not known" %
2264
                                 self.op.instance_name)
2265
    if instance.status != "down":
2266
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2267
                                 self.op.instance_name)
2268
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2269
    if remote_info:
2270
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2271
                                 (self.op.instance_name,
2272
                                  instance.primary_node))
2273
    self.instance = instance
2274

    
2275
    # new name verification
2276
    name_info = utils.HostInfo(self.op.new_name)
2277

    
2278
    self.op.new_name = new_name = name_info.name
2279
    if not getattr(self.op, "ignore_ip", False):
2280
      command = ["fping", "-q", name_info.ip]
2281
      result = utils.RunCmd(command)
2282
      if not result.failed:
2283
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2284
                                   (name_info.ip, new_name))
2285

    
2286

    
2287
  def Exec(self, feedback_fn):
2288
    """Reinstall the instance.
2289

2290
    """
2291
    inst = self.instance
2292
    old_name = inst.name
2293

    
2294
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2295

    
2296
    # re-read the instance from the configuration after rename
2297
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2298

    
2299
    _StartInstanceDisks(self.cfg, inst, None)
2300
    try:
2301
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2302
                                          "sda", "sdb"):
2303
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2304
               " instance has been renamed in Ganeti)" %
2305
               (inst.name, inst.primary_node))
2306
        logger.Error(msg)
2307
    finally:
2308
      _ShutdownInstanceDisks(inst, self.cfg)
2309

    
2310

    
2311
class LURemoveInstance(LogicalUnit):
2312
  """Remove an instance.
2313

2314
  """
2315
  HPATH = "instance-remove"
2316
  HTYPE = constants.HTYPE_INSTANCE
2317
  _OP_REQP = ["instance_name"]
2318

    
2319
  def BuildHooksEnv(self):
2320
    """Build hooks env.
2321

2322
    This runs on master, primary and secondary nodes of the instance.
2323

2324
    """
2325
    env = _BuildInstanceHookEnvByObject(self.instance)
2326
    nl = [self.sstore.GetMasterNode()]
2327
    return env, nl, nl
2328

    
2329
  def CheckPrereq(self):
2330
    """Check prerequisites.
2331

2332
    This checks that the instance is in the cluster.
2333

2334
    """
2335
    instance = self.cfg.GetInstanceInfo(
2336
      self.cfg.ExpandInstanceName(self.op.instance_name))
2337
    if instance is None:
2338
      raise errors.OpPrereqError("Instance '%s' not known" %
2339
                                 self.op.instance_name)
2340
    self.instance = instance
2341

    
2342
  def Exec(self, feedback_fn):
2343
    """Remove the instance.
2344

2345
    """
2346
    instance = self.instance
2347
    logger.Info("shutting down instance %s on node %s" %
2348
                (instance.name, instance.primary_node))
2349

    
2350
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2351
      if self.op.ignore_failures:
2352
        feedback_fn("Warning: can't shutdown instance")
2353
      else:
2354
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2355
                                 (instance.name, instance.primary_node))
2356

    
2357
    logger.Info("removing block devices for instance %s" % instance.name)
2358

    
2359
    if not _RemoveDisks(instance, self.cfg):
2360
      if self.op.ignore_failures:
2361
        feedback_fn("Warning: can't remove instance's disks")
2362
      else:
2363
        raise errors.OpExecError("Can't remove instance's disks")
2364

    
2365
    logger.Info("removing instance %s out of cluster config" % instance.name)
2366

    
2367
    self.cfg.RemoveInstance(instance.name)
2368

    
2369

    
2370
class LUQueryInstances(NoHooksLU):
2371
  """Logical unit for querying instances.
2372

2373
  """
2374
  _OP_REQP = ["output_fields", "names"]
2375

    
2376
  def CheckPrereq(self):
2377
    """Check prerequisites.
2378

2379
    This checks that the fields required are valid output fields.
2380

2381
    """
2382
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2383
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2384
                               "admin_state", "admin_ram",
2385
                               "disk_template", "ip", "mac", "bridge",
2386
                               "sda_size", "sdb_size"],
2387
                       dynamic=self.dynamic_fields,
2388
                       selected=self.op.output_fields)
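    # static fields are answered from the configuration alone, while the
    # dynamic ones (oper_state, oper_ram) need live data; Exec only issues
    # the extra RPC when at least one dynamic field was requested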
2389

    
2390
    self.wanted = _GetWantedInstances(self, self.op.names)
2391

    
2392
  def Exec(self, feedback_fn):
2393
    """Computes the list of nodes and their attributes.
2394

2395
    """
2396
    instance_names = self.wanted
2397
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2398
                     in instance_names]
2399

    
2400
    # begin data gathering
2401

    
2402
    nodes = frozenset([inst.primary_node for inst in instance_list])
2403

    
2404
    bad_nodes = []
2405
    if self.dynamic_fields.intersection(self.op.output_fields):
2406
      live_data = {}
2407
      node_data = rpc.call_all_instances_info(nodes)
2408
      for name in nodes:
2409
        result = node_data[name]
2410
        if result:
2411
          live_data.update(result)
2412
        elif result == False:
2413
          bad_nodes.append(name)
2414
        # else no instance is alive
2415
    else:
2416
      live_data = dict([(name, {}) for name in instance_names])
2417

    
2418
    # end data gathering
2419

    
2420
    output = []
2421
    for instance in instance_list:
2422
      iout = []
2423
      for field in self.op.output_fields:
2424
        if field == "name":
2425
          val = instance.name
2426
        elif field == "os":
2427
          val = instance.os
2428
        elif field == "pnode":
2429
          val = instance.primary_node
2430
        elif field == "snodes":
2431
          val = list(instance.secondary_nodes)
2432
        elif field == "admin_state":
2433
          val = (instance.status != "down")
2434
        elif field == "oper_state":
2435
          if instance.primary_node in bad_nodes:
2436
            val = None
2437
          else:
2438
            val = bool(live_data.get(instance.name))
2439
        elif field == "admin_ram":
2440
          val = instance.memory
2441
        elif field == "oper_ram":
2442
          if instance.primary_node in bad_nodes:
2443
            val = None
2444
          elif instance.name in live_data:
2445
            val = live_data[instance.name].get("memory", "?")
2446
          else:
2447
            val = "-"
2448
        elif field == "disk_template":
2449
          val = instance.disk_template
2450
        elif field == "ip":
2451
          val = instance.nics[0].ip
2452
        elif field == "bridge":
2453
          val = instance.nics[0].bridge
2454
        elif field == "mac":
2455
          val = instance.nics[0].mac
2456
        elif field == "sda_size" or field == "sdb_size":
2457
          disk = instance.FindDisk(field[:3])
2458
          if disk is None:
2459
            val = None
2460
          else:
2461
            val = disk.size
2462
        else:
2463
          raise errors.ParameterError(field)
2464
        iout.append(val)
2465
      output.append(iout)
2466

    
2467
    return output
2468

    
2469

    
2470
class LUFailoverInstance(LogicalUnit):
2471
  """Failover an instance.
2472

2473
  """
2474
  HPATH = "instance-failover"
2475
  HTYPE = constants.HTYPE_INSTANCE
2476
  _OP_REQP = ["instance_name", "ignore_consistency"]
2477

    
2478
  def BuildHooksEnv(self):
2479
    """Build hooks env.
2480

2481
    This runs on master, primary and secondary nodes of the instance.
2482

2483
    """
2484
    env = {
2485
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2486
      }
2487
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2488
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2489
    return env, nl, nl
2490

    
2491
  def CheckPrereq(self):
2492
    """Check prerequisites.
2493

2494
    This checks that the instance is in the cluster.
2495

2496
    """
2497
    instance = self.cfg.GetInstanceInfo(
2498
      self.cfg.ExpandInstanceName(self.op.instance_name))
2499
    if instance is None:
2500
      raise errors.OpPrereqError("Instance '%s' not known" %
2501
                                 self.op.instance_name)
2502

    
2503
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2504
      raise errors.OpPrereqError("Instance's disk layout is not"
2505
                                 " network mirrored, cannot failover.")
2506

    
2507
    secondary_nodes = instance.secondary_nodes
2508
    if not secondary_nodes:
2509
      raise errors.ProgrammerError("no secondary node but using "
2510
                                   "DT_REMOTE_RAID1 template")
2511

    
2512
    # check memory requirements on the secondary node
2513
    target_node = secondary_nodes[0]
2514
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2515
    info = nodeinfo.get(target_node, None)
2516
    if not info:
2517
      raise errors.OpPrereqError("Cannot get current information"
2518
                                 " from node '%s'" % nodeinfo)
2519
    if instance.memory > info['memory_free']:
2520
      raise errors.OpPrereqError("Not enough memory on target node %s."
2521
                                 " %d MB available, %d MB required" %
2522
                                 (target_node, info['memory_free'],
2523
                                  instance.memory))
2524

    
2525
    # check bridge existence
2526
    brlist = [nic.bridge for nic in instance.nics]
2527
    if not rpc.call_bridges_exist(target_node, brlist):
2528
      raise errors.OpPrereqError("One or more target bridges %s does not"
2529
                                 " exist on destination node '%s'" %
2530
                                 (brlist, target_node))
2531

    
2532
    self.instance = instance
2533

    
2534
  def Exec(self, feedback_fn):
2535
    """Failover an instance.
2536

2537
    The failover is done by shutting it down on its present node and
2538
    starting it on the secondary.
2539

2540
    """
2541
    instance = self.instance
2542

    
2543
    source_node = instance.primary_node
2544
    target_node = instance.secondary_nodes[0]
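    # outline of the failover steps below: check disk consistency and target
    # memory, shut the instance down on the source node and deactivate its
    # disks there, flip primary_node in the configuration, then reassemble
    # the disks and start the instance on the former secondary node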
2545

    
2546
    feedback_fn("* checking disk consistency between source and target")
2547
    for dev in instance.disks:
2548
      # for remote_raid1, these are md over drbd
2549
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2550
        if not self.op.ignore_consistency:
2551
          raise errors.OpExecError("Disk %s is degraded on target node,"
2552
                                   " aborting failover." % dev.iv_name)
2553

    
2554
    feedback_fn("* checking target node resource availability")
2555
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2556

    
2557
    if not nodeinfo:
2558
      raise errors.OpExecError("Could not contact target node %s." %
2559
                               target_node)
2560

    
2561
    free_memory = int(nodeinfo[target_node]['memory_free'])
2562
    memory = instance.memory
2563
    if memory > free_memory:
2564
      raise errors.OpExecError("Not enough memory to create instance %s on"
2565
                               " node %s. needed %s MiB, available %s MiB" %
2566
                               (instance.name, target_node, memory,
2567
                                free_memory))
2568

    
2569
    feedback_fn("* shutting down instance on source node")
2570
    logger.Info("Shutting down instance %s on node %s" %
2571
                (instance.name, source_node))
2572

    
2573
    if not rpc.call_instance_shutdown(source_node, instance):
2574
      if self.op.ignore_consistency:
2575
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2576
                     " anyway. Please make sure node %s is down"  %
2577
                     (instance.name, source_node, source_node))
2578
      else:
2579
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2580
                                 (instance.name, source_node))
2581

    
2582
    feedback_fn("* deactivating the instance's disks on source node")
2583
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2584
      raise errors.OpExecError("Can't shut down the instance's disks.")
2585

    
2586
    instance.primary_node = target_node
2587
    # distribute new instance config to the other nodes
2588
    self.cfg.AddInstance(instance)
2589

    
2590
    feedback_fn("* activating the instance's disks on target node")
2591
    logger.Info("Starting instance %s on node %s" %
2592
                (instance.name, target_node))
2593

    
2594
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2595
                                             ignore_secondaries=True)
2596
    if not disks_ok:
2597
      _ShutdownInstanceDisks(instance, self.cfg)
2598
      raise errors.OpExecError("Can't activate the instance's disks")
2599

    
2600
    feedback_fn("* starting the instance on the target node")
2601
    if not rpc.call_instance_start(target_node, instance, None):
2602
      _ShutdownInstanceDisks(instance, self.cfg)
2603
      raise errors.OpExecError("Could not start instance %s on node %s." %
2604
                               (instance.name, target_node))
2605

    
2606

    
2607
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2608
  """Create a tree of block devices on the primary node.
2609

2610
  This always creates all devices.
2611

2612
  """
2613
  if device.children:
2614
    for child in device.children:
2615
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2616
        return False
2617

    
2618
  cfg.SetDiskID(device, node)
2619
  new_id = rpc.call_blockdev_create(node, device, device.size,
2620
                                    instance.name, True, info)
2621
  if not new_id:
2622
    return False
2623
  if device.physical_id is None:
2624
    device.physical_id = new_id
2625
  return True
2626

    
2627

    
2628
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2629
  """Create a tree of block devices on a secondary node.
2630

2631
  If this device type has to be created on secondaries, create it and
2632
  all its children.
2633

2634
  If not, just recurse to children keeping the same 'force' value.
2635

2636
  """
2637
  if device.CreateOnSecondary():
2638
    force = True
2639
  if device.children:
2640
    for child in device.children:
2641
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2642
                                        child, force, info):
2643
        return False
2644

    
2645
  if not force:
2646
    return True
2647
  cfg.SetDiskID(device, node)
2648
  new_id = rpc.call_blockdev_create(node, device, device.size,
2649
                                    instance.name, False, info)
2650
  if not new_id:
2651
    return False
2652
  if device.physical_id is None:
2653
    device.physical_id = new_id
2654
  return True
2655

    
2656

    
2657
def _GenerateUniqueNames(cfg, exts):
2658
  """Generate a suitable LV name.
2659

2660
  This will generate a logical volume name for the given instance.
2661

2662
  """
2663
  results = []
2664
  for val in exts:
2665
    new_id = cfg.GenerateUniqueID()
2666
    results.append("%s%s" % (new_id, val))
2667
  return results
2668

    
2669

    
2670
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2671
  """Generate a drbd device complete with its children.
2672

2673
  """
2674
  port = cfg.AllocatePort()
2675
  vgname = cfg.GetVGName()
2676
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2677
                          logical_id=(vgname, names[0]))
2678
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2679
                          logical_id=(vgname, names[1]))
2680
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2681
                          logical_id = (primary, secondary, port),
2682
                          children = [dev_data, dev_meta])
2683
  return drbd_dev
2684

    
2685

    
2686
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2687
  """Generate a drbd8 device complete with its children.
2688

2689
  """
2690
  port = cfg.AllocatePort()
2691
  vgname = cfg.GetVGName()
2692
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2693
                          logical_id=(vgname, names[0]))
2694
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2695
                          logical_id=(vgname, names[1]))
2696
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2697
                          logical_id = (primary, secondary, port),
2698
                          children = [dev_data, dev_meta],
2699
                          iv_name=iv_name)
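  # the resulting tree is a DRBD8 device on top of two LVs: one of the
  # requested size for data and a fixed 128 MB volume for the DRBD metadata,
  # replicated between the primary and the secondary node over the allocated
  # port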
2700
  return drbd_dev
2701

    
2702
def _GenerateDiskTemplate(cfg, template_name,
2703
                          instance_name, primary_node,
2704
                          secondary_nodes, disk_sz, swap_sz):
2705
  """Generate the entire disk layout for a given template type.
2706

2707
  """
2708
  #TODO: compute space requirements
2709

    
2710
  vgname = cfg.GetVGName()
2711
  if template_name == "diskless":
2712
    disks = []
2713
  elif template_name == "plain":
2714
    if len(secondary_nodes) != 0:
2715
      raise errors.ProgrammerError("Wrong template configuration")
2716

    
2717
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2718
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2719
                           logical_id=(vgname, names[0]),
2720
                           iv_name = "sda")
2721
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2722
                           logical_id=(vgname, names[1]),
2723
                           iv_name = "sdb")
2724
    disks = [sda_dev, sdb_dev]
2725
  elif template_name == "local_raid1":
2726
    if len(secondary_nodes) != 0:
2727
      raise errors.ProgrammerError("Wrong template configuration")
2728

    
2729

    
2730
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2731
                                       ".sdb_m1", ".sdb_m2"])
2732
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2733
                              logical_id=(vgname, names[0]))
2734
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2735
                              logical_id=(vgname, names[1]))
2736
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2737
                              size=disk_sz,
2738
                              children = [sda_dev_m1, sda_dev_m2])
2739
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2740
                              logical_id=(vgname, names[2]))
2741
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2742
                              logical_id=(vgname, names[3]))
2743
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2744
                              size=swap_sz,
2745
                              children = [sdb_dev_m1, sdb_dev_m2])
2746
    disks = [md_sda_dev, md_sdb_dev]
2747
  elif template_name == constants.DT_REMOTE_RAID1:
2748
    if len(secondary_nodes) != 1:
2749
      raise errors.ProgrammerError("Wrong template configuration")
2750
    remote_node = secondary_nodes[0]
2751
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2752
                                       ".sdb_data", ".sdb_meta"])
2753
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2754
                                         disk_sz, names[0:2])
2755
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2756
                              children = [drbd_sda_dev], size=disk_sz)
2757
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2758
                                         swap_sz, names[2:4])
2759
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2760
                              children = [drbd_sdb_dev], size=swap_sz)
2761
    disks = [md_sda_dev, md_sdb_dev]
2762
  elif template_name == constants.DT_DRBD8:
2763
    if len(secondary_nodes) != 1:
2764
      raise errors.ProgrammerError("Wrong template configuration")
2765
    remote_node = secondary_nodes[0]
2766
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2767
                                       ".sdb_data", ".sdb_meta"])
2768
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2769
                                         disk_sz, names[0:2], "sda")
2770
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2771
                                         swap_sz, names[2:4], "sdb")
2772
    disks = [drbd_sda_dev, drbd_sdb_dev]
2773
  else:
2774
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2775
  return disks
2776

    
2777

    
2778
def _GetInstanceInfoText(instance):
2779
  """Compute that text that should be added to the disk's metadata.
2780

2781
  """
2782
  return "originstname+%s" % instance.name
2783

    
2784

    
2785
def _CreateDisks(cfg, instance):
2786
  """Create all disks for an instance.
2787

2788
  This abstracts away some work from AddInstance.
2789

2790
  Args:
2791
    instance: the instance object
2792

2793
  Returns:
2794
    True or False showing the success of the creation process
2795

2796
  """
2797
  info = _GetInstanceInfoText(instance)
2798

    
2799
  for device in instance.disks:
2800
    logger.Info("creating volume %s for instance %s" %
2801
              (device.iv_name, instance.name))
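    # the device tree is created on the secondary node(s) first and only
    # then on the primary node (see the two #HARDCODE branches below)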
2802
    #HARDCODE
2803
    for secondary_node in instance.secondary_nodes:
2804
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2805
                                        device, False, info):
2806
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2807
                     (device.iv_name, device, secondary_node))
2808
        return False
2809
    #HARDCODE
2810
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2811
                                    instance, device, info):
2812
      logger.Error("failed to create volume %s on primary!" %
2813
                   device.iv_name)
2814
      return False
2815
  return True
2816

    
2817

    
2818
def _RemoveDisks(instance, cfg):
2819
  """Remove all disks for an instance.
2820

2821
  This abstracts away some work from `AddInstance()` and
2822
  `RemoveInstance()`. Note that in case some of the devices couldn't
2823
  be removed, the removal will continue with the other ones (compare
2824
  with `_CreateDisks()`).
2825

2826
  Args:
2827
    instance: the instance object
2828

2829
  Returns:
2830
    True or False showing the success of the removal process
2831

2832
  """
2833
  logger.Info("removing block devices for instance %s" % instance.name)
2834

    
2835
  result = True
2836
  for device in instance.disks:
2837
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2838
      cfg.SetDiskID(disk, node)
2839
      if not rpc.call_blockdev_remove(node, disk):
2840
        logger.Error("could not remove block device %s on node %s,"
2841
                     " continuing anyway" %
2842
                     (device.iv_name, node))
2843
        result = False
2844
  return result
2845

    
2846

    
2847
class LUCreateInstance(LogicalUnit):
2848
  """Create an instance.
2849

2850
  """
2851
  HPATH = "instance-add"
2852
  HTYPE = constants.HTYPE_INSTANCE
2853
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2854
              "disk_template", "swap_size", "mode", "start", "vcpus",
2855
              "wait_for_sync", "ip_check", "mac"]
2856

    
2857
  def BuildHooksEnv(self):
2858
    """Build hooks env.
2859

2860
    This runs on master, primary and secondary nodes of the instance.
2861

2862
    """
2863
    env = {
2864
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2865
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2866
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2867
      "INSTANCE_ADD_MODE": self.op.mode,
2868
      }
2869
    if self.op.mode == constants.INSTANCE_IMPORT:
2870
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2871
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2872
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2873

    
2874
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2875
      primary_node=self.op.pnode,
2876
      secondary_nodes=self.secondaries,
2877
      status=self.instance_status,
2878
      os_type=self.op.os_type,
2879
      memory=self.op.mem_size,
2880
      vcpus=self.op.vcpus,
2881
      nics=[(self.inst_ip, self.op.bridge)],
2882
    ))
2883

    
2884
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2885
          self.secondaries)
2886
    return env, nl, nl
2887

    
2888

    
2889
  def CheckPrereq(self):
2890
    """Check prerequisites.
2891

2892
    """
2893
    if self.op.mode not in (constants.INSTANCE_CREATE,
2894
                            constants.INSTANCE_IMPORT):
2895
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2896
                                 self.op.mode)
2897

    
2898
    if self.op.mode == constants.INSTANCE_IMPORT:
2899
      src_node = getattr(self.op, "src_node", None)
2900
      src_path = getattr(self.op, "src_path", None)
2901
      if src_node is None or src_path is None:
2902
        raise errors.OpPrereqError("Importing an instance requires source"
2903
                                   " node and path options")
2904
      src_node_full = self.cfg.ExpandNodeName(src_node)
2905
      if src_node_full is None:
2906
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2907
      self.op.src_node = src_node = src_node_full
2908

    
2909
      if not os.path.isabs(src_path):
2910
        raise errors.OpPrereqError("The source path must be absolute")
2911

    
2912
      export_info = rpc.call_export_info(src_node, src_path)
2913

    
2914
      if not export_info:
2915
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2916

    
2917
      if not export_info.has_section(constants.INISECT_EXP):
2918
        raise errors.ProgrammerError("Corrupted export config")
2919

    
2920
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2921
      if (int(ei_version) != constants.EXPORT_VERSION):
2922
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2923
                                   (ei_version, constants.EXPORT_VERSION))
2924

    
2925
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2926
        raise errors.OpPrereqError("Can't import instance with more than"
2927
                                   " one data disk")
2928

    
2929
      # FIXME: are the old os-es, disk sizes, etc. useful?
2930
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2931
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2932
                                                         'disk0_dump'))
2933
      self.src_image = diskimage
2934
    else: # INSTANCE_CREATE
2935
      if getattr(self.op, "os_type", None) is None:
2936
        raise errors.OpPrereqError("No guest OS specified")
2937

    
2938
    # check primary node
2939
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2940
    if pnode is None:
2941
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2942
                                 self.op.pnode)
2943
    self.op.pnode = pnode.name
2944
    self.pnode = pnode
2945
    self.secondaries = []
2946
    # disk template and mirror node verification
2947
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2948
      raise errors.OpPrereqError("Invalid disk template name")
2949

    
2950
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2951
      if getattr(self.op, "snode", None) is None:
2952
        raise errors.OpPrereqError("The networked disk templates need"
2953
                                   " a mirror node")
2954

    
2955
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2956
      if snode_name is None:
2957
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2958
                                   self.op.snode)
2959
      elif snode_name == pnode.name:
2960
        raise errors.OpPrereqError("The secondary node cannot be"
2961
                                   " the primary node.")
2962
      self.secondaries.append(snode_name)
2963

    
2964
    # Check lv size requirements
2965
    nodenames = [pnode.name] + self.secondaries
2966
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2967

    
2968
    # Required free disk space as a function of disk and swap space
2969
    req_size_dict = {
2970
      constants.DT_DISKLESS: 0,
2971
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2972
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2973
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2974
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2975
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2976
    }
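    # hypothetical worked example: a drbd8 instance with disk_size=10240 and
    # swap_size=4096 needs 10240 + 4096 + 256 = 14592 MB of free space in
    # the volume group on the primary and on the secondary node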
2977

    
2978
    if self.op.disk_template not in req_size_dict:
2979
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2980
                                   " is unknown" %  self.op.disk_template)
2981

    
2982
    req_size = req_size_dict[self.op.disk_template]
2983

    
2984
    for node in nodenames:
2985
      info = nodeinfo.get(node, None)
2986
      if not info:
2987
        raise errors.OpPrereqError("Cannot get current information"
2988
                                   " from node '%s'" % nodeinfo)
2989
      if req_size > info['vg_free']:
2990
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2991
                                   " %d MB available, %d MB required" %
2992
                                   (node, info['vg_free'], req_size))
2993

    
2994
    # os verification
2995
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2996
    if not os_obj:
2997
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2998
                                 " primary node"  % self.op.os_type)
2999

    
3000
    if self.op.kernel_path == constants.VALUE_NONE:
3001
      raise errors.OpPrereqError("Can't set instance kernel to none")
3002

    
3003
    # instance verification
3004
    hostname1 = utils.HostInfo(self.op.instance_name)
3005

    
3006
    self.op.instance_name = instance_name = hostname1.name
3007
    instance_list = self.cfg.GetInstanceList()
3008
    if instance_name in instance_list:
3009
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3010
                                 instance_name)
3011

    
3012
    ip = getattr(self.op, "ip", None)
3013
    if ip is None or ip.lower() == "none":
3014
      inst_ip = None
3015
    elif ip.lower() == "auto":
3016
      inst_ip = hostname1.ip
3017
    else:
3018
      if not utils.IsValidIP(ip):
3019
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3020
                                   " like a valid IP" % ip)
3021
      inst_ip = ip
3022
    self.inst_ip = inst_ip
3023

    
3024
    if self.op.start and not self.op.ip_check:
3025
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3026
                                 " adding an instance in start mode")
3027

    
3028
    if self.op.ip_check:
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
                       constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))
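      # Added note: the check is essentially a TCP connectivity probe from the
      # master towards the candidate IP on the node daemon port; if anything
      # answers, the address is assumed to already be in use.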
3033

    
3034
    # MAC address verification
3035
    if self.op.mac != "auto":
3036
      if not utils.IsValidMac(self.op.mac.lower()):
3037
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3038
                                   self.op.mac)
3039

    
3040
    # bridge verification
3041
    bridge = getattr(self.op, "bridge", None)
3042
    if bridge is None:
3043
      self.op.bridge = self.cfg.GetDefBridge()
3044
    else:
3045
      self.op.bridge = bridge
3046

    
3047
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3048
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3049
                                 " destination node '%s'" %
3050
                                 (self.op.bridge, pnode.name))
3051

    
3052
    if self.op.start:
3053
      self.instance_status = 'up'
3054
    else:
3055
      self.instance_status = 'down'
3056

    
3057
  def Exec(self, feedback_fn):
3058
    """Create and add the instance to the cluster.
3059

3060
    """
3061
    instance = self.op.instance_name
3062
    pnode_name = self.pnode.name
3063

    
3064
    if self.op.mac == "auto":
3065
      mac_address=self.cfg.GenerateMAC()
3066
    else:
3067
      mac_address=self.op.mac
3068

    
3069
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3070
    if self.inst_ip is not None:
3071
      nic.ip = self.inst_ip
3072

    
3073
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None
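    # Added note: hypervisors listed in HTS_REQ_PORT need a cluster-unique TCP
    # port reserved per instance (typically for remote console access such as
    # VNC); for the other hypervisor types no port is allocated.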
3078

    
3079
    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)
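    # Added note: this is expected to return the instance's top-level Disk
    # objects (the data disk "sda" and the swap disk "sdb"), wired up
    # according to the chosen disk template.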
3084

    
3085
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3086
                            primary_node=pnode_name,
3087
                            memory=self.op.mem_size,
3088
                            vcpus=self.op.vcpus,
3089
                            nics=[nic], disks=disks,
3090
                            disk_template=self.op.disk_template,
3091
                            status=self.instance_status,
3092
                            network_port=network_port,
3093
                            kernel_path=self.op.kernel_path,
3094
                            initrd_path=self.op.initrd_path,
3095
                            )
3096

    
3097
    feedback_fn("* creating instance disks...")
3098
    if not _CreateDisks(self.cfg, iobj):
3099
      _RemoveDisks(iobj, self.cfg)
3100
      raise errors.OpExecError("Device creation failed, reverting...")
3101

    
3102
    feedback_fn("adding instance %s to cluster config" % instance)
3103

    
3104
    self.cfg.AddInstance(iobj)
3105

    
3106
    if self.op.wait_for_sync:
3107
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3108
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3109
      # make sure the disks are not degraded (still sync-ing is ok)
3110
      time.sleep(15)
3111
      feedback_fn("* checking mirrors status")
3112
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3113
    else:
3114
      disk_abort = False
3115

    
3116
    if disk_abort:
3117
      _RemoveDisks(iobj, self.cfg)
3118
      self.cfg.RemoveInstance(iobj.name)
3119
      raise errors.OpExecError("There are some degraded disks for"
3120
                               " this instance")
3121

    
3122
    feedback_fn("creating os for instance %s on node %s" %
3123
                (instance, pnode_name))
3124

    
3125
    if iobj.disk_template != constants.DT_DISKLESS:
3126
      if self.op.mode == constants.INSTANCE_CREATE:
3127
        feedback_fn("* running the instance OS create scripts...")
3128
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3129
          raise errors.OpExecError("could not add os for instance %s"
3130
                                   " on node %s" %
3131
                                   (instance, pnode_name))
3132

    
3133
      elif self.op.mode == constants.INSTANCE_IMPORT:
3134
        feedback_fn("* running the instance OS import scripts...")
3135
        src_node = self.op.src_node
3136
        src_image = self.src_image
3137
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3138
                                                src_node, src_image):
3139
          raise errors.OpExecError("Could not import os for instance"
3140
                                   " %s on node %s" %
3141
                                   (instance, pnode_name))
3142
      else:
3143
        # also checked in the prereq part
3144
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3145
                                     % self.op.mode)
3146

    
3147
    if self.op.start:
3148
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3149
      feedback_fn("* starting instance...")
3150
      if not rpc.call_instance_start(pnode_name, iobj, None):
3151
        raise errors.OpExecError("Could not start instance")
3152

    
3153

    
3154
class LUConnectConsole(NoHooksLU):
3155
  """Connect to an instance's console.
3156

3157
  This is somewhat special in that it returns the command line that
3158
  you need to run on the master node in order to connect to the
3159
  console.
3160

3161
  """
3162
  _OP_REQP = ["instance_name"]
3163

    
3164
  def CheckPrereq(self):
3165
    """Check prerequisites.
3166

3167
    This checks that the instance is in the cluster.
3168

3169
    """
3170
    instance = self.cfg.GetInstanceInfo(
3171
      self.cfg.ExpandInstanceName(self.op.instance_name))
3172
    if instance is None:
3173
      raise errors.OpPrereqError("Instance '%s' not known" %
3174
                                 self.op.instance_name)
3175
    self.instance = instance
3176

    
3177
  def Exec(self, feedback_fn):
3178
    """Connect to the console of an instance
3179

3180
    """
3181
    instance = self.instance
3182
    node = instance.primary_node
3183

    
3184
    node_insts = rpc.call_instance_list([node])[node]
3185
    if node_insts is False:
3186
      raise errors.OpExecError("Can't connect to node %s." % node)
3187

    
3188
    if instance.name not in node_insts:
3189
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3190

    
3191
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3192

    
3193
    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)
    # build ssh cmdline
    argv = ["ssh", "-q", "-t"]
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
    argv.extend(ssh.BATCH_MODE_OPTS)
    argv.append(node)
    argv.append(console_cmd)
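    # Added note: the caller is expected to exec the returned command line,
    # which will look roughly like
    #   ssh -q -t <known-hosts opts> <batch opts> <node> '<console command>'
    # (illustrative shape only; the exact console command depends on the
    # configured hypervisor).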
3201
    return "ssh", argv
3202

    
3203

    
3204
class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3211

    
3212
  def BuildHooksEnv(self):
3213
    """Build hooks env.
3214

3215
    This runs on the master, the primary and all the secondaries.
3216

3217
    """
3218
    env = {
3219
      "NEW_SECONDARY": self.op.remote_node,
3220
      "DISK_NAME": self.op.disk_name,
3221
      }
3222
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3223
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3224
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3225
    return env, nl, nl
3226

    
3227
  def CheckPrereq(self):
3228
    """Check prerequisites.
3229

3230
    This checks that the instance is in the cluster.
3231

3232
    """
3233
    instance = self.cfg.GetInstanceInfo(
3234
      self.cfg.ExpandInstanceName(self.op.instance_name))
3235
    if instance is None:
3236
      raise errors.OpPrereqError("Instance '%s' not known" %
3237
                                 self.op.instance_name)
3238
    self.instance = instance
3239

    
3240
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3241
    if remote_node is None:
3242
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3243
    self.remote_node = remote_node
3244

    
3245
    if remote_node == instance.primary_node:
3246
      raise errors.OpPrereqError("The specified node is the primary node of"
3247
                                 " the instance.")
3248

    
3249
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3250
      raise errors.OpPrereqError("Instance's disk layout is not"
3251
                                 " remote_raid1.")
3252
    for disk in instance.disks:
3253
      if disk.iv_name == self.op.disk_name:
3254
        break
3255
    else:
3256
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3257
                                 " instance." % self.op.disk_name)
3258
    if len(disk.children) > 1:
3259
      raise errors.OpPrereqError("The device already has two slave devices."
3260
                                 " This would create a 3-disk raid1 which we"
3261
                                 " don't allow.")
3262
    self.disk = disk
3263

    
3264
  def Exec(self, feedback_fn):
3265
    """Add the mirror component
3266

3267
    """
3268
    disk = self.disk
3269
    instance = self.instance
3270

    
3271
    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)
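    # Added note: the new mirror component is a drbd device spanning the
    # primary and the chosen remote node, backed on each node by a pair of
    # fresh LVs (a data volume and a small metadata volume, hence the
    # "_data"/"_meta" name suffixes generated above).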
3276

    
3277
    logger.Info("adding new mirror component on secondary")
3278
    #HARDCODE
3279
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3280
                                      new_drbd, False,
3281
                                      _GetInstanceInfoText(instance)):
3282
      raise errors.OpExecError("Failed to create new component on secondary"
3283
                               " node %s" % remote_node)
3284

    
3285
    logger.Info("adding new mirror component on primary")
3286
    #HARDCODE
3287
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3288
                                    instance, new_drbd,
3289
                                    _GetInstanceInfoText(instance)):
3290
      # remove secondary dev
3291
      self.cfg.SetDiskID(new_drbd, remote_node)
3292
      rpc.call_blockdev_remove(remote_node, new_drbd)
3293
      raise errors.OpExecError("Failed to create volume on primary")
3294

    
3295
    # the device exists now
3296
    # call the primary node to add the mirror to md
3297
    logger.Info("adding new mirror component to md")
3298
    if not rpc.call_blockdev_addchildren(instance.primary_node,
3299
                                         disk, [new_drbd]):
3300
      logger.Error("Can't add mirror compoment to md!")
3301
      self.cfg.SetDiskID(new_drbd, remote_node)
3302
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
3303
        logger.Error("Can't rollback on secondary")
3304
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
3305
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3306
        logger.Error("Can't rollback on primary")
3307
      raise errors.OpExecError("Can't add mirror component to md array")
3308

    
3309
    disk.children.append(new_drbd)
3310

    
3311
    self.cfg.AddInstance(instance)
3312

    
3313
    _WaitForSync(self.cfg, instance, self.proc)
3314

    
3315
    return 0
3316

    
3317

    
3318
class LURemoveMDDRBDComponent(LogicalUnit):
3319
  """Remove a component from a remote_raid1 disk.
3320

3321
  """
3322
  HPATH = "mirror-remove"
3323
  HTYPE = constants.HTYPE_INSTANCE
3324
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3325

    
3326
  def BuildHooksEnv(self):
3327
    """Build hooks env.
3328

3329
    This runs on the master, the primary and all the secondaries.
3330

3331
    """
3332
    env = {
3333
      "DISK_NAME": self.op.disk_name,
3334
      "DISK_ID": self.op.disk_id,
3335
      "OLD_SECONDARY": self.old_secondary,
3336
      }
3337
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3338
    nl = [self.sstore.GetMasterNode(),
3339
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3340
    return env, nl, nl
3341

    
3342
  def CheckPrereq(self):
3343
    """Check prerequisites.
3344

3345
    This checks that the instance is in the cluster.
3346

3347
    """
3348
    instance = self.cfg.GetInstanceInfo(
3349
      self.cfg.ExpandInstanceName(self.op.instance_name))
3350
    if instance is None:
3351
      raise errors.OpPrereqError("Instance '%s' not known" %
3352
                                 self.op.instance_name)
3353
    self.instance = instance
3354

    
3355
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3356
      raise errors.OpPrereqError("Instance's disk layout is not"
3357
                                 " remote_raid1.")
3358
    for disk in instance.disks:
3359
      if disk.iv_name == self.op.disk_name:
3360
        break
3361
    else:
3362
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3363
                                 " instance." % self.op.disk_name)
3364
    for child in disk.children:
3365
      if (child.dev_type == constants.LD_DRBD7 and
3366
          child.logical_id[2] == self.op.disk_id):
3367
        break
3368
    else:
3369
      raise errors.OpPrereqError("Can't find the device with this port.")
3370

    
3371
    if len(disk.children) < 2:
3372
      raise errors.OpPrereqError("Cannot remove the last component from"
3373
                                 " a mirror.")
3374
    self.disk = disk
3375
    self.child = child
3376
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]
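    # Added note: for a drbd-type child the logical_id is roughly
    # (node_a, node_b, port); whichever end is not the primary node is the
    # old secondary that this mirror component lives on.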
3381

    
3382
  def Exec(self, feedback_fn):
3383
    """Remove the mirror component
3384

3385
    """
3386
    instance = self.instance
3387
    disk = self.disk
3388
    child = self.child
3389
    logger.Info("remove mirror component")
3390
    self.cfg.SetDiskID(disk, instance.primary_node)
3391
    if not rpc.call_blockdev_removechildren(instance.primary_node,
3392
                                            disk, [child]):
3393
      raise errors.OpExecError("Can't remove child from mirror.")
3394

    
3395
    for node in child.logical_id[:2]:
3396
      self.cfg.SetDiskID(child, node)
3397
      if not rpc.call_blockdev_remove(node, child):
3398
        logger.Error("Warning: failed to remove device from node %s,"
3399
                     " continuing operation." % node)
3400

    
3401
    disk.children.remove(child)
3402
    self.cfg.AddInstance(instance)
3403

    
3404

    
3405
class LUReplaceDisks(LogicalUnit):
3406
  """Replace the disks of an instance.
3407

3408
  """
3409
  HPATH = "mirrors-replace"
3410
  HTYPE = constants.HTYPE_INSTANCE
3411
  _OP_REQP = ["instance_name", "mode", "disks"]
3412

    
3413
  def BuildHooksEnv(self):
3414
    """Build hooks env.
3415

3416
    This runs on the master, the primary and all the secondaries.
3417

3418
    """
3419
    env = {
3420
      "MODE": self.op.mode,
3421
      "NEW_SECONDARY": self.op.remote_node,
3422
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3423
      }
3424
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3425
    nl = [
3426
      self.sstore.GetMasterNode(),
3427
      self.instance.primary_node,
3428
      ]
3429
    if self.op.remote_node is not None:
3430
      nl.append(self.op.remote_node)
3431
    return env, nl, nl
3432

    
3433
  def CheckPrereq(self):
3434
    """Check prerequisites.
3435

3436
    This checks that the instance is in the cluster.
3437

3438
    """
3439
    instance = self.cfg.GetInstanceInfo(
3440
      self.cfg.ExpandInstanceName(self.op.instance_name))
3441
    if instance is None:
3442
      raise errors.OpPrereqError("Instance '%s' not known" %
3443
                                 self.op.instance_name)
3444
    self.instance = instance
3445
    self.op.instance_name = instance.name
3446

    
3447
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3448
      raise errors.OpPrereqError("Instance's disk layout is not"
3449
                                 " network mirrored.")
3450

    
3451
    if len(instance.secondary_nodes) != 1:
3452
      raise errors.OpPrereqError("The instance has a strange layout,"
3453
                                 " expected one secondary but found %d" %
3454
                                 len(instance.secondary_nodes))
3455

    
3456
    self.sec_node = instance.secondary_nodes[0]
3457

    
3458
    remote_node = getattr(self.op, "remote_node", None)
3459
    if remote_node is not None:
3460
      remote_node = self.cfg.ExpandNodeName(remote_node)
3461
      if remote_node is None:
3462
        raise errors.OpPrereqError("Node '%s' not known" %
3463
                                   self.op.remote_node)
3464
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3465
    else:
3466
      self.remote_node_info = None
3467
    if remote_node == instance.primary_node:
3468
      raise errors.OpPrereqError("The specified node is the primary node of"
3469
                                 " the instance.")
3470
    elif remote_node == self.sec_node:
3471
      if self.op.mode == constants.REPLACE_DISK_SEC:
3472
        # this is for DRBD8, where we can't execute the same mode of
3473
        # replacement as for drbd7 (no different port allocated)
3474
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3475
                                   " replacement")
3476
      # the user gave the current secondary, switch to
3477
      # 'no-replace-secondary' mode for drbd7
3478
      remote_node = None
3479
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3480
        self.op.mode != constants.REPLACE_DISK_ALL):
3481
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3482
                                 " disks replacement, not individual ones")
3483
    if instance.disk_template == constants.DT_DRBD8:
3484
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3485
          remote_node is not None):
3486
        # switch to replace secondary mode
3487
        self.op.mode = constants.REPLACE_DISK_SEC
3488

    
3489
      if self.op.mode == constants.REPLACE_DISK_ALL:
3490
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3491
                                   " secondary disk replacement, not"
3492
                                   " both at once")
3493
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3494
        if remote_node is not None:
3495
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3496
                                     " the secondary while doing a primary"
3497
                                     " node disk replacement")
3498
        self.tgt_node = instance.primary_node
3499
        self.oth_node = instance.secondary_nodes[0]
3500
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3501
        self.new_node = remote_node # this can be None, in which case
3502
                                    # we don't change the secondary
3503
        self.tgt_node = instance.secondary_nodes[0]
3504
        self.oth_node = instance.primary_node
3505
      else:
3506
        raise errors.ProgrammerError("Unhandled disk replace mode")
3507

    
3508
    for name in self.op.disks:
3509
      if instance.FindDisk(name) is None:
3510
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3511
                                   (name, instance.name))
3512
    self.op.remote_node = remote_node
3513

    
3514
  def _ExecRR1(self, feedback_fn):
3515
    """Replace the disks of an instance.
3516

3517
    """
3518
    instance = self.instance
3519
    iv_names = {}
3520
    # start of work
3521
    if self.op.remote_node is None:
3522
      remote_node = self.sec_node
3523
    else:
3524
      remote_node = self.op.remote_node
3525
    cfg = self.cfg
3526
    for dev in instance.disks:
3527
      size = dev.size
3528
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3529
      names = _GenerateUniqueNames(cfg, lv_names)
3530
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3531
                                       remote_node, size, names)
3532
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3533
      logger.Info("adding new mirror component on secondary for %s" %
3534
                  dev.iv_name)
3535
      #HARDCODE
3536
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3537
                                        new_drbd, False,
3538
                                        _GetInstanceInfoText(instance)):
3539
        raise errors.OpExecError("Failed to create new component on secondary"
3540
                                 " node %s. Full abort, cleanup manually!" %
3541
                                 remote_node)
3542

    
3543
      logger.Info("adding new mirror component on primary")
3544
      #HARDCODE
3545
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3546
                                      instance, new_drbd,
3547
                                      _GetInstanceInfoText(instance)):
3548
        # remove secondary dev
3549
        cfg.SetDiskID(new_drbd, remote_node)
3550
        rpc.call_blockdev_remove(remote_node, new_drbd)
3551
        raise errors.OpExecError("Failed to create volume on primary!"
3552
                                 " Full abort, cleanup manually!!")
3553

    
3554
      # the device exists now
3555
      # call the primary node to add the mirror to md
3556
      logger.Info("adding new mirror component to md")
3557
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3558
                                           [new_drbd]):
3559
        logger.Error("Can't add mirror compoment to md!")
3560
        cfg.SetDiskID(new_drbd, remote_node)
3561
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3562
          logger.Error("Can't rollback on secondary")
3563
        cfg.SetDiskID(new_drbd, instance.primary_node)
3564
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3565
          logger.Error("Can't rollback on primary")
3566
        raise errors.OpExecError("Full abort, cleanup manually!!")
3567

    
3568
      dev.children.append(new_drbd)
3569
      cfg.AddInstance(instance)
3570

    
3571
    # this can fail as the old devices are degraded and _WaitForSync
3572
    # does a combined result over all disks, so we don't check its
3573
    # return value
3574
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3575

    
3576
    # so check manually all the devices
3577
    for name in iv_names:
3578
      dev, child, new_drbd = iv_names[name]
3579
      cfg.SetDiskID(dev, instance.primary_node)
3580
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3581
      if is_degr:
3582
        raise errors.OpExecError("MD device %s is degraded!" % name)
3583
      cfg.SetDiskID(new_drbd, instance.primary_node)
3584
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3585
      if is_degr:
3586
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3587

    
3588
    for name in iv_names:
3589
      dev, child, new_drbd = iv_names[name]
3590
      logger.Info("remove mirror %s component" % name)
3591
      cfg.SetDiskID(dev, instance.primary_node)
3592
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3593
                                              dev, [child]):
3594
        logger.Error("Can't remove child from mirror, aborting"
3595
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3596
        continue
3597

    
3598
      for node in child.logical_id[:2]:
3599
        logger.Info("remove child device on %s" % node)
3600
        cfg.SetDiskID(child, node)
3601
        if not rpc.call_blockdev_remove(node, child):
3602
          logger.Error("Warning: failed to remove device from node %s,"
3603
                       " continuing operation." % node)
3604

    
3605
      dev.children.remove(child)
3606

    
3607
      cfg.AddInstance(instance)
3608

    
3609
  def _ExecD8DiskOnly(self, feedback_fn):
3610
    """Replace a disk on the primary or secondary for dbrd8.
3611

3612
    The algorithm for replace is quite complicated:
3613
      - for each disk to be replaced:
3614
        - create new LVs on the target node with unique names
3615
        - detach old LVs from the drbd device
3616
        - rename old LVs to name_replaced.<time_t>
3617
        - rename new LVs to old LVs
3618
        - attach the new LVs (with the old names now) to the drbd device
3619
      - wait for sync across all devices
3620
      - for each modified disk:
3621
        - remove old LVs (which have the name name_replaces.<time_t>)
3622

3623
    Failures are not very well handled.
3624

3625
    """
3626
    steps_total = 6
3627
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3628
    instance = self.instance
3629
    iv_names = {}
3630
    vgname = self.cfg.GetVGName()
3631
    # start of work
3632
    cfg = self.cfg
3633
    tgt_node = self.tgt_node
3634
    oth_node = self.oth_node
3635

    
3636
    # Step: check device activation
3637
    self.proc.LogStep(1, steps_total, "check device existence")
3638
    info("checking volume groups")
3639
    my_vg = cfg.GetVGName()
3640
    results = rpc.call_vg_list([oth_node, tgt_node])
3641
    if not results:
3642
      raise errors.OpExecError("Can't list volume groups on the nodes")
3643
    for node in oth_node, tgt_node:
3644
      res = results.get(node, False)
3645
      if not res or my_vg not in res:
3646
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3647
                                 (my_vg, node))
3648
    for dev in instance.disks:
3649
      if not dev.iv_name in self.op.disks:
3650
        continue
3651
      for node in tgt_node, oth_node:
3652
        info("checking %s on %s" % (dev.iv_name, node))
3653
        cfg.SetDiskID(dev, node)
3654
        if not rpc.call_blockdev_find(node, dev):
3655
          raise errors.OpExecError("Can't find device %s on node %s" %
3656
                                   (dev.iv_name, node))
3657

    
3658
    # Step: check other node consistency
3659
    self.proc.LogStep(2, steps_total, "check peer consistency")
3660
    for dev in instance.disks:
3661
      if not dev.iv_name in self.op.disks:
3662
        continue
3663
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3664
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3665
                                   oth_node==instance.primary_node):
3666
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3667
                                 " to replace disks on this node (%s)" %
3668
                                 (oth_node, tgt_node))
3669

    
3670
    # Step: create new storage
3671
    self.proc.LogStep(3, steps_total, "allocate new storage")
3672
    for dev in instance.disks:
3673
      if not dev.iv_name in self.op.disks:
3674
        continue
3675
      size = dev.size
3676
      cfg.SetDiskID(dev, tgt_node)
3677
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3678
      names = _GenerateUniqueNames(cfg, lv_names)
3679
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3680
                             logical_id=(vgname, names[0]))
3681
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3682
                             logical_id=(vgname, names[1]))
3683
      new_lvs = [lv_data, lv_meta]
3684
      old_lvs = dev.children
3685
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3686
      info("creating new local storage on %s for %s" %
3687
           (tgt_node, dev.iv_name))
3688
      # since we *always* want to create this LV, we use the
3689
      # _Create...OnPrimary (which forces the creation), even if we
3690
      # are talking about the secondary node
3691
      for new_lv in new_lvs:
3692
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3693
                                        _GetInstanceInfoText(instance)):
3694
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3695
                                   " node '%s'" %
3696
                                   (new_lv.logical_id[1], tgt_node))
3697

    
3698
    # Step: for each lv, detach+rename*2+attach
3699
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3700
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3701
      info("detaching %s drbd from local storage" % dev.iv_name)
3702
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3703
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3704
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3705
      #dev.children = []
3706
      #cfg.Update(instance)
3707

    
3708
      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)
3713

    
3714
      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
3718
      # build the rename list based on what LVs exist on the node
3719
      rlist = []
3720
      for to_ren in old_lvs:
3721
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3722
        if find_res is not None: # device exists
3723
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3724

    
3725
      info("renaming the old LVs on the target node")
3726
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3727
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3728
      # now we rename the new LVs to the old LVs
3729
      info("renaming the new LVs on the target node")
3730
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3731
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3732
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3733

    
3734
      for old, new in zip(old_lvs, new_lvs):
3735
        new.logical_id = old.logical_id
3736
        cfg.SetDiskID(new, tgt_node)
3737

    
3738
      for disk in old_lvs:
3739
        disk.logical_id = ren_fn(disk, temp_suffix)
3740
        cfg.SetDiskID(disk, tgt_node)
3741

    
3742
      # now that the new lvs have the old name, we can add them to the device
3743
      info("adding new mirror component on %s" % tgt_node)
3744
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3745
        for new_lv in new_lvs:
3746
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3747
            warning("Can't rollback device %s", hint="manually cleanup unused"
3748
                    " logical volumes")
3749
        raise errors.OpExecError("Can't add local storage to drbd")
3750

    
3751
      dev.children = new_lvs
3752
      cfg.Update(instance)
3753

    
3754
    # Step: wait for sync
3755

    
3756
    # this can fail as the old devices are degraded and _WaitForSync
3757
    # does a combined result over all disks, so we don't check its
3758
    # return value
3759
    self.proc.LogStep(5, steps_total, "sync devices")
3760
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3761

    
3762
    # so check manually all the devices
3763
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3764
      cfg.SetDiskID(dev, instance.primary_node)
3765
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3766
      if is_degr:
3767
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3768

    
3769
    # Step: remove old storage
3770
    self.proc.LogStep(6, steps_total, "removing old storage")
3771
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3772
      info("remove logical volumes for %s" % name)
3773
      for lv in old_lvs:
3774
        cfg.SetDiskID(lv, tgt_node)
3775
        if not rpc.call_blockdev_remove(tgt_node, lv):
3776
          warning("Can't remove old LV", hint="manually remove unused LVs")
3777
          continue
3778

    
3779
  def _ExecD8Secondary(self, feedback_fn):
3780
    """Replace the secondary node for drbd8.
3781

3782
    The algorithm for replace is quite complicated:
3783
      - for all disks of the instance:
3784
        - create new LVs on the new node with same names
3785
        - shutdown the drbd device on the old secondary
3786
        - disconnect the drbd network on the primary
3787
        - create the drbd device on the new secondary
3788
        - network attach the drbd on the primary, using an artifice:
3789
          the drbd code for Attach() will connect to the network if it
3790
          finds a device which is connected to the good local disks but
3791
          not network enabled
3792
      - wait for sync across all devices
3793
      - remove all disks from the old secondary
3794

3795
    Failures are not very well handled.
3796

3797
    """
3798
    steps_total = 6
3799
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3800
    instance = self.instance
3801
    iv_names = {}
3802
    vgname = self.cfg.GetVGName()
3803
    # start of work
3804
    cfg = self.cfg
3805
    old_node = self.tgt_node
3806
    new_node = self.new_node
3807
    pri_node = instance.primary_node
3808

    
3809
    # Step: check device activation
3810
    self.proc.LogStep(1, steps_total, "check device existence")
3811
    info("checking volume groups")
3812
    my_vg = cfg.GetVGName()
3813
    results = rpc.call_vg_list([pri_node, new_node])
3814
    if not results:
3815
      raise errors.OpExecError("Can't list volume groups on the nodes")
3816
    for node in pri_node, new_node:
3817
      res = results.get(node, False)
3818
      if not res or my_vg not in res:
3819
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3820
                                 (my_vg, node))
3821
    for dev in instance.disks:
3822
      if not dev.iv_name in self.op.disks:
3823
        continue
3824
      info("checking %s on %s" % (dev.iv_name, pri_node))
3825
      cfg.SetDiskID(dev, pri_node)
3826
      if not rpc.call_blockdev_find(pri_node, dev):
3827
        raise errors.OpExecError("Can't find device %s on node %s" %
3828
                                 (dev.iv_name, pri_node))
3829

    
3830
    # Step: check other node consistency
3831
    self.proc.LogStep(2, steps_total, "check peer consistency")
3832
    for dev in instance.disks:
3833
      if not dev.iv_name in self.op.disks:
3834
        continue
3835
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3836
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3837
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3838
                                 " unsafe to replace the secondary" %
3839
                                 pri_node)
3840

    
3841
    # Step: create new storage
3842
    self.proc.LogStep(3, steps_total, "allocate new storage")
3843
    for dev in instance.disks:
3844
      size = dev.size
3845
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3846
      # since we *always* want to create this LV, we use the
3847
      # _Create...OnPrimary (which forces the creation), even if we
3848
      # are talking about the secondary node
3849
      for new_lv in dev.children:
3850
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3851
                                        _GetInstanceInfoText(instance)):
3852
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3853
                                   " node '%s'" %
3854
                                   (new_lv.logical_id[1], new_node))
3855

    
3856
      iv_names[dev.iv_name] = (dev, dev.children)
3857

    
3858
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3859
    for dev in instance.disks:
3860
      size = dev.size
3861
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3862
      # create new devices on new_node
3863
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3864
                              logical_id=(pri_node, new_node,
3865
                                          dev.logical_id[2]),
3866
                              children=dev.children)
3867
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3868
                                        new_drbd, False,
3869
                                      _GetInstanceInfoText(instance)):
3870
        raise errors.OpExecError("Failed to create new DRBD on"
3871
                                 " node '%s'" % new_node)
3872

    
3873
    for dev in instance.disks:
3874
      # we have new devices, shutdown the drbd on the old secondary
3875
      info("shutting down drbd for %s on old node" % dev.iv_name)
3876
      cfg.SetDiskID(dev, old_node)
3877
      if not rpc.call_blockdev_shutdown(old_node, dev):
3878
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3879
                hint="Please cleanup this device manually as soon as possible")
3880

    
3881
    info("detaching primary drbds from the network (=> standalone)")
3882
    done = 0
3883
    for dev in instance.disks:
3884
      cfg.SetDiskID(dev, pri_node)
3885
      # set the physical (unique in bdev terms) id to None, meaning
3886
      # detach from network
3887
      dev.physical_id = (None,) * len(dev.physical_id)
3888
      # and 'find' the device, which will 'fix' it to match the
3889
      # standalone state
3890
      if rpc.call_blockdev_find(pri_node, dev):
3891
        done += 1
3892
      else:
3893
        warning("Failed to detach drbd %s from network, unusual case" %
3894
                dev.iv_name)
3895

    
3896
    if not done:
3897
      # no detaches succeeded (very unlikely)
3898
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3899

    
3900
    # if we managed to detach at least one, we update all the disks of
3901
    # the instance to point to the new secondary
3902
    info("updating instance configuration")
3903
    for dev in instance.disks:
3904
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3905
      cfg.SetDiskID(dev, pri_node)
3906
    cfg.Update(instance)
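    # Added note: for drbd8 the logical_id starts with the two node names
    # (roughly (node_a, node_b, port)); the loop above keeps the port and any
    # trailing fields and only swaps in the new secondary node.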
3907

    
3908
    # and now perform the drbd attach
3909
    info("attaching primary drbds to new secondary (standalone => connected)")
3910
    failures = []
3911
    for dev in instance.disks:
3912
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3913
      # since the attach is smart, it's enough to 'find' the device,
3914
      # it will automatically activate the network, if the physical_id
3915
      # is correct
3916
      cfg.SetDiskID(dev, pri_node)
3917
      if not rpc.call_blockdev_find(pri_node, dev):
3918
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3919
                "please do a gnt-instance info to see the status of disks")
3920

    
3921
    # this can fail as the old devices are degraded and _WaitForSync
3922
    # does a combined result over all disks, so we don't check its
3923
    # return value
3924
    self.proc.LogStep(5, steps_total, "sync devices")
3925
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3926

    
3927
    # so check manually all the devices
3928
    for name, (dev, old_lvs) in iv_names.iteritems():
3929
      cfg.SetDiskID(dev, pri_node)
3930
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3931
      if is_degr:
3932
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3933

    
3934
    self.proc.LogStep(6, steps_total, "removing old storage")
3935
    for name, (dev, old_lvs) in iv_names.iteritems():
3936
      info("remove logical volumes for %s" % name)
3937
      for lv in old_lvs:
3938
        cfg.SetDiskID(lv, old_node)
3939
        if not rpc.call_blockdev_remove(old_node, lv):
3940
          warning("Can't remove LV on old secondary",
3941
                  hint="Cleanup stale volumes by hand")
3942

    
3943
  def Exec(self, feedback_fn):
3944
    """Execute disk replacement.
3945

3946
    This dispatches the disk replacement to the appropriate handler.
3947

3948
    """
3949
    instance = self.instance
3950
    if instance.disk_template == constants.DT_REMOTE_RAID1:
3951
      fn = self._ExecRR1
3952
    elif instance.disk_template == constants.DT_DRBD8:
3953
      if self.op.remote_node is None:
3954
        fn = self._ExecD8DiskOnly
3955
      else:
3956
        fn = self._ExecD8Secondary
3957
    else:
3958
      raise errors.ProgrammerError("Unhandled disk replacement case")
3959
    return fn(feedback_fn)
3960

    
3961

    
3962
class LUQueryInstanceData(NoHooksLU):
3963
  """Query runtime instance data.
3964

3965
  """
3966
  _OP_REQP = ["instances"]
3967

    
3968
  def CheckPrereq(self):
3969
    """Check prerequisites.
3970

3971
    This only checks the optional instance list against the existing names.
3972

3973
    """
3974
    if not isinstance(self.op.instances, list):
3975
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3976
    if self.op.instances:
3977
      self.wanted_instances = []
3978
      names = self.op.instances
3979
      for name in names:
3980
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3981
        if instance is None:
3982
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3983
        self.wanted_instances.append(instance)
3984
    else:
3985
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3986
                               in self.cfg.GetInstanceList()]
3987
    return
3988

    
3989

    
3990
  def _ComputeDiskStatus(self, instance, snode, dev):
3991
    """Compute block device status.
3992

3993
    """
3994
    self.cfg.SetDiskID(dev, instance.primary_node)
3995
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3996
    if dev.dev_type in constants.LDS_DRBD:
3997
      # we change the snode then (otherwise we use the one passed in)
3998
      if dev.logical_id[0] == instance.primary_node:
3999
        snode = dev.logical_id[1]
4000
      else:
4001
        snode = dev.logical_id[0]
4002

    
4003
    if snode:
4004
      self.cfg.SetDiskID(dev, snode)
4005
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4006
    else:
4007
      dev_sstatus = None
4008

    
4009
    if dev.children:
4010
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4011
                      for child in dev.children]
4012
    else:
4013
      dev_children = []
4014

    
4015
    data = {
4016
      "iv_name": dev.iv_name,
4017
      "dev_type": dev.dev_type,
4018
      "logical_id": dev.logical_id,
4019
      "physical_id": dev.physical_id,
4020
      "pstatus": dev_pstatus,
4021
      "sstatus": dev_sstatus,
4022
      "children": dev_children,
4023
      }
4024

    
4025
    return data
4026

    
4027
  def Exec(self, feedback_fn):
4028
    """Gather and return data"""
4029
    result = {}
4030
    for instance in self.wanted_instances:
4031
      remote_info = rpc.call_instance_info(instance.primary_node,
4032
                                                instance.name)
4033
      if remote_info and "state" in remote_info:
4034
        remote_state = "up"
4035
      else:
4036
        remote_state = "down"
4037
      if instance.status == "down":
4038
        config_state = "down"
4039
      else:
4040
        config_state = "up"
4041

    
4042
      disks = [self._ComputeDiskStatus(instance, None, device)
4043
               for device in instance.disks]
4044

    
4045
      idict = {
4046
        "name": instance.name,
4047
        "config_state": config_state,
4048
        "run_state": remote_state,
4049
        "pnode": instance.primary_node,
4050
        "snodes": instance.secondary_nodes,
4051
        "os": instance.os,
4052
        "memory": instance.memory,
4053
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4054
        "disks": disks,
4055
        "network_port": instance.network_port,
4056
        "vcpus": instance.vcpus,
4057
        "kernel_path": instance.kernel_path,
4058
        "initrd_path": instance.initrd_path,
4059
        }
4060

    
4061
      result[instance.name] = idict
4062

    
4063
    return result
4064

    
4065

    
4066
class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
4073

    
4074
  def BuildHooksEnv(self):
4075
    """Build hooks env.
4076

4077
    This runs on the master, primary and secondaries.
4078

4079
    """
4080
    args = dict()
4081
    if self.mem:
4082
      args['memory'] = self.mem
4083
    if self.vcpus:
4084
      args['vcpus'] = self.vcpus
4085
    if self.do_ip or self.do_bridge:
4086
      if self.do_ip:
4087
        ip = self.ip
4088
      else:
4089
        ip = self.instance.nics[0].ip
4090
      if self.bridge:
4091
        bridge = self.bridge
4092
      else:
4093
        bridge = self.instance.nics[0].bridge
4094
      args['nics'] = [(ip, bridge)]
4095
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4096
    nl = [self.sstore.GetMasterNode(),
4097
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4098
    return env, nl, nl
4099

    
4100
  def CheckPrereq(self):
4101
    """Check prerequisites.
4102

4103
    This only checks the instance list against the existing names.
4104

4105
    """
4106
    self.mem = getattr(self.op, "mem", None)
4107
    self.vcpus = getattr(self.op, "vcpus", None)
4108
    self.ip = getattr(self.op, "ip", None)
4109
    self.mac = getattr(self.op, "mac", None)
4110
    self.bridge = getattr(self.op, "bridge", None)
4111
    self.kernel_path = getattr(self.op, "kernel_path", None)
4112
    self.initrd_path = getattr(self.op, "initrd_path", None)
4113
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4114
                 self.kernel_path, self.initrd_path]
4115
    if all_parms.count(None) == len(all_parms):
4116
      raise errors.OpPrereqError("No changes submitted")
4117
    if self.mem is not None:
4118
      try:
4119
        self.mem = int(self.mem)
4120
      except ValueError, err:
4121
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4122
    if self.vcpus is not None:
4123
      try:
4124
        self.vcpus = int(self.vcpus)
4125
      except ValueError, err:
4126
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4127
    if self.ip is not None:
4128
      self.do_ip = True
4129
      if self.ip.lower() == "none":
4130
        self.ip = None
4131
      else:
4132
        if not utils.IsValidIP(self.ip):
4133
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4134
    else:
4135
      self.do_ip = False
4136
    self.do_bridge = (self.bridge is not None)
4137
    if self.mac is not None:
4138
      if self.cfg.IsMacInUse(self.mac):
4139
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4140
                                   self.mac)
4141
      if not utils.IsValidMac(self.mac):
4142
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4143

    
4144
    if self.kernel_path is not None:
4145
      self.do_kernel_path = True
4146
      if self.kernel_path == constants.VALUE_NONE:
4147
        raise errors.OpPrereqError("Can't set instance to no kernel")
4148

    
4149
      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False
4153

    
4154
    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False
4161

    
4162
    instance = self.cfg.GetInstanceInfo(
4163
      self.cfg.ExpandInstanceName(self.op.instance_name))
4164
    if instance is None:
4165
      raise errors.OpPrereqError("No such instance name '%s'" %
4166
                                 self.op.instance_name)
4167
    self.op.instance_name = instance.name
4168
    self.instance = instance
4169
    return
4170

    
4171
  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
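    # Added note: result collects (parameter, new value) pairs for everything
    # that actually changed; the caller presumably reports these back to the
    # user.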
4177
    instance = self.instance
4178
    if self.mem:
4179
      instance.memory = self.mem
4180
      result.append(("mem", self.mem))
4181
    if self.vcpus:
4182
      instance.vcpus = self.vcpus
4183
      result.append(("vcpus",  self.vcpus))
4184
    if self.do_ip:
4185
      instance.nics[0].ip = self.ip
4186
      result.append(("ip", self.ip))
4187
    if self.bridge:
4188
      instance.nics[0].bridge = self.bridge
4189
      result.append(("bridge", self.bridge))
4190
    if self.mac:
4191
      instance.nics[0].mac = self.mac
4192
      result.append(("mac", self.mac))
4193
    if self.do_kernel_path:
4194
      instance.kernel_path = self.kernel_path
4195
      result.append(("kernel_path", self.kernel_path))
4196
    if self.do_initrd_path:
4197
      instance.initrd_path = self.initrd_path
4198
      result.append(("initrd_path", self.initrd_path))
4199

    
4200
    self.cfg.AddInstance(instance)
4201

    
4202
    return result
4203

    
4204

    
4205
class LUQueryExports(NoHooksLU):
4206
  """Query the exports list
4207

4208
  """
4209
  _OP_REQP = []
4210

    
4211
  def CheckPrereq(self):
4212
    """Check that the nodelist contains only existing nodes.
4213

4214
    """
4215
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4216

    
4217
  def Exec(self, feedback_fn):
4218
    """Compute the list of all the exported system images.
4219

4220
    Returns:
4221
      a dictionary with the structure node->(export-list)
4222
      where export-list is a list of the instances exported on
4223
      that node.
4224

4225
    """
4226
    return rpc.call_export_list(self.nodes)
4227

    
4228

    
4229
class LUExportInstance(LogicalUnit):
4230
  """Export an instance to an image in the cluster.
4231

4232
  """
4233
  HPATH = "instance-export"
4234
  HTYPE = constants.HTYPE_INSTANCE
4235
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4236

    
4237
  def BuildHooksEnv(self):
4238
    """Build hooks env.
4239

4240
    This will run on the master, primary node and target node.
4241

4242
    """
4243
    env = {
4244
      "EXPORT_NODE": self.op.target_node,
4245
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4246
      }
4247
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4248
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4249
          self.op.target_node]
4250
    return env, nl, nl
4251

    
4252
  def CheckPrereq(self):
4253
    """Check prerequisites.
4254

4255
    This checks that the instance name is a valid one.
4256

4257
    """
4258
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4259
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4260
    if self.instance is None:
4261
      raise errors.OpPrereqError("Instance '%s' not found" %
4262
                                 self.op.instance_name)
4263

    
4264
    # node verification
4265
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4266
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4267

    
4268
    if self.dst_node is None:
4269
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
4270
                                 self.op.target_node)
4271
    self.op.target_node = self.dst_node.name
4272

    
4273
  def Exec(self, feedback_fn):
4274
    """Export an instance to an image in the cluster.
4275

4276
    """
4277
    instance = self.instance
4278
    dst_node = self.dst_node
4279
    src_node = instance.primary_node
4280
    # shutdown the instance, unless requested not to do so
4281
    if self.op.shutdown:
4282
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
4283
      self.proc.ChainOpCode(op)
4284

    
4285
    vgname = self.cfg.GetVGName()
4286

    
4287
    snap_disks = []
4288

    
4289
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
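          # Added note: only the data disk ("sda") is snapshotted and
          # exported; the swap disk ("sdb") is skipped here.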
4294

    
4295
          if not new_dev_name:
4296
            logger.Error("could not snapshot block device %s on node %s" %
4297
                         (disk.logical_id[1], src_node))
4298
          else:
4299
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4300
                                      logical_id=(vgname, new_dev_name),
4301
                                      physical_id=(vgname, new_dev_name),
4302
                                      iv_name=disk.iv_name)
4303
            snap_disks.append(new_dev)
4304

    
4305
    finally:
4306
      if self.op.shutdown:
4307
        op = opcodes.OpStartupInstance(instance_name=instance.name,
4308
                                       force=False)
4309
        self.proc.ChainOpCode(op)
4310

    
4311
    # TODO: check for size
4312

    
4313
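    # copy each snapshot to the destination node, then remove the local
    # snapshot; failures are only logged so the remaining disks are still
    # processed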
    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the just-created backup would be removed because
    # OpQueryExports substitutes an empty list with the full cluster node list
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    This resolves self.target to the cluster, node or instance object on
    which the tag operation will act.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs matching the search pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
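    # hedged example of the result list (tag values are hypothetical):
    #   [("/cluster", "production"), ("/nodes/node1.example.com", "rack-a1")]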
    return results


class LUAddTags(TagsLU):
  """Sets one or more tags on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tags.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")