root / lib / cmdlib.py @ 53e4e875


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46
class LogicalUnit(object):
47
  """Logical Unit base class.
48

49
  Subclasses must follow these rules:
50
    - implement CheckPrereq which also fills in the opcode instance
51
      with all the fields (even if as None)
52
    - implement Exec
53
    - implement BuildHooksEnv
54
    - redefine HPATH and HTYPE
55
    - optionally redefine their run requirements (REQ_CLUSTER,
56
      REQ_MASTER); note that all commands require root permissions
57

58
  """
59
  HPATH = None
60
  HTYPE = None
61
  _OP_REQP = []
62
  REQ_CLUSTER = True
63
  REQ_MASTER = True
64

    
65
  def __init__(self, processor, op, cfg, sstore):
66
    """Constructor for LogicalUnit.
67

68
    This needs to be overridden in derived classes in order to check op
69
    validity.
70

71
    """
72
    self.proc = processor
73
    self.op = op
74
    self.cfg = cfg
75
    self.sstore = sstore
76
    for attr_name in self._OP_REQP:
77
      attr_val = getattr(op, attr_name, None)
78
      if attr_val is None:
79
        raise errors.OpPrereqError("Required parameter '%s' missing" %
80
                                   attr_name)
81
    if self.REQ_CLUSTER:
82
      if not cfg.IsCluster():
83
        raise errors.OpPrereqError("Cluster not initialized yet,"
84
                                   " use 'gnt-cluster init' first.")
85
      if self.REQ_MASTER:
86
        master = sstore.GetMasterNode()
87
        if master != utils.HostInfo().name:
88
          raise errors.OpPrereqError("Commands must be run on the master"
89
                                     " node %s" % master)
90

    
91
  def CheckPrereq(self):
92
    """Check prerequisites for this LU.
93

94
    This method should check that the prerequisites for the execution
95
    of this LU are fulfilled. It can do internode communication, but
96
    it should be idempotent - no cluster or system changes are
97
    allowed.
98

99
    The method should raise errors.OpPrereqError in case something is
100
    not fulfilled. Its return value is ignored.
101

102
    This method should also update all the parameters of the opcode to
103
    their canonical form; e.g. a short node name must be fully
104
    expanded after this method has successfully completed (so that
105
    hooks, logging, etc. work correctly).
106

107
    """
108
    raise NotImplementedError
109

    
110
  def Exec(self, feedback_fn):
111
    """Execute the LU.
112

113
    This method should implement the actual work. It should raise
114
    errors.OpExecError for failures that are somewhat dealt with in
115
    code, or expected.
116

117
    """
118
    raise NotImplementedError
119

    
120
  def BuildHooksEnv(self):
121
    """Build hooks environment for this LU.
122

123
    This method should return a three-element tuple consisting of: a dict
124
    containing the environment that will be used for running the
125
    specific hook for this LU, a list of node names on which the hook
126
    should run before the execution, and a list of node names on which
127
    the hook should run after the execution.
128

129
    The keys of the dict must not be prefixed with 'GANETI_', as this will
130
    be handled by the hooks runner. Also note that additional keys will be
131
    added by the hooks runner. If the LU doesn't define any
132
    environment, an empty dict (and not None) should be returned.
133

134
    As for the node lists, the master should not be included in
135
    them, as it will be added by the hooks runner in case this LU
136
    requires a cluster to run on (otherwise we don't have a node
137
    list). If there are no nodes, an empty list (and not
138
    None) should be returned.
139

140
    Note that if the HPATH for a LU class is None, this function will
141
    not be called.
142

143
    """
144
    raise NotImplementedError
145

    
146

    
147
class NoHooksLU(LogicalUnit):
148
  """Simple LU which runs no hooks.
149

150
  This LU is intended as a parent for other LogicalUnits which will
151
  run no hooks, in order to reduce duplicate code.
152

153
  """
154
  HPATH = None
155
  HTYPE = None
156

    
157
  def BuildHooksEnv(self):
158
    """Build hooks env.
159

160
    This is a no-op, since we don't run hooks.
161

162
    """
163
    return {}, [], []
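
# Illustrative sketch only (not part of the original module): a minimal
# NoHooksLU subclass following the rules documented in LogicalUnit above.
# The "message" opcode field is hypothetical.
#
#   class LUExampleEcho(NoHooksLU):
#     _OP_REQP = ["message"]
#
#     def CheckPrereq(self):
#       # canonicalise the opcode parameters (here: just strip whitespace)
#       self.op.message = self.op.message.strip()
#
#     def Exec(self, feedback_fn):
#       feedback_fn("echo: %s" % self.op.message)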
164

    
165

    
166
def _AddHostToEtcHosts(hostname):
167
  """Wrapper around utils.SetEtcHostsEntry.
168

169
  """
170
  hi = utils.HostInfo(name=hostname)
171
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172

    
173

    
174
def _RemoveHostFromEtcHosts(hostname):
175
  """Wrapper around utils.RemoveEtcHostsEntry.
176

177
  """
178
  hi = utils.HostInfo(name=hostname)
179
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181

    
182

    
183
def _GetWantedNodes(lu, nodes):
184
  """Returns list of checked and expanded node names.
185

186
  Args:
187
    nodes: List of nodes (strings) or None for all
188

189
  """
190
  if not isinstance(nodes, list):
191
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
192

    
193
  if nodes:
194
    wanted = []
195

    
196
    for name in nodes:
197
      node = lu.cfg.ExpandNodeName(name)
198
      if node is None:
199
        raise errors.OpPrereqError("No such node name '%s'" % name)
200
      wanted.append(node)
201

    
202
  else:
203
    wanted = lu.cfg.GetNodeList()
204
  return utils.NiceSort(wanted)
205

    
206

    
207
def _GetWantedInstances(lu, instances):
208
  """Returns list of checked and expanded instance names.
209

210
  Args:
211
    instances: List of instances (strings) or None for all
212

213
  """
214
  if not isinstance(instances, list):
215
    raise errors.OpPrereqError("Invalid argument type 'instances'")
216

    
217
  if instances:
218
    wanted = []
219

    
220
    for name in instances:
221
      instance = lu.cfg.ExpandInstanceName(name)
222
      if instance is None:
223
        raise errors.OpPrereqError("No such instance name '%s'" % name)
224
      wanted.append(instance)
225

    
226
  else:
227
    wanted = lu.cfg.GetInstanceList()
228
  return utils.NiceSort(wanted)
229

    
230

    
231
def _CheckOutputFields(static, dynamic, selected):
232
  """Checks whether all selected fields are valid.
233

234
  Args:
235
    static: Static fields
236
    dynamic: Dynamic fields
    selected: Fields selected by the user (must be a subset of the above)
237

238
  """
239
  static_fields = frozenset(static)
240
  dynamic_fields = frozenset(dynamic)
241

    
242
  all_fields = static_fields | dynamic_fields
243

    
244
  if not all_fields.issuperset(selected):
245
    raise errors.OpPrereqError("Unknown output fields selected: %s"
246
                               % ",".join(frozenset(selected).
247
                                          difference(all_fields)))
248

    
249

    
250
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251
                          memory, vcpus, nics):
252
  """Builds instance related env variables for hooks from single variables.
253

254
  Args:
255
    secondary_nodes: List of secondary nodes as strings
256
  """
257
  env = {
258
    "OP_TARGET": name,
259
    "INSTANCE_NAME": name,
260
    "INSTANCE_PRIMARY": primary_node,
261
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262
    "INSTANCE_OS_TYPE": os_type,
263
    "INSTANCE_STATUS": status,
264
    "INSTANCE_MEMORY": memory,
265
    "INSTANCE_VCPUS": vcpus,
266
  }
267

    
268
  if nics:
269
    nic_count = len(nics)
270
    for idx, (ip, bridge, mac) in enumerate(nics):
271
      if ip is None:
272
        ip = ""
273
      env["INSTANCE_NIC%d_IP" % idx] = ip
274
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
276
  else:
277
    nic_count = 0
278

    
279
  env["INSTANCE_NIC_COUNT"] = nic_count
280

    
281
  return env
282

    
283

    
284
def _BuildInstanceHookEnvByObject(instance, override=None):
285
  """Builds instance related env variables for hooks from an object.
286

287
  Args:
288
    instance: objects.Instance object of instance
289
    override: dict of values to override
290
  """
291
  args = {
292
    'name': instance.name,
293
    'primary_node': instance.primary_node,
294
    'secondary_nodes': instance.secondary_nodes,
295
    'os_type': instance.os,
296
    'status': instance.status,
297
    'memory': instance.memory,
298
    'vcpus': instance.vcpus,
299
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
300
  }
301
  if override:
302
    args.update(override)
303
  return _BuildInstanceHookEnv(**args)
304

    
305

    
306
def _UpdateKnownHosts(fullnode, ip, pubkey):
307
  """Ensure a node has a correct known_hosts entry.
308

309
  Args:
310
    fullnode - Fully qualified domain name of host. (str)
311
    ip       - IPv4 address of host (str)
312
    pubkey   - the public key of the cluster
313

314
  """
315
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
316
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
317
  else:
318
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
319

    
320
  inthere = False
321

    
322
  save_lines = []
323
  add_lines = []
324
  removed = False
325

    
326
  for rawline in f:
327
    logger.Debug('read %s' % (repr(rawline),))
328

    
329
    parts = rawline.rstrip('\r\n').split()
330

    
331
    # Ignore unwanted lines
332
    if len(parts) >= 3 and rawline.lstrip()[0] != '#':
333
      fields = parts[0].split(',')
334
      key = parts[2]
335

    
336
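      # check whether this line already covers all / any of the names we
      # care about (the node's IP address and its fully qualified name)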
      haveall = True
337
      havesome = False
338
      for spec in [ ip, fullnode ]:
339
        if spec not in fields:
340
          haveall = False
341
        if spec in fields:
342
          havesome = True
343

    
344
      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
345
      if haveall and key == pubkey:
346
        inthere = True
347
        save_lines.append(rawline)
348
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
349
        continue
350

    
351
      if havesome and (not haveall or key != pubkey):
352
        removed = True
353
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
354
        continue
355

    
356
    save_lines.append(rawline)
357

    
358
  if not inthere:
359
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
360
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
361

    
362
  if removed:
363
    save_lines = save_lines + add_lines
364

    
365
    # Write a new file and replace old.
366
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
367
                                   constants.DATA_DIR)
368
    newfile = os.fdopen(fd, 'w')
369
    try:
370
      newfile.write(''.join(save_lines))
371
    finally:
372
      newfile.close()
373
    logger.Debug("Wrote new known_hosts.")
374
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
375

    
376
  elif add_lines:
377
    # Simply appending a new line will do the trick.
378
    f.seek(0, 2)
379
    for add in add_lines:
380
      f.write(add)
381

    
382
  f.close()
383

    
384

    
385
def _HasValidVG(vglist, vgname):
386
  """Checks if the volume group list is valid.
387

388
  A non-None return value means there's an error, and the return value
389
  is the error message.
390

391
  """
392
  vgsize = vglist.get(vgname, None)
393
  if vgsize is None:
394
    return "volume group '%s' missing" % vgname
395
  elif vgsize < 20480:
396
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
397
            (vgname, vgsize))
398
  return None
399

    
400

    
401
def _InitSSHSetup(node):
402
  """Setup the SSH configuration for the cluster.
403

404

405
  This generates a dsa keypair for root, adds the pub key to the
406
  permitted hosts and adds the hostkey to its own known hosts.
407

408
  Args:
409
    node: the name of this host as a fqdn
410

411
  """
412
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
413

    
414
  for name in priv_key, pub_key:
415
    if os.path.exists(name):
416
      utils.CreateBackup(name)
417
    utils.RemoveFile(name)
418

    
419
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
420
                         "-f", priv_key,
421
                         "-q", "-N", ""])
422
  if result.failed:
423
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
424
                             result.output)
425

    
426
  f = open(pub_key, 'r')
427
  try:
428
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
429
  finally:
430
    f.close()
431

    
432

    
433
def _InitGanetiServerSetup(ss):
434
  """Setup the necessary configuration for the initial node daemon.
435

436
  This creates the nodepass file containing the shared password for
437
  the cluster and also generates the SSL certificate.
438

439
  """
440
  # Create pseudo random password
441
  randpass = sha.new(os.urandom(64)).hexdigest()
442
  # and write it into sstore
443
  ss.SetKey(ss.SS_NODED_PASS, randpass)
444

    
445
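  # generate a self-signed certificate valid for five years; note that the
  # private key and the certificate end up in the same file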
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
446
                         "-days", str(365*5), "-nodes", "-x509",
447
                         "-keyout", constants.SSL_CERT_FILE,
448
                         "-out", constants.SSL_CERT_FILE, "-batch"])
449
  if result.failed:
450
    raise errors.OpExecError("could not generate server ssl cert, command"
451
                             " %s had exitcode %s and error message %s" %
452
                             (result.cmd, result.exit_code, result.output))
453

    
454
  os.chmod(constants.SSL_CERT_FILE, 0400)
455

    
456
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
457

    
458
  if result.failed:
459
    raise errors.OpExecError("Could not start the node daemon, command %s"
460
                             " had exitcode %s and error %s" %
461
                             (result.cmd, result.exit_code, result.output))
462

    
463

    
464
def _CheckInstanceBridgesExist(instance):
465
  """Check that the brigdes needed by an instance exist.
466

467
  """
468
  # check bridges existence
469
  brlist = [nic.bridge for nic in instance.nics]
470
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
471
    raise errors.OpPrereqError("one or more target bridges %s does not"
472
                               " exist on destination node '%s'" %
473
                               (brlist, instance.primary_node))
474

    
475

    
476
class LUInitCluster(LogicalUnit):
477
  """Initialise the cluster.
478

479
  """
480
  HPATH = "cluster-init"
481
  HTYPE = constants.HTYPE_CLUSTER
482
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
483
              "def_bridge", "master_netdev"]
484
  REQ_CLUSTER = False
485

    
486
  def BuildHooksEnv(self):
487
    """Build hooks env.
488

489
    Notes: Since we don't require a cluster, we must manually add
490
    ourselves to the post-run node list.
491

492
    """
493
    env = {"OP_TARGET": self.op.cluster_name}
494
    return env, [], [self.hostname.name]
495

    
496
  def CheckPrereq(self):
497
    """Verify that the passed name is a valid one.
498

499
    """
500
    if config.ConfigWriter.IsCluster():
501
      raise errors.OpPrereqError("Cluster is already initialised")
502

    
503
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
504
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
505
        raise errors.OpPrereqError("Please prepare the cluster VNC"
506
                                   "password file %s" %
507
                                   constants.VNC_PASSWORD_FILE)
508

    
509
    self.hostname = hostname = utils.HostInfo()
510

    
511
    if hostname.ip.startswith("127."):
512
      raise errors.OpPrereqError("This host's IP resolves to the private"
513
                                 " range (%s). Please fix DNS or /etc/hosts." %
514
                                 (hostname.ip,))
515

    
516
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
517

    
518
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
519
                         constants.DEFAULT_NODED_PORT):
520
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
521
                                 " to %s,\nbut this ip address does not"
522
                                 " belong to this host."
523
                                 " Aborting." % hostname.ip)
524

    
525
    secondary_ip = getattr(self.op, "secondary_ip", None)
526
    if secondary_ip and not utils.IsValidIP(secondary_ip):
527
      raise errors.OpPrereqError("Invalid secondary ip given")
528
    if (secondary_ip and
529
        secondary_ip != hostname.ip and
530
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
531
                           constants.DEFAULT_NODED_PORT))):
532
      raise errors.OpPrereqError("You gave %s as secondary IP,"
533
                                 " but it does not belong to this host." %
534
                                 secondary_ip)
535
    self.secondary_ip = secondary_ip
536

    
537
    # checks presence of the volume group given
538
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
539

    
540
    if vgstatus:
541
      raise errors.OpPrereqError("Error: %s" % vgstatus)
542

    
543
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
544
                    self.op.mac_prefix):
545
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
546
                                 self.op.mac_prefix)
547

    
548
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
549
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
550
                                 self.op.hypervisor_type)
551

    
552
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
553
    if result.failed:
554
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
555
                                 (self.op.master_netdev,
556
                                  result.output.strip()))
557

    
558
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
559
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
560
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
561
                                 " executable." % constants.NODE_INITD_SCRIPT)
562

    
563
  def Exec(self, feedback_fn):
564
    """Initialize the cluster.
565

566
    """
567
    clustername = self.clustername
568
    hostname = self.hostname
569

    
570
    # set up the simple store
571
    self.sstore = ss = ssconf.SimpleStore()
572
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
573
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
574
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
575
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
576
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
577

    
578
    # set up the inter-node password and certificate
579
    _InitGanetiServerSetup(ss)
580

    
581
    # start the master ip
582
    rpc.call_node_start_master(hostname.name)
583

    
584
    # set up ssh config and /etc/hosts
585
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
586
    try:
587
      sshline = f.read()
588
    finally:
589
      f.close()
590
    sshkey = sshline.split(" ")[1]
591

    
592
    _AddHostToEtcHosts(hostname.name)
593

    
594
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
595

    
596
    _InitSSHSetup(hostname.name)
597

    
598
    # init of cluster config file
599
    self.cfg = cfgw = config.ConfigWriter()
600
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
601
                    sshkey, self.op.mac_prefix,
602
                    self.op.vg_name, self.op.def_bridge)
603

    
604

    
605
class LUDestroyCluster(NoHooksLU):
606
  """Logical unit for destroying the cluster.
607

608
  """
609
  _OP_REQP = []
610

    
611
  def CheckPrereq(self):
612
    """Check prerequisites.
613

614
    This checks whether the cluster is empty.
615

616
    Any errors are signalled by raising errors.OpPrereqError.
617

618
    """
619
    master = self.sstore.GetMasterNode()
620

    
621
    nodelist = self.cfg.GetNodeList()
622
    if len(nodelist) != 1 or nodelist[0] != master:
623
      raise errors.OpPrereqError("There are still %d node(s) in"
624
                                 " this cluster." % (len(nodelist) - 1))
625
    instancelist = self.cfg.GetInstanceList()
626
    if instancelist:
627
      raise errors.OpPrereqError("There are still %d instance(s) in"
628
                                 " this cluster." % len(instancelist))
629

    
630
  def Exec(self, feedback_fn):
631
    """Destroys the cluster.
632

633
    """
634
    master = self.sstore.GetMasterNode()
635
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
636
    utils.CreateBackup(priv_key)
637
    utils.CreateBackup(pub_key)
638
    rpc.call_node_leave_cluster(master)
639

    
640

    
641
class LUVerifyCluster(NoHooksLU):
642
  """Verifies the cluster status.
643

644
  """
645
  _OP_REQP = []
646

    
647
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
648
                  remote_version, feedback_fn):
649
    """Run multiple tests against a node.
650

651
    Test list:
652
      - compares ganeti version
653
      - checks vg existence and size > 20G
654
      - checks config file checksum
655
      - checks ssh to other nodes
656

657
    Args:
658
      node: name of the node to check
659
      file_list: required list of files
660
      local_cksum: dictionary of local files and their checksums
661

662
    """
663
    # compares ganeti version
664
    local_version = constants.PROTOCOL_VERSION
665
    if not remote_version:
666
      feedback_fn(" - ERROR: connection to %s failed" % (node))
667
      return True
668

    
669
    if local_version != remote_version:
670
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
671
                      (local_version, node, remote_version))
672
      return True
673

    
674
    # checks vg existance and size > 20G
675

    
676
    bad = False
677
    if not vglist:
678
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
679
                      (node,))
680
      bad = True
681
    else:
682
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
683
      if vgstatus:
684
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
685
        bad = True
686

    
687
    # checks config file checksum
688
    # checks ssh to any
689

    
690
    if 'filelist' not in node_result:
691
      bad = True
692
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
693
    else:
694
      remote_cksum = node_result['filelist']
695
      for file_name in file_list:
696
        if file_name not in remote_cksum:
697
          bad = True
698
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
699
        elif remote_cksum[file_name] != local_cksum[file_name]:
700
          bad = True
701
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
702

    
703
    if 'nodelist' not in node_result:
704
      bad = True
705
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
706
    else:
707
      if node_result['nodelist']:
708
        bad = True
709
        for node in node_result['nodelist']:
710
          feedback_fn("  - ERROR: communication with node '%s': %s" %
711
                          (node, node_result['nodelist'][node]))
712
    hyp_result = node_result.get('hypervisor', None)
713
    if hyp_result is not None:
714
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
715
    return bad
716

    
717
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
718
    """Verify an instance.
719

720
    This function checks to see if the required block devices are
721
    available on the instance's node.
722

723
    """
724
    bad = False
725

    
726
    instancelist = self.cfg.GetInstanceList()
727
    if not instance in instancelist:
728
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
729
                      (instance, instancelist))
730
      bad = True
731

    
732
    instanceconfig = self.cfg.GetInstanceInfo(instance)
733
    node_current = instanceconfig.primary_node
734

    
735
    node_vol_should = {}
736
    instanceconfig.MapLVsByNode(node_vol_should)
737

    
738
    for node in node_vol_should:
739
      for volume in node_vol_should[node]:
740
        if node not in node_vol_is or volume not in node_vol_is[node]:
741
          feedback_fn("  - ERROR: volume %s missing on node %s" %
742
                          (volume, node))
743
          bad = True
744

    
745
    if instanceconfig.status != 'down':
746
      if not instance in node_instance[node_current]:
747
        feedback_fn("  - ERROR: instance %s not running on node %s" %
748
                        (instance, node_current))
749
        bad = True
750

    
751
    for node in node_instance:
752
      if node != node_current:
753
        if instance in node_instance[node]:
754
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
755
                          (instance, node))
756
          bad = True
757

    
758
    return bad
759

    
760
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
761
    """Verify if there are any unknown volumes in the cluster.
762

763
    The .os, .swap and backup volumes are ignored. All other volumes are
764
    reported as unknown.
765

766
    """
767
    bad = False
768

    
769
    for node in node_vol_is:
770
      for volume in node_vol_is[node]:
771
        if node not in node_vol_should or volume not in node_vol_should[node]:
772
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
773
                      (volume, node))
774
          bad = True
775
    return bad
776

    
777
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
778
    """Verify the list of running instances.
779

780
    This checks what instances are running but unknown to the cluster.
781

782
    """
783
    bad = False
784
    for node in node_instance:
785
      for runninginstance in node_instance[node]:
786
        if runninginstance not in instancelist:
787
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
788
                          (runninginstance, node))
789
          bad = True
790
    return bad
791

    
792
  def CheckPrereq(self):
793
    """Check prerequisites.
794

795
    This has no prerequisites.
796

797
    """
798
    pass
799

    
800
  def Exec(self, feedback_fn):
801
    """Verify integrity of cluster, performing various test on nodes.
802

803
    """
804
    bad = False
805
    feedback_fn("* Verifying global settings")
806
    for msg in self.cfg.VerifyConfig():
807
      feedback_fn("  - ERROR: %s" % msg)
808

    
809
    vg_name = self.cfg.GetVGName()
810
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
811
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
812
    node_volume = {}
813
    node_instance = {}
814

    
815
    # FIXME: verify OS list
816
    # do local checksums
817
    file_names = list(self.sstore.GetFileList())
818
    file_names.append(constants.SSL_CERT_FILE)
819
    file_names.append(constants.CLUSTER_CONF_FILE)
820
    local_checksums = utils.FingerprintFiles(file_names)
821

    
822
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
823
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
824
    all_instanceinfo = rpc.call_instance_list(nodelist)
825
    all_vglist = rpc.call_vg_list(nodelist)
826
    node_verify_param = {
827
      'filelist': file_names,
828
      'nodelist': nodelist,
829
      'hypervisor': None,
830
      }
831
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
832
    all_rversion = rpc.call_version(nodelist)
833

    
834
    for node in nodelist:
835
      feedback_fn("* Verifying node %s" % node)
836
      result = self._VerifyNode(node, file_names, local_checksums,
837
                                all_vglist[node], all_nvinfo[node],
838
                                all_rversion[node], feedback_fn)
839
      bad = bad or result
840

    
841
      # node_volume
842
      volumeinfo = all_volumeinfo[node]
843

    
844
      if isinstance(volumeinfo, basestring):
845
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
846
                    (node, volumeinfo[-400:].encode('string_escape')))
847
        bad = True
848
        node_volume[node] = {}
849
      elif not isinstance(volumeinfo, dict):
850
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
851
        bad = True
852
        continue
853
      else:
854
        node_volume[node] = volumeinfo
855

    
856
      # node_instance
857
      nodeinstance = all_instanceinfo[node]
858
      if type(nodeinstance) != list:
859
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
860
        bad = True
861
        continue
862

    
863
      node_instance[node] = nodeinstance
864

    
865
    node_vol_should = {}
866

    
867
    for instance in instancelist:
868
      feedback_fn("* Verifying instance %s" % instance)
869
      result =  self._VerifyInstance(instance, node_volume, node_instance,
870
                                     feedback_fn)
871
      bad = bad or result
872

    
873
      inst_config = self.cfg.GetInstanceInfo(instance)
874

    
875
      inst_config.MapLVsByNode(node_vol_should)
876

    
877
    feedback_fn("* Verifying orphan volumes")
878
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
879
                                       feedback_fn)
880
    bad = bad or result
881

    
882
    feedback_fn("* Verifying remaining instances")
883
    result = self._VerifyOrphanInstances(instancelist, node_instance,
884
                                         feedback_fn)
885
    bad = bad or result
886

    
887
    return int(bad)
888

    
889

    
890
class LUVerifyDisks(NoHooksLU):
891
  """Verifies the cluster disks status.
892

893
  """
894
  _OP_REQP = []
895

    
896
  def CheckPrereq(self):
897
    """Check prerequisites.
898

899
    This has no prerequisites.
900

901
    """
902
    pass
903

    
904
  def Exec(self, feedback_fn):
905
    """Verify integrity of cluster disks.
906

907
    """
908
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
909

    
910
    vg_name = self.cfg.GetVGName()
911
    nodes = utils.NiceSort(self.cfg.GetNodeList())
912
    instances = [self.cfg.GetInstanceInfo(name)
913
                 for name in self.cfg.GetInstanceList()]
914

    
915
    nv_dict = {}
916
    for inst in instances:
917
      inst_lvs = {}
918
      if (inst.status != "up" or
919
          inst.disk_template not in constants.DTS_NET_MIRROR):
920
        continue
921
      inst.MapLVsByNode(inst_lvs)
922
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
923
      for node, vol_list in inst_lvs.iteritems():
924
        for vol in vol_list:
925
          nv_dict[(node, vol)] = inst
926

    
927
    if not nv_dict:
928
      return result
929

    
930
    node_lvs = rpc.call_volume_list(nodes, vg_name)
931

    
932
    to_act = set()
933
    for node in nodes:
934
      # node_volume
935
      lvs = node_lvs[node]
936

    
937
      if isinstance(lvs, basestring):
938
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
939
        res_nlvm[node] = lvs
940
      elif not isinstance(lvs, dict):
941
        logger.Info("connection to node %s failed or invalid data returned" %
942
                    (node,))
943
        res_nodes.append(node)
944
        continue
945

    
946
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
947
        inst = nv_dict.pop((node, lv_name), None)
948
        if (not lv_online and inst is not None
949
            and inst.name not in res_instances):
950
          res_instances.append(inst.name)
951

    
952
    # any leftover items in nv_dict are missing LVs, let's arrange the
953
    # data better
954
    for key, inst in nv_dict.iteritems():
955
      if inst.name not in res_missing:
956
        res_missing[inst.name] = []
957
      res_missing[inst.name].append(key)
958

    
959
    return result
960

    
961

    
962
class LURenameCluster(LogicalUnit):
963
  """Rename the cluster.
964

965
  """
966
  HPATH = "cluster-rename"
967
  HTYPE = constants.HTYPE_CLUSTER
968
  _OP_REQP = ["name"]
969

    
970
  def BuildHooksEnv(self):
971
    """Build hooks env.
972

973
    """
974
    env = {
975
      "OP_TARGET": self.op.sstore.GetClusterName(),
976
      "NEW_NAME": self.op.name,
977
      }
978
    mn = self.sstore.GetMasterNode()
979
    return env, [mn], [mn]
980

    
981
  def CheckPrereq(self):
982
    """Verify that the passed name is a valid one.
983

984
    """
985
    hostname = utils.HostInfo(self.op.name)
986

    
987
    new_name = hostname.name
988
    self.ip = new_ip = hostname.ip
989
    old_name = self.sstore.GetClusterName()
990
    old_ip = self.sstore.GetMasterIP()
991
    if new_name == old_name and new_ip == old_ip:
992
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
993
                                 " cluster has changed")
994
    if new_ip != old_ip:
995
      result = utils.RunCmd(["fping", "-q", new_ip])
996
      if not result.failed:
997
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
998
                                   " reachable on the network. Aborting." %
999
                                   new_ip)
1000

    
1001
    self.op.name = new_name
1002

    
1003
  def Exec(self, feedback_fn):
1004
    """Rename the cluster.
1005

1006
    """
1007
    clustername = self.op.name
1008
    ip = self.ip
1009
    ss = self.sstore
1010

    
1011
    # shutdown the master IP
1012
    master = ss.GetMasterNode()
1013
    if not rpc.call_node_stop_master(master):
1014
      raise errors.OpExecError("Could not disable the master role")
1015

    
1016
    try:
1017
      # modify the sstore
1018
      ss.SetKey(ss.SS_MASTER_IP, ip)
1019
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1020

    
1021
      # Distribute updated ss config to all nodes
1022
      myself = self.cfg.GetNodeInfo(master)
1023
      dist_nodes = self.cfg.GetNodeList()
1024
      if myself.name in dist_nodes:
1025
        dist_nodes.remove(myself.name)
1026

    
1027
      logger.Debug("Copying updated ssconf data to all nodes")
1028
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1029
        fname = ss.KeyToFilename(keyname)
1030
        result = rpc.call_upload_file(dist_nodes, fname)
1031
        for to_node in dist_nodes:
1032
          if not result[to_node]:
1033
            logger.Error("copy of file %s to node %s failed" %
1034
                         (fname, to_node))
1035
    finally:
1036
      if not rpc.call_node_start_master(master):
1037
        logger.Error("Could not re-enable the master role on the master,"
1038
                     " please restart manually.")
1039

    
1040

    
1041
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1042
  """Sleep and poll for an instance's disk to sync.
1043

1044
  """
1045
  if not instance.disks:
1046
    return True
1047

    
1048
  if not oneshot:
1049
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1050

    
1051
  node = instance.primary_node
1052

    
1053
  for dev in instance.disks:
1054
    cfgw.SetDiskID(dev, node)
1055

    
1056
  retries = 0
1057
  while True:
1058
    max_time = 0
1059
    done = True
1060
    cumul_degraded = False
1061
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1062
    if not rstats:
1063
      proc.LogWarning("Can't get any data from node %s" % node)
1064
      retries += 1
1065
      if retries >= 10:
1066
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1067
                                 " aborting." % node)
1068
      time.sleep(6)
1069
      continue
1070
    retries = 0
1071
    for i in range(len(rstats)):
1072
      mstat = rstats[i]
1073
      if mstat is None:
1074
        proc.LogWarning("Can't compute data for node %s/%s" %
1075
                        (node, instance.disks[i].iv_name))
1076
        continue
1077
      # we ignore the ldisk parameter
1078
      perc_done, est_time, is_degraded, _ = mstat
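      # a perc_done of None means this device has finished syncing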
1079
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1080
      if perc_done is not None:
1081
        done = False
1082
        if est_time is not None:
1083
          rem_time = "%d estimated seconds remaining" % est_time
1084
          max_time = est_time
1085
        else:
1086
          rem_time = "no time estimate"
1087
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1088
                     (instance.disks[i].iv_name, perc_done, rem_time))
1089
    if done or oneshot:
1090
      break
1091

    
1092
    if unlock:
1093
      utils.Unlock('cmd')
1094
    try:
1095
      time.sleep(min(60, max_time))
1096
    finally:
1097
      if unlock:
1098
        utils.Lock('cmd')
1099

    
1100
  if done:
1101
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1102
  return not cumul_degraded
1103

    
1104

    
1105
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1106
  """Check that mirrors are not degraded.
1107

1108
  The ldisk parameter, if True, will change the test from the
1109
  is_degraded attribute (which represents overall non-ok status for
1110
  the device(s)) to the ldisk (representing the local storage status).
1111

1112
  """
1113
  cfgw.SetDiskID(dev, node)
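  # index into the rpc.call_blockdev_find result: field 5 holds the
  # is_degraded flag, field 6 the ldisk (local disk) status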
1114
  if ldisk:
1115
    idx = 6
1116
  else:
1117
    idx = 5
1118

    
1119
  result = True
1120
  if on_primary or dev.AssembleOnSecondary():
1121
    rstats = rpc.call_blockdev_find(node, dev)
1122
    if not rstats:
1123
      logger.ToStderr("Can't get any data from node %s" % node)
1124
      result = False
1125
    else:
1126
      result = result and (not rstats[idx])
1127
  if dev.children:
1128
    for child in dev.children:
1129
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1130

    
1131
  return result
1132

    
1133

    
1134
class LUDiagnoseOS(NoHooksLU):
1135
  """Logical unit for OS diagnose/query.
1136

1137
  """
1138
  _OP_REQP = []
1139

    
1140
  def CheckPrereq(self):
1141
    """Check prerequisites.
1142

1143
    This always succeeds, since this is a pure query LU.
1144

1145
    """
1146
    return
1147

    
1148
  def Exec(self, feedback_fn):
1149
    """Compute the list of OSes.
1150

1151
    """
1152
    node_list = self.cfg.GetNodeList()
1153
    node_data = rpc.call_os_diagnose(node_list)
1154
    if node_data is False:
1155
      raise errors.OpExecError("Can't gather the list of OSes")
1156
    return node_data
1157

    
1158

    
1159
class LURemoveNode(LogicalUnit):
1160
  """Logical unit for removing a node.
1161

1162
  """
1163
  HPATH = "node-remove"
1164
  HTYPE = constants.HTYPE_NODE
1165
  _OP_REQP = ["node_name"]
1166

    
1167
  def BuildHooksEnv(self):
1168
    """Build hooks env.
1169

1170
    This doesn't run on the target node in the pre phase as a failed
1171
    node would not allow itself to run.
1172

1173
    """
1174
    env = {
1175
      "OP_TARGET": self.op.node_name,
1176
      "NODE_NAME": self.op.node_name,
1177
      }
1178
    all_nodes = self.cfg.GetNodeList()
1179
    all_nodes.remove(self.op.node_name)
1180
    return env, all_nodes, all_nodes
1181

    
1182
  def CheckPrereq(self):
1183
    """Check prerequisites.
1184

1185
    This checks:
1186
     - the node exists in the configuration
1187
     - it does not have primary or secondary instances
1188
     - it's not the master
1189

1190
    Any errors are signalled by raising errors.OpPrereqError.
1191

1192
    """
1193
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1194
    if node is None:
1195
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1196

    
1197
    instance_list = self.cfg.GetInstanceList()
1198

    
1199
    masternode = self.sstore.GetMasterNode()
1200
    if node.name == masternode:
1201
      raise errors.OpPrereqError("Node is the master node,"
1202
                                 " you need to failover first.")
1203

    
1204
    for instance_name in instance_list:
1205
      instance = self.cfg.GetInstanceInfo(instance_name)
1206
      if node.name == instance.primary_node:
1207
        raise errors.OpPrereqError("Instance %s still running on the node,"
1208
                                   " please remove first." % instance_name)
1209
      if node.name in instance.secondary_nodes:
1210
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1211
                                   " please remove first." % instance_name)
1212
    self.op.node_name = node.name
1213
    self.node = node
1214

    
1215
  def Exec(self, feedback_fn):
1216
    """Removes the node from the cluster.
1217

1218
    """
1219
    node = self.node
1220
    logger.Info("stopping the node daemon and removing configs from node %s" %
1221
                node.name)
1222

    
1223
    rpc.call_node_leave_cluster(node.name)
1224

    
1225
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1226

    
1227
    logger.Info("Removing node %s from config" % node.name)
1228

    
1229
    self.cfg.RemoveNode(node.name)
1230

    
1231
    _RemoveHostFromEtcHosts(node.name)
1232

    
1233

    
1234
class LUQueryNodes(NoHooksLU):
1235
  """Logical unit for querying nodes.
1236

1237
  """
1238
  _OP_REQP = ["output_fields", "names"]
1239

    
1240
  def CheckPrereq(self):
1241
    """Check prerequisites.
1242

1243
    This checks that the fields required are valid output fields.
1244

1245
    """
1246
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1247
                                     "mtotal", "mnode", "mfree",
1248
                                     "bootid"])
1249

    
1250
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1251
                               "pinst_list", "sinst_list",
1252
                               "pip", "sip"],
1253
                       dynamic=self.dynamic_fields,
1254
                       selected=self.op.output_fields)
1255

    
1256
    self.wanted = _GetWantedNodes(self, self.op.names)
1257

    
1258
  def Exec(self, feedback_fn):
1259
    """Computes the list of nodes and their attributes.
1260

1261
    """
1262
    nodenames = self.wanted
1263
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1264

    
1265
    # begin data gathering
1266

    
1267
    if self.dynamic_fields.intersection(self.op.output_fields):
1268
      live_data = {}
1269
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1270
      for name in nodenames:
1271
        nodeinfo = node_data.get(name, None)
1272
        if nodeinfo:
1273
          live_data[name] = {
1274
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1275
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1276
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1277
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1278
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1279
            "bootid": nodeinfo['bootid'],
1280
            }
1281
        else:
1282
          live_data[name] = {}
1283
    else:
1284
      live_data = dict.fromkeys(nodenames, {})
1285

    
1286
    node_to_primary = dict([(name, set()) for name in nodenames])
1287
    node_to_secondary = dict([(name, set()) for name in nodenames])
1288

    
1289
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1290
                             "sinst_cnt", "sinst_list"))
1291
    if inst_fields & frozenset(self.op.output_fields):
1292
      instancelist = self.cfg.GetInstanceList()
1293

    
1294
      for instance_name in instancelist:
1295
        inst = self.cfg.GetInstanceInfo(instance_name)
1296
        if inst.primary_node in node_to_primary:
1297
          node_to_primary[inst.primary_node].add(inst.name)
1298
        for secnode in inst.secondary_nodes:
1299
          if secnode in node_to_secondary:
1300
            node_to_secondary[secnode].add(inst.name)
1301

    
1302
    # end data gathering
1303

    
1304
    output = []
1305
    for node in nodelist:
1306
      node_output = []
1307
      for field in self.op.output_fields:
1308
        if field == "name":
1309
          val = node.name
1310
        elif field == "pinst_list":
1311
          val = list(node_to_primary[node.name])
1312
        elif field == "sinst_list":
1313
          val = list(node_to_secondary[node.name])
1314
        elif field == "pinst_cnt":
1315
          val = len(node_to_primary[node.name])
1316
        elif field == "sinst_cnt":
1317
          val = len(node_to_secondary[node.name])
1318
        elif field == "pip":
1319
          val = node.primary_ip
1320
        elif field == "sip":
1321
          val = node.secondary_ip
1322
        elif field in self.dynamic_fields:
1323
          val = live_data[node.name].get(field, None)
1324
        else:
1325
          raise errors.ParameterError(field)
1326
        node_output.append(val)
1327
      output.append(node_output)
1328

    
1329
    return output
1330

    
1331

    
1332
class LUQueryNodeVolumes(NoHooksLU):
1333
  """Logical unit for getting volumes on node(s).
1334

1335
  """
1336
  _OP_REQP = ["nodes", "output_fields"]
1337

    
1338
  def CheckPrereq(self):
1339
    """Check prerequisites.
1340

1341
    This checks that the fields required are valid output fields.
1342

1343
    """
1344
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1345

    
1346
    _CheckOutputFields(static=["node"],
1347
                       dynamic=["phys", "vg", "name", "size", "instance"],
1348
                       selected=self.op.output_fields)
1349

    
1350

    
1351
  def Exec(self, feedback_fn):
1352
    """Computes the list of nodes and their attributes.
1353

1354
    """
1355
    nodenames = self.nodes
1356
    volumes = rpc.call_node_volumes(nodenames)
1357

    
1358
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1359
             in self.cfg.GetInstanceList()]
1360

    
1361
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1362

    
1363
    output = []
1364
    for node in nodenames:
1365
      if node not in volumes or not volumes[node]:
1366
        continue
1367

    
1368
      node_vols = volumes[node][:]
1369
      node_vols.sort(key=lambda vol: vol['dev'])
1370

    
1371
      for vol in node_vols:
1372
        node_output = []
1373
        for field in self.op.output_fields:
1374
          if field == "node":
1375
            val = node
1376
          elif field == "phys":
1377
            val = vol['dev']
1378
          elif field == "vg":
1379
            val = vol['vg']
1380
          elif field == "name":
1381
            val = vol['name']
1382
          elif field == "size":
1383
            val = int(float(vol['size']))
1384
          elif field == "instance":
1385
            for inst in ilist:
1386
              if node not in lv_by_node[inst]:
1387
                continue
1388
              if vol['name'] in lv_by_node[inst][node]:
1389
                val = inst.name
1390
                break
1391
            else:
1392
              val = '-'
1393
          else:
1394
            raise errors.ParameterError(field)
1395
          node_output.append(str(val))
1396

    
1397
        output.append(node_output)
1398

    
1399
    return output
1400

    
1401

    
1402
class LUAddNode(LogicalUnit):
1403
  """Logical unit for adding node to the cluster.
1404

1405
  """
1406
  HPATH = "node-add"
1407
  HTYPE = constants.HTYPE_NODE
1408
  _OP_REQP = ["node_name"]
1409

    
1410
  def BuildHooksEnv(self):
1411
    """Build hooks env.
1412

1413
    This will run on all nodes before, and on all nodes + the new node after.
1414

1415
    """
1416
    env = {
1417
      "OP_TARGET": self.op.node_name,
1418
      "NODE_NAME": self.op.node_name,
1419
      "NODE_PIP": self.op.primary_ip,
1420
      "NODE_SIP": self.op.secondary_ip,
1421
      }
1422
    nodes_0 = self.cfg.GetNodeList()
1423
    nodes_1 = nodes_0 + [self.op.node_name, ]
1424
    return env, nodes_0, nodes_1
1425

    
1426
  def CheckPrereq(self):
1427
    """Check prerequisites.
1428

1429
    This checks:
1430
     - the new node is not already in the config
1431
     - it is resolvable
1432
     - its parameters (single/dual homed) match the cluster
1433

1434
    Any errors are signalled by raising errors.OpPrereqError.
1435

1436
    """
1437
    node_name = self.op.node_name
1438
    cfg = self.cfg
1439

    
1440
    dns_data = utils.HostInfo(node_name)
1441

    
1442
    node = dns_data.name
1443
    primary_ip = self.op.primary_ip = dns_data.ip
1444
    secondary_ip = getattr(self.op, "secondary_ip", None)
1445
    if secondary_ip is None:
1446
      secondary_ip = primary_ip
1447
    if not utils.IsValidIP(secondary_ip):
1448
      raise errors.OpPrereqError("Invalid secondary IP given")
1449
    self.op.secondary_ip = secondary_ip
1450
    node_list = cfg.GetNodeList()
1451
    if node in node_list:
1452
      raise errors.OpPrereqError("Node %s is already in the configuration"
1453
                                 % node)
1454

    
1455
    for existing_node_name in node_list:
1456
      existing_node = cfg.GetNodeInfo(existing_node_name)
1457
      if (existing_node.primary_ip == primary_ip or
1458
          existing_node.secondary_ip == primary_ip or
1459
          existing_node.primary_ip == secondary_ip or
1460
          existing_node.secondary_ip == secondary_ip):
1461
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1462
                                   " existing node %s" % existing_node.name)
1463

    
1464
    # check that the type of the node (single versus dual homed) is the
1465
    # same as for the master
1466
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1467
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1468
    newbie_singlehomed = secondary_ip == primary_ip
1469
    if master_singlehomed != newbie_singlehomed:
1470
      if master_singlehomed:
1471
        raise errors.OpPrereqError("The master has no private ip but the"
1472
                                   " new node has one")
1473
      else:
1474
        raise errors.OpPrereqError("The master has a private ip but the"
1475
                                   " new node doesn't have one")
1476

    
1477
    # checks reachability
1478
    if not utils.TcpPing(utils.HostInfo().name,
1479
                         primary_ip,
1480
                         constants.DEFAULT_NODED_PORT):
1481
      raise errors.OpPrereqError("Node not reachable by ping")
1482

    
1483
    if not newbie_singlehomed:
1484
      # check reachability from my secondary ip to newbie's secondary ip
1485
      if not utils.TcpPing(myself.secondary_ip,
1486
                           secondary_ip,
1487
                           constants.DEFAULT_NODED_PORT):
1488
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1489
                                   " based ping to noded port")
1490

    
1491
    self.new_node = objects.Node(name=node,
1492
                                 primary_ip=primary_ip,
1493
                                 secondary_ip=secondary_ip)
1494

    
1495
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1496
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1497
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1498
                                   constants.VNC_PASSWORD_FILE)
1499

    
1500
  def Exec(self, feedback_fn):
1501
    """Adds the new node to the cluster.
1502

1503
    """
1504
    new_node = self.new_node
1505
    node = new_node.name
1506

    
1507
    # set up inter-node password and certificate and restarts the node daemon
1508
    gntpass = self.sstore.GetNodeDaemonPassword()
1509
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1510
      raise errors.OpExecError("ganeti password corruption detected")
1511
    f = open(constants.SSL_CERT_FILE)
1512
    try:
1513
      gntpem = f.read(8192)
1514
    finally:
1515
      f.close()
1516
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1517
    # so we use this to detect an invalid certificate; as long as the
1518
    # cert doesn't contain this, the here-document will be correctly
1519
    # parsed by the shell sequence below
1520
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1521
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1522
    if not gntpem.endswith("\n"):
1523
      raise errors.OpExecError("PEM must end with newline")
1524
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1525

    
1526
    # and then connect with ssh to set password and start ganeti-noded
1527
    # note that all the below variables are sanitized at this point,
1528
    # either by being constants or by the checks above
1529
    ss = self.sstore
1530
    mycommand = ("umask 077 && "
1531
                 "echo '%s' > '%s' && "
1532
                 "cat > '%s' << '!EOF.' && \n"
1533
                 "%s!EOF.\n%s restart" %
1534
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1535
                  constants.SSL_CERT_FILE, gntpem,
1536
                  constants.NODE_INITD_SCRIPT))
1537

    
1538
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1539
    if result.failed:
1540
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1541
                               " output: %s" %
1542
                               (node, result.fail_reason, result.output))
1543

    
1544
    # check connectivity
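    # (give the freshly restarted node daemon a few seconds to start
    # listening before we query its version)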
1545
    time.sleep(4)
1546

    
1547
    result = rpc.call_version([node])[node]
1548
    if result:
1549
      if constants.PROTOCOL_VERSION == result:
1550
        logger.Info("communication to node %s fine, sw version %s match" %
1551
                    (node, result))
1552
      else:
1553
        raise errors.OpExecError("Version mismatch master version %s,"
1554
                                 " node version %s" %
1555
                                 (constants.PROTOCOL_VERSION, result))
1556
    else:
1557
      raise errors.OpExecError("Cannot get version from the new node")
1558

    
1559
    # setup ssh on node
1560
    logger.Info("copy ssh key to node %s" % node)
1561
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1562
    keyarray = []
1563
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1564
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1565
                priv_key, pub_key]
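    # keyarray, filled below, keeps this same ordering, so the
    # rpc.call_node_add() arguments are: host DSA private/public key,
    # host RSA private/public key, and the GANETI_RUNAS ssh key pair.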
1566

    
1567
    for i in keyfiles:
1568
      f = open(i, 'r')
1569
      try:
1570
        keyarray.append(f.read())
1571
      finally:
1572
        f.close()
1573

    
1574
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1575
                               keyarray[3], keyarray[4], keyarray[5])
1576

    
1577
    if not result:
1578
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1579

    
1580
    # Add node to our /etc/hosts, and add key to known_hosts
1581
    _AddHostToEtcHosts(new_node.name)
1582

    
1583
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1584
                      self.cfg.GetHostKey())
1585

    
1586
    if new_node.secondary_ip != new_node.primary_ip:
1587
      if not rpc.call_node_tcp_ping(new_node.name,
1588
                                    constants.LOCALHOST_IP_ADDRESS,
1589
                                    new_node.secondary_ip,
1590
                                    constants.DEFAULT_NODED_PORT,
1591
                                    10, False):
1592
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1593
                                 " you gave (%s). Please fix and re-run this"
1594
                                 " command." % new_node.secondary_ip)
1595

    
1596
    success, msg = ssh.VerifyNodeHostname(node)
1597
    if not success:
1598
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1599
                               " than the one the resolver gives: %s."
1600
                               " Please fix and re-run this command." %
1601
                               (node, msg))
1602

    
1603
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1604
    # including the node just added
1605
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1606
    dist_nodes = self.cfg.GetNodeList() + [node]
1607
    if myself.name in dist_nodes:
1608
      dist_nodes.remove(myself.name)
1609

    
1610
    logger.Debug("Copying hosts and known_hosts to all nodes")
1611
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1612
      result = rpc.call_upload_file(dist_nodes, fname)
1613
      for to_node in dist_nodes:
1614
        if not result[to_node]:
1615
          logger.Error("copy of file %s to node %s failed" %
1616
                       (fname, to_node))
1617

    
1618
    to_copy = ss.GetFileList()
1619
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1620
      to_copy.append(constants.VNC_PASSWORD_FILE)
1621
    for fname in to_copy:
1622
      if not ssh.CopyFileToNode(node, fname):
1623
        logger.Error("could not copy file %s to node %s" % (fname, node))
1624

    
1625
    logger.Info("adding node %s to cluster.conf" % node)
1626
    self.cfg.AddNode(new_node)
1627

    
1628

    
1629
class LUMasterFailover(LogicalUnit):
1630
  """Failover the master node to the current node.
1631

1632
  This is a special LU in that it must run on a non-master node.
1633

1634
  """
1635
  HPATH = "master-failover"
1636
  HTYPE = constants.HTYPE_CLUSTER
1637
  REQ_MASTER = False
1638
  _OP_REQP = []
1639

    
1640
  def BuildHooksEnv(self):
1641
    """Build hooks env.
1642

1643
    This will run on the new master only in the pre phase, and on all
1644
    the nodes in the post phase.
1645

1646
    """
1647
    env = {
1648
      "OP_TARGET": self.new_master,
1649
      "NEW_MASTER": self.new_master,
1650
      "OLD_MASTER": self.old_master,
1651
      }
1652
    return env, [self.new_master], self.cfg.GetNodeList()
1653

    
1654
  def CheckPrereq(self):
1655
    """Check prerequisites.
1656

1657
    This checks that we are not already the master.
1658

1659
    """
1660
    self.new_master = utils.HostInfo().name
1661
    self.old_master = self.sstore.GetMasterNode()
1662

    
1663
    if self.old_master == self.new_master:
1664
      raise errors.OpPrereqError("This commands must be run on the node"
1665
                                 " where you want the new master to be."
1666
                                 " %s is already the master" %
1667
                                 self.old_master)
1668

    
1669
  def Exec(self, feedback_fn):
1670
    """Failover the master node.
1671

1672
    This command, when run on a non-master node, will cause the current
1673
    master to cease being master, and the non-master to become the new
1674
    master.
1675

1676
    """
1677
    #TODO: do not rely on gethostname returning the FQDN
1678
    logger.Info("setting master to %s, old master: %s" %
1679
                (self.new_master, self.old_master))
1680

    
1681
    if not rpc.call_node_stop_master(self.old_master):
1682
      logger.Error("could disable the master role on the old master"
1683
                   " %s, please disable manually" % self.old_master)
1684

    
1685
    ss = self.sstore
1686
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1687
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1688
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1689
      logger.Error("could not distribute the new simple store master file"
1690
                   " to the other nodes, please check.")
1691

    
1692
    if not rpc.call_node_start_master(self.new_master):
1693
      logger.Error("could not start the master role on the new master"
1694
                   " %s, please check" % self.new_master)
1695
      feedback_fn("Error in activating the master IP on the new master,"
1696
                  " please fix manually.")
1697

    
1698

    
1699

    
1700
class LUQueryClusterInfo(NoHooksLU):
1701
  """Query cluster configuration.
1702

1703
  """
1704
  _OP_REQP = []
1705
  REQ_MASTER = False
1706

    
1707
  def CheckPrereq(self):
1708
    """No prerequsites needed for this LU.
1709

1710
    """
1711
    pass
1712

    
1713
  def Exec(self, feedback_fn):
1714
    """Return cluster config.
1715

1716
    """
1717
    result = {
1718
      "name": self.sstore.GetClusterName(),
1719
      "software_version": constants.RELEASE_VERSION,
1720
      "protocol_version": constants.PROTOCOL_VERSION,
1721
      "config_version": constants.CONFIG_VERSION,
1722
      "os_api_version": constants.OS_API_VERSION,
1723
      "export_version": constants.EXPORT_VERSION,
1724
      "master": self.sstore.GetMasterNode(),
1725
      "architecture": (platform.architecture()[0], platform.machine()),
1726
      }
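    # Purely illustrative shape of the returned dict (actual values depend
    # on the installation):
    #   {"name": "<cluster name>", "software_version": "<release>",
    #    "protocol_version": <int>, "config_version": <int>,
    #    "os_api_version": <int>, "export_version": <int>,
    #    "master": "<master node name>",
    #    "architecture": ("<bits>", "<machine>")}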
1727

    
1728
    return result
1729

    
1730

    
1731
class LUClusterCopyFile(NoHooksLU):
1732
  """Copy file to cluster.
1733

1734
  """
1735
  _OP_REQP = ["nodes", "filename"]
1736

    
1737
  def CheckPrereq(self):
1738
    """Check prerequisites.
1739

1740
    It should check that the named file exists and that the given list
1741
    of nodes is valid.
1742

1743
    """
1744
    if not os.path.exists(self.op.filename):
1745
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1746

    
1747
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1748

    
1749
  def Exec(self, feedback_fn):
1750
    """Copy a file from master to some nodes.
1751

1752
    Args:
      self.op.filename - the path of the file to copy
      self.nodes - the list of target node names, resolved in CheckPrereq
        (an empty node list in the opcode means all nodes)
1757

1758
    """
1759
    filename = self.op.filename
1760

    
1761
    myname = utils.HostInfo().name
1762

    
1763
    for node in self.nodes:
1764
      if node == myname:
1765
        continue
1766
      if not ssh.CopyFileToNode(node, filename):
1767
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1768

    
1769

    
1770
class LUDumpClusterConfig(NoHooksLU):
1771
  """Return a text-representation of the cluster-config.
1772

1773
  """
1774
  _OP_REQP = []
1775

    
1776
  def CheckPrereq(self):
1777
    """No prerequisites.
1778

1779
    """
1780
    pass
1781

    
1782
  def Exec(self, feedback_fn):
1783
    """Dump a representation of the cluster config to the standard output.
1784

1785
    """
1786
    return self.cfg.DumpConfig()
1787

    
1788

    
1789
class LURunClusterCommand(NoHooksLU):
1790
  """Run a command on some nodes.
1791

1792
  """
1793
  _OP_REQP = ["command", "nodes"]
1794

    
1795
  def CheckPrereq(self):
1796
    """Check prerequisites.
1797

1798
    It checks that the given list of nodes is valid.
1799

1800
    """
1801
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1802

    
1803
  def Exec(self, feedback_fn):
1804
    """Run a command on some nodes.
1805

1806
    """
1807
    data = []
1808
    for node in self.nodes:
1809
      result = ssh.SSHCall(node, "root", self.op.command)
1810
      data.append((node, result.output, result.exit_code))
1811

    
1812
    return data
1813

    
1814

    
1815
class LUActivateInstanceDisks(NoHooksLU):
1816
  """Bring up an instance's disks.
1817

1818
  """
1819
  _OP_REQP = ["instance_name"]
1820

    
1821
  def CheckPrereq(self):
1822
    """Check prerequisites.
1823

1824
    This checks that the instance is in the cluster.
1825

1826
    """
1827
    instance = self.cfg.GetInstanceInfo(
1828
      self.cfg.ExpandInstanceName(self.op.instance_name))
1829
    if instance is None:
1830
      raise errors.OpPrereqError("Instance '%s' not known" %
1831
                                 self.op.instance_name)
1832
    self.instance = instance
1833

    
1834

    
1835
  def Exec(self, feedback_fn):
1836
    """Activate the disks.
1837

1838
    """
1839
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1840
    if not disks_ok:
1841
      raise errors.OpExecError("Cannot activate block devices")
1842

    
1843
    return disks_info
1844

    
1845

    
1846
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1847
  """Prepare the block devices for an instance.
1848

1849
  This sets up the block devices on all nodes.
1850

1851
  Args:
1852
    instance: a ganeti.objects.Instance object
1853
    ignore_secondaries: if true, errors on secondary nodes won't result
1854
                        in an error return from the function
1855

1856
  Returns:
    a (disks_ok, device_info) pair: disks_ok is False if the operation
    failed, and device_info is a list of (host, instance_visible_name,
    node_visible_name) tuples, mapping node devices to instance devices
1860
  """
1861
  device_info = []
1862
  disks_ok = True
1863
  for inst_disk in instance.disks:
1864
    master_result = None
1865
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1866
      cfg.SetDiskID(node_disk, node)
1867
      is_primary = node == instance.primary_node
1868
      result = rpc.call_blockdev_assemble(node, node_disk,
1869
                                          instance.name, is_primary)
1870
      if not result:
1871
        logger.Error("could not prepare block device %s on node %s"
1872
                     " (is_primary=%s)" %
1873
                     (inst_disk.iv_name, node, is_primary))
1874
        if is_primary or not ignore_secondaries:
1875
          disks_ok = False
1876
      if is_primary:
1877
        master_result = result
1878
    device_info.append((instance.primary_node, inst_disk.iv_name,
1879
                        master_result))
1880

    
1881
  # leave the disks configured for the primary node
1882
  # this is a workaround that would be fixed better by
1883
  # improving the logical/physical id handling
1884
  for disk in instance.disks:
1885
    cfg.SetDiskID(disk, instance.primary_node)
1886

    
1887
  return disks_ok, device_info
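# Illustration only: for a two-disk instance whose primary node is "node1",
# the pair returned above would look roughly like
#   (True, [("node1", "sda", <node-visible device>),
#           ("node1", "sdb", <node-visible device>)])
# where the third element is whatever call_blockdev_assemble returned for
# the primary node.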
1888

    
1889

    
1890
def _StartInstanceDisks(cfg, instance, force):
1891
  """Start the disks of an instance.
1892

1893
  """
1894
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1895
                                           ignore_secondaries=force)
1896
  if not disks_ok:
1897
    _ShutdownInstanceDisks(instance, cfg)
1898
    if force is not None and not force:
1899
      logger.Error("If the message above refers to a secondary node,"
1900
                   " you can retry the operation using '--force'.")
1901
    raise errors.OpExecError("Disk consistency error")
1902

    
1903

    
1904
class LUDeactivateInstanceDisks(NoHooksLU):
1905
  """Shutdown an instance's disks.
1906

1907
  """
1908
  _OP_REQP = ["instance_name"]
1909

    
1910
  def CheckPrereq(self):
1911
    """Check prerequisites.
1912

1913
    This checks that the instance is in the cluster.
1914

1915
    """
1916
    instance = self.cfg.GetInstanceInfo(
1917
      self.cfg.ExpandInstanceName(self.op.instance_name))
1918
    if instance is None:
1919
      raise errors.OpPrereqError("Instance '%s' not known" %
1920
                                 self.op.instance_name)
1921
    self.instance = instance
1922

    
1923
  def Exec(self, feedback_fn):
1924
    """Deactivate the disks
1925

1926
    """
1927
    instance = self.instance
1928
    ins_l = rpc.call_instance_list([instance.primary_node])
1929
    ins_l = ins_l[instance.primary_node]
1930
    if not isinstance(ins_l, list):
1931
      raise errors.OpExecError("Can't contact node '%s'" %
1932
                               instance.primary_node)
1933

    
1934
    if self.instance.name in ins_l:
1935
      raise errors.OpExecError("Instance is running, can't shutdown"
1936
                               " block devices.")
1937

    
1938
    _ShutdownInstanceDisks(instance, self.cfg)
1939

    
1940

    
1941
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1942
  """Shutdown block devices of an instance.
1943

1944
  This does the shutdown on all nodes of the instance.
1945

1946
  If ignore_primary is true, errors on the primary node are
  ignored.
1948

1949
  """
1950
  result = True
1951
  for disk in instance.disks:
1952
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1953
      cfg.SetDiskID(top_disk, node)
1954
      if not rpc.call_blockdev_shutdown(node, top_disk):
1955
        logger.Error("could not shutdown block device %s on node %s" %
1956
                     (disk.iv_name, node))
1957
        if not ignore_primary or node != instance.primary_node:
1958
          result = False
1959
  return result
1960

    
1961

    
1962
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1963
  """Checks if a node has enough free memory.
1964

1965
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
1968
  exception.
1969

1970
  Args:
1971
    - cfg: a ConfigWriter instance
1972
    - node: the node name
1973
    - reason: string to use in the error message
1974
    - requested: the amount of memory in MiB
1975

1976
  """
1977
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1978
  if not nodeinfo or not isinstance(nodeinfo, dict):
1979
    raise errors.OpPrereqError("Could not contact node %s for resource"
1980
                             " information" % (node,))
1981

    
1982
  free_mem = nodeinfo[node].get('memory_free')
1983
  if not isinstance(free_mem, int):
1984
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1985
                             " was '%s'" % (node, free_mem))
1986
  if requested > free_mem:
1987
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1988
                             " needed %s MiB, available %s MiB" %
1989
                             (node, reason, requested, free_mem))
1990

    
1991

    
1992
class LUStartupInstance(LogicalUnit):
1993
  """Starts an instance.
1994

1995
  """
1996
  HPATH = "instance-start"
1997
  HTYPE = constants.HTYPE_INSTANCE
1998
  _OP_REQP = ["instance_name", "force"]
1999

    
2000
  def BuildHooksEnv(self):
2001
    """Build hooks env.
2002

2003
    This runs on master, primary and secondary nodes of the instance.
2004

2005
    """
2006
    env = {
2007
      "FORCE": self.op.force,
2008
      }
2009
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2010
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2011
          list(self.instance.secondary_nodes))
2012
    return env, nl, nl
2013

    
2014
  def CheckPrereq(self):
2015
    """Check prerequisites.
2016

2017
    This checks that the instance is in the cluster.
2018

2019
    """
2020
    instance = self.cfg.GetInstanceInfo(
2021
      self.cfg.ExpandInstanceName(self.op.instance_name))
2022
    if instance is None:
2023
      raise errors.OpPrereqError("Instance '%s' not known" %
2024
                                 self.op.instance_name)
2025

    
2026
    # check bridges existence
2027
    _CheckInstanceBridgesExist(instance)
2028

    
2029
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2030
                         "starting instance %s" % instance.name,
2031
                         instance.memory)
2032

    
2033
    self.instance = instance
2034
    self.op.instance_name = instance.name
2035

    
2036
  def Exec(self, feedback_fn):
2037
    """Start the instance.
2038

2039
    """
2040
    instance = self.instance
2041
    force = self.op.force
2042
    extra_args = getattr(self.op, "extra_args", "")
2043

    
2044
    node_current = instance.primary_node
2045

    
2046
    _StartInstanceDisks(self.cfg, instance, force)
2047

    
2048
    if not rpc.call_instance_start(node_current, instance, extra_args):
2049
      _ShutdownInstanceDisks(instance, self.cfg)
2050
      raise errors.OpExecError("Could not start instance")
2051

    
2052
    self.cfg.MarkInstanceUp(instance.name)
2053

    
2054

    
2055
class LURebootInstance(LogicalUnit):
2056
  """Reboot an instance.
2057

2058
  """
2059
  HPATH = "instance-reboot"
2060
  HTYPE = constants.HTYPE_INSTANCE
2061
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2062

    
2063
  def BuildHooksEnv(self):
2064
    """Build hooks env.
2065

2066
    This runs on master, primary and secondary nodes of the instance.
2067

2068
    """
2069
    env = {
2070
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2071
      }
2072
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2073
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2074
          list(self.instance.secondary_nodes))
2075
    return env, nl, nl
2076

    
2077
  def CheckPrereq(self):
2078
    """Check prerequisites.
2079

2080
    This checks that the instance is in the cluster.
2081

2082
    """
2083
    instance = self.cfg.GetInstanceInfo(
2084
      self.cfg.ExpandInstanceName(self.op.instance_name))
2085
    if instance is None:
2086
      raise errors.OpPrereqError("Instance '%s' not known" %
2087
                                 self.op.instance_name)
2088

    
2089
    # check bridges existence
2090
    _CheckInstanceBridgesExist(instance)
2091

    
2092
    self.instance = instance
2093
    self.op.instance_name = instance.name
2094

    
2095
  def Exec(self, feedback_fn):
2096
    """Reboot the instance.
2097

2098
    """
2099
    instance = self.instance
2100
    ignore_secondaries = self.op.ignore_secondaries
2101
    reboot_type = self.op.reboot_type
2102
    extra_args = getattr(self.op, "extra_args", "")
2103

    
2104
    node_current = instance.primary_node
2105

    
2106
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2107
                           constants.INSTANCE_REBOOT_HARD,
2108
                           constants.INSTANCE_REBOOT_FULL]:
2109
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2110
                                  (constants.INSTANCE_REBOOT_SOFT,
2111
                                   constants.INSTANCE_REBOOT_HARD,
2112
                                   constants.INSTANCE_REBOOT_FULL))
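    # Soft and hard reboots are handled with a single reboot RPC to the
    # primary node; a full reboot is emulated below as shutdown + disk
    # cycle + start.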
2113

    
2114
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2115
                       constants.INSTANCE_REBOOT_HARD]:
2116
      if not rpc.call_instance_reboot(node_current, instance,
2117
                                      reboot_type, extra_args):
2118
        raise errors.OpExecError("Could not reboot instance")
2119
    else:
2120
      if not rpc.call_instance_shutdown(node_current, instance):
2121
        raise errors.OpExecError("could not shutdown instance for full reboot")
2122
      _ShutdownInstanceDisks(instance, self.cfg)
2123
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2124
      if not rpc.call_instance_start(node_current, instance, extra_args):
2125
        _ShutdownInstanceDisks(instance, self.cfg)
2126
        raise errors.OpExecError("Could not start instance for full reboot")
2127

    
2128
    self.cfg.MarkInstanceUp(instance.name)
2129

    
2130

    
2131
class LUShutdownInstance(LogicalUnit):
2132
  """Shutdown an instance.
2133

2134
  """
2135
  HPATH = "instance-stop"
2136
  HTYPE = constants.HTYPE_INSTANCE
2137
  _OP_REQP = ["instance_name"]
2138

    
2139
  def BuildHooksEnv(self):
2140
    """Build hooks env.
2141

2142
    This runs on master, primary and secondary nodes of the instance.
2143

2144
    """
2145
    env = _BuildInstanceHookEnvByObject(self.instance)
2146
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2147
          list(self.instance.secondary_nodes))
2148
    return env, nl, nl
2149

    
2150
  def CheckPrereq(self):
2151
    """Check prerequisites.
2152

2153
    This checks that the instance is in the cluster.
2154

2155
    """
2156
    instance = self.cfg.GetInstanceInfo(
2157
      self.cfg.ExpandInstanceName(self.op.instance_name))
2158
    if instance is None:
2159
      raise errors.OpPrereqError("Instance '%s' not known" %
2160
                                 self.op.instance_name)
2161
    self.instance = instance
2162

    
2163
  def Exec(self, feedback_fn):
2164
    """Shutdown the instance.
2165

2166
    """
2167
    instance = self.instance
2168
    node_current = instance.primary_node
2169
    if not rpc.call_instance_shutdown(node_current, instance):
2170
      logger.Error("could not shutdown instance")
2171

    
2172
    self.cfg.MarkInstanceDown(instance.name)
2173
    _ShutdownInstanceDisks(instance, self.cfg)
2174

    
2175

    
2176
class LUReinstallInstance(LogicalUnit):
2177
  """Reinstall an instance.
2178

2179
  """
2180
  HPATH = "instance-reinstall"
2181
  HTYPE = constants.HTYPE_INSTANCE
2182
  _OP_REQP = ["instance_name"]
2183

    
2184
  def BuildHooksEnv(self):
2185
    """Build hooks env.
2186

2187
    This runs on master, primary and secondary nodes of the instance.
2188

2189
    """
2190
    env = _BuildInstanceHookEnvByObject(self.instance)
2191
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2192
          list(self.instance.secondary_nodes))
2193
    return env, nl, nl
2194

    
2195
  def CheckPrereq(self):
2196
    """Check prerequisites.
2197

2198
    This checks that the instance is in the cluster and is not running.
2199

2200
    """
2201
    instance = self.cfg.GetInstanceInfo(
2202
      self.cfg.ExpandInstanceName(self.op.instance_name))
2203
    if instance is None:
2204
      raise errors.OpPrereqError("Instance '%s' not known" %
2205
                                 self.op.instance_name)
2206
    if instance.disk_template == constants.DT_DISKLESS:
2207
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2208
                                 self.op.instance_name)
2209
    if instance.status != "down":
2210
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2211
                                 self.op.instance_name)
2212
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2213
    if remote_info:
2214
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2215
                                 (self.op.instance_name,
2216
                                  instance.primary_node))
2217

    
2218
    self.op.os_type = getattr(self.op, "os_type", None)
2219
    if self.op.os_type is not None:
2220
      # OS verification
2221
      pnode = self.cfg.GetNodeInfo(
2222
        self.cfg.ExpandNodeName(instance.primary_node))
2223
      if pnode is None:
2224
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2225
                                   self.op.pnode)
2226
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2227
      if not os_obj:
2228
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2229
                                   " primary node"  % self.op.os_type)
2230

    
2231
    self.instance = instance
2232

    
2233
  def Exec(self, feedback_fn):
2234
    """Reinstall the instance.
2235

2236
    """
2237
    inst = self.instance
2238

    
2239
    if self.op.os_type is not None:
2240
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2241
      inst.os = self.op.os_type
2242
      self.cfg.AddInstance(inst)
2243

    
2244
    _StartInstanceDisks(self.cfg, inst, None)
2245
    try:
2246
      feedback_fn("Running the instance OS create scripts...")
2247
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2248
        raise errors.OpExecError("Could not install OS for instance %s"
2249
                                 " on node %s" %
2250
                                 (inst.name, inst.primary_node))
2251
    finally:
2252
      _ShutdownInstanceDisks(inst, self.cfg)
2253

    
2254

    
2255
class LURenameInstance(LogicalUnit):
2256
  """Rename an instance.
2257

2258
  """
2259
  HPATH = "instance-rename"
2260
  HTYPE = constants.HTYPE_INSTANCE
2261
  _OP_REQP = ["instance_name", "new_name"]
2262

    
2263
  def BuildHooksEnv(self):
2264
    """Build hooks env.
2265

2266
    This runs on master, primary and secondary nodes of the instance.
2267

2268
    """
2269
    env = _BuildInstanceHookEnvByObject(self.instance)
2270
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2271
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2272
          list(self.instance.secondary_nodes))
2273
    return env, nl, nl
2274

    
2275
  def CheckPrereq(self):
2276
    """Check prerequisites.
2277

2278
    This checks that the instance is in the cluster and is not running.
2279

2280
    """
2281
    instance = self.cfg.GetInstanceInfo(
2282
      self.cfg.ExpandInstanceName(self.op.instance_name))
2283
    if instance is None:
2284
      raise errors.OpPrereqError("Instance '%s' not known" %
2285
                                 self.op.instance_name)
2286
    if instance.status != "down":
2287
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2288
                                 self.op.instance_name)
2289
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2290
    if remote_info:
2291
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2292
                                 (self.op.instance_name,
2293
                                  instance.primary_node))
2294
    self.instance = instance
2295

    
2296
    # new name verification
2297
    name_info = utils.HostInfo(self.op.new_name)
2298

    
2299
    self.op.new_name = new_name = name_info.name
2300
    if not getattr(self.op, "ignore_ip", False):
2301
      command = ["fping", "-q", name_info.ip]
2302
      result = utils.RunCmd(command)
2303
      if not result.failed:
2304
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2305
                                   (name_info.ip, new_name))
2306

    
2307

    
2308
  def Exec(self, feedback_fn):
2309
    """Reinstall the instance.
2310

2311
    """
2312
    inst = self.instance
2313
    old_name = inst.name
2314

    
2315
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2316

    
2317
    # re-read the instance from the configuration after rename
2318
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2319

    
2320
    _StartInstanceDisks(self.cfg, inst, None)
2321
    try:
2322
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2323
                                          "sda", "sdb"):
2324
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2325
               " instance has been renamed in Ganeti)" %
2326
               (inst.name, inst.primary_node))
2327
        logger.Error(msg)
2328
    finally:
2329
      _ShutdownInstanceDisks(inst, self.cfg)
2330

    
2331

    
2332
class LURemoveInstance(LogicalUnit):
2333
  """Remove an instance.
2334

2335
  """
2336
  HPATH = "instance-remove"
2337
  HTYPE = constants.HTYPE_INSTANCE
2338
  _OP_REQP = ["instance_name"]
2339

    
2340
  def BuildHooksEnv(self):
2341
    """Build hooks env.
2342

2343
    This runs on master, primary and secondary nodes of the instance.
2344

2345
    """
2346
    env = _BuildInstanceHookEnvByObject(self.instance)
2347
    nl = [self.sstore.GetMasterNode()]
2348
    return env, nl, nl
2349

    
2350
  def CheckPrereq(self):
2351
    """Check prerequisites.
2352

2353
    This checks that the instance is in the cluster.
2354

2355
    """
2356
    instance = self.cfg.GetInstanceInfo(
2357
      self.cfg.ExpandInstanceName(self.op.instance_name))
2358
    if instance is None:
2359
      raise errors.OpPrereqError("Instance '%s' not known" %
2360
                                 self.op.instance_name)
2361
    self.instance = instance
2362

    
2363
  def Exec(self, feedback_fn):
2364
    """Remove the instance.
2365

2366
    """
2367
    instance = self.instance
2368
    logger.Info("shutting down instance %s on node %s" %
2369
                (instance.name, instance.primary_node))
2370

    
2371
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2372
      if self.op.ignore_failures:
2373
        feedback_fn("Warning: can't shutdown instance")
2374
      else:
2375
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2376
                                 (instance.name, instance.primary_node))
2377

    
2378
    logger.Info("removing block devices for instance %s" % instance.name)
2379

    
2380
    if not _RemoveDisks(instance, self.cfg):
2381
      if self.op.ignore_failures:
2382
        feedback_fn("Warning: can't remove instance's disks")
2383
      else:
2384
        raise errors.OpExecError("Can't remove instance's disks")
2385

    
2386
    logger.Info("removing instance %s out of cluster config" % instance.name)
2387

    
2388
    self.cfg.RemoveInstance(instance.name)
2389

    
2390

    
2391
class LUQueryInstances(NoHooksLU):
2392
  """Logical unit for querying instances.
2393

2394
  """
2395
  _OP_REQP = ["output_fields", "names"]
2396

    
2397
  def CheckPrereq(self):
2398
    """Check prerequisites.
2399

2400
    This checks that the fields required are valid output fields.
2401

2402
    """
2403
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2404
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2405
                               "admin_state", "admin_ram",
2406
                               "disk_template", "ip", "mac", "bridge",
2407
                               "sda_size", "sdb_size", "vcpus"],
2408
                       dynamic=self.dynamic_fields,
2409
                       selected=self.op.output_fields)
2410

    
2411
    self.wanted = _GetWantedInstances(self, self.op.names)
2412

    
2413
  def Exec(self, feedback_fn):
2414
    """Computes the list of nodes and their attributes.
2415

2416
    """
2417
    instance_names = self.wanted
2418
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2419
                     in instance_names]
2420

    
2421
    # begin data gathering
2422

    
2423
    nodes = frozenset([inst.primary_node for inst in instance_list])
2424

    
2425
    bad_nodes = []
2426
    if self.dynamic_fields.intersection(self.op.output_fields):
2427
      live_data = {}
2428
      node_data = rpc.call_all_instances_info(nodes)
2429
      for name in nodes:
2430
        result = node_data[name]
2431
        if result:
2432
          live_data.update(result)
2433
        elif result == False:
2434
          bad_nodes.append(name)
2435
        # else no instance is alive
2436
    else:
2437
      live_data = dict([(name, {}) for name in instance_names])
2438

    
2439
    # end data gathering
2440

    
2441
    output = []
2442
    for instance in instance_list:
2443
      iout = []
2444
      for field in self.op.output_fields:
2445
        if field == "name":
2446
          val = instance.name
2447
        elif field == "os":
2448
          val = instance.os
2449
        elif field == "pnode":
2450
          val = instance.primary_node
2451
        elif field == "snodes":
2452
          val = list(instance.secondary_nodes)
2453
        elif field == "admin_state":
2454
          val = (instance.status != "down")
2455
        elif field == "oper_state":
2456
          if instance.primary_node in bad_nodes:
2457
            val = None
2458
          else:
2459
            val = bool(live_data.get(instance.name))
2460
        elif field == "admin_ram":
2461
          val = instance.memory
2462
        elif field == "oper_ram":
2463
          if instance.primary_node in bad_nodes:
2464
            val = None
2465
          elif instance.name in live_data:
2466
            val = live_data[instance.name].get("memory", "?")
2467
          else:
2468
            val = "-"
2469
        elif field == "disk_template":
2470
          val = instance.disk_template
2471
        elif field == "ip":
2472
          val = instance.nics[0].ip
2473
        elif field == "bridge":
2474
          val = instance.nics[0].bridge
2475
        elif field == "mac":
2476
          val = instance.nics[0].mac
2477
        elif field == "sda_size" or field == "sdb_size":
2478
          disk = instance.FindDisk(field[:3])
2479
          if disk is None:
2480
            val = None
2481
          else:
2482
            val = disk.size
2483
        elif field == "vcpus":
2484
          val = instance.vcpus
2485
        else:
2486
          raise errors.ParameterError(field)
2487
        iout.append(val)
2488
      output.append(iout)
2489

    
2490
    return output
2491

    
2492

    
2493
class LUFailoverInstance(LogicalUnit):
2494
  """Failover an instance.
2495

2496
  """
2497
  HPATH = "instance-failover"
2498
  HTYPE = constants.HTYPE_INSTANCE
2499
  _OP_REQP = ["instance_name", "ignore_consistency"]
2500

    
2501
  def BuildHooksEnv(self):
2502
    """Build hooks env.
2503

2504
    This runs on master, primary and secondary nodes of the instance.
2505

2506
    """
2507
    env = {
2508
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2509
      }
2510
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2511
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2512
    return env, nl, nl
2513

    
2514
  def CheckPrereq(self):
2515
    """Check prerequisites.
2516

2517
    This checks that the instance is in the cluster.
2518

2519
    """
2520
    instance = self.cfg.GetInstanceInfo(
2521
      self.cfg.ExpandInstanceName(self.op.instance_name))
2522
    if instance is None:
2523
      raise errors.OpPrereqError("Instance '%s' not known" %
2524
                                 self.op.instance_name)
2525

    
2526
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2527
      raise errors.OpPrereqError("Instance's disk layout is not"
2528
                                 " network mirrored, cannot failover.")
2529

    
2530
    secondary_nodes = instance.secondary_nodes
2531
    if not secondary_nodes:
2532
      raise errors.ProgrammerError("no secondary node but using "
2533
                                   "DT_REMOTE_RAID1 template")
2534

    
2535
    target_node = secondary_nodes[0]
2536
    # check memory requirements on the secondary node
2537
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2538
                         instance.name, instance.memory)
2539

    
2540
    # check bridge existence
2541
    brlist = [nic.bridge for nic in instance.nics]
2542
    if not rpc.call_bridges_exist(target_node, brlist):
2543
      raise errors.OpPrereqError("One or more target bridges %s does not"
2544
                                 " exist on destination node '%s'" %
2545
                                 (brlist, target_node))
2546

    
2547
    self.instance = instance
2548

    
2549
  def Exec(self, feedback_fn):
2550
    """Failover an instance.
2551

2552
    The failover is done by shutting it down on its present node and
2553
    starting it on the secondary.
2554

2555
    """
2556
    instance = self.instance
2557

    
2558
    source_node = instance.primary_node
2559
    target_node = instance.secondary_nodes[0]
2560

    
2561
    feedback_fn("* checking disk consistency between source and target")
2562
    for dev in instance.disks:
2563
      # for remote_raid1, these are md over drbd
2564
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2565
        if not self.op.ignore_consistency:
2566
          raise errors.OpExecError("Disk %s is degraded on target node,"
2567
                                   " aborting failover." % dev.iv_name)
2568

    
2569
    feedback_fn("* shutting down instance on source node")
2570
    logger.Info("Shutting down instance %s on node %s" %
2571
                (instance.name, source_node))
2572

    
2573
    if not rpc.call_instance_shutdown(source_node, instance):
2574
      if self.op.ignore_consistency:
2575
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2576
                     " anyway. Please make sure node %s is down"  %
2577
                     (instance.name, source_node, source_node))
2578
      else:
2579
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2580
                                 (instance.name, source_node))
2581

    
2582
    feedback_fn("* deactivating the instance's disks on source node")
2583
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2584
      raise errors.OpExecError("Can't shut down the instance's disks.")
2585

    
2586
    instance.primary_node = target_node
2587
    # distribute new instance config to the other nodes
2588
    self.cfg.AddInstance(instance)
2589

    
2590
    feedback_fn("* activating the instance's disks on target node")
2591
    logger.Info("Starting instance %s on node %s" %
2592
                (instance.name, target_node))
2593

    
2594
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2595
                                             ignore_secondaries=True)
2596
    if not disks_ok:
2597
      _ShutdownInstanceDisks(instance, self.cfg)
2598
      raise errors.OpExecError("Can't activate the instance's disks")
2599

    
2600
    feedback_fn("* starting the instance on the target node")
2601
    if not rpc.call_instance_start(target_node, instance, None):
2602
      _ShutdownInstanceDisks(instance, self.cfg)
2603
      raise errors.OpExecError("Could not start instance %s on node %s." %
2604
                               (instance.name, target_node))
2605

    
2606

    
2607
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2608
  """Create a tree of block devices on the primary node.
2609

2610
  This always creates all devices.
2611

2612
  """
2613
  if device.children:
2614
    for child in device.children:
2615
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2616
        return False
2617

    
2618
  cfg.SetDiskID(device, node)
2619
  new_id = rpc.call_blockdev_create(node, device, device.size,
2620
                                    instance.name, True, info)
2621
  if not new_id:
2622
    return False
2623
  if device.physical_id is None:
2624
    device.physical_id = new_id
2625
  return True
2626

    
2627

    
2628
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2629
  """Create a tree of block devices on a secondary node.
2630

2631
  If this device type has to be created on secondaries, create it and
2632
  all its children.
2633

2634
  If not, just recurse to children keeping the same 'force' value.
2635

2636
  """
2637
  if device.CreateOnSecondary():
2638
    force = True
2639
  if device.children:
2640
    for child in device.children:
2641
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2642
                                        child, force, info):
2643
        return False
2644

    
2645
  if not force:
2646
    return True
2647
  cfg.SetDiskID(device, node)
2648
  new_id = rpc.call_blockdev_create(node, device, device.size,
2649
                                    instance.name, False, info)
2650
  if not new_id:
2651
    return False
2652
  if device.physical_id is None:
2653
    device.physical_id = new_id
2654
  return True
2655

    
2656

    
2657
def _GenerateUniqueNames(cfg, exts):
2658
  """Generate a suitable LV name.
2659

2660
  This will generate a logical volume name for the given instance.
2661

2662
  """
2663
  results = []
2664
  for val in exts:
2665
    new_id = cfg.GenerateUniqueID()
2666
    results.append("%s%s" % (new_id, val))
2667
  return results
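# Illustration: _GenerateUniqueNames(cfg, [".sda", ".sdb"]) returns something
# like ["<unique-id>.sda", "<unique-id>.sdb"], the ids coming from
# cfg.GenerateUniqueID().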
2668

    
2669

    
2670
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2671
  """Generate a drbd device complete with its children.
2672

2673
  """
2674
  port = cfg.AllocatePort()
2675
  vgname = cfg.GetVGName()
2676
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2677
                          logical_id=(vgname, names[0]))
2678
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2679
                          logical_id=(vgname, names[1]))
2680
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2681
                          logical_id = (primary, secondary, port),
2682
                          children = [dev_data, dev_meta])
2683
  return drbd_dev
2684

    
2685

    
2686
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2687
  """Generate a drbd8 device complete with its children.
2688

2689
  """
2690
  port = cfg.AllocatePort()
2691
  vgname = cfg.GetVGName()
2692
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2693
                          logical_id=(vgname, names[0]))
2694
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2695
                          logical_id=(vgname, names[1]))
2696
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2697
                          logical_id = (primary, secondary, port),
2698
                          children = [dev_data, dev_meta],
2699
                          iv_name=iv_name)
2700
  return drbd_dev
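# Schematically, the branch built by _GenerateDRBD8Branch is (all values
# are placeholders):
#   Disk(LD_DRBD8, size=<size>, iv_name=<iv_name>,
#        logical_id=(<primary>, <secondary>, <allocated port>),
#        children=[Disk(LD_LV, size=<size>, logical_id=(<vg>, <names[0]>)),
#                  Disk(LD_LV, size=128,    logical_id=(<vg>, <names[1]>))])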
2701

    
2702
def _GenerateDiskTemplate(cfg, template_name,
2703
                          instance_name, primary_node,
2704
                          secondary_nodes, disk_sz, swap_sz):
2705
  """Generate the entire disk layout for a given template type.
2706

2707
  """
2708
  #TODO: compute space requirements
2709

    
2710
  vgname = cfg.GetVGName()
2711
  if template_name == "diskless":
2712
    disks = []
2713
  elif template_name == "plain":
2714
    if len(secondary_nodes) != 0:
2715
      raise errors.ProgrammerError("Wrong template configuration")
2716

    
2717
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2718
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2719
                           logical_id=(vgname, names[0]),
2720
                           iv_name = "sda")
2721
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2722
                           logical_id=(vgname, names[1]),
2723
                           iv_name = "sdb")
2724
    disks = [sda_dev, sdb_dev]
2725
  elif template_name == "local_raid1":
2726
    if len(secondary_nodes) != 0:
2727
      raise errors.ProgrammerError("Wrong template configuration")
2728

    
2729

    
2730
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2731
                                       ".sdb_m1", ".sdb_m2"])
2732
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2733
                              logical_id=(vgname, names[0]))
2734
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2735
                              logical_id=(vgname, names[1]))
2736
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2737
                              size=disk_sz,
2738
                              children = [sda_dev_m1, sda_dev_m2])
2739
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2740
                              logical_id=(vgname, names[2]))
2741
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2742
                              logical_id=(vgname, names[3]))
2743
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2744
                              size=swap_sz,
2745
                              children = [sdb_dev_m1, sdb_dev_m2])
2746
    disks = [md_sda_dev, md_sdb_dev]
2747
  elif template_name == constants.DT_REMOTE_RAID1:
2748
    if len(secondary_nodes) != 1:
2749
      raise errors.ProgrammerError("Wrong template configuration")
2750
    remote_node = secondary_nodes[0]
2751
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2752
                                       ".sdb_data", ".sdb_meta"])
2753
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2754
                                         disk_sz, names[0:2])
2755
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2756
                              children = [drbd_sda_dev], size=disk_sz)
2757
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2758
                                         swap_sz, names[2:4])
2759
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2760
                              children = [drbd_sdb_dev], size=swap_sz)
2761
    disks = [md_sda_dev, md_sdb_dev]
2762
  elif template_name == constants.DT_DRBD8:
2763
    if len(secondary_nodes) != 1:
2764
      raise errors.ProgrammerError("Wrong template configuration")
2765
    remote_node = secondary_nodes[0]
2766
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2767
                                       ".sdb_data", ".sdb_meta"])
2768
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2769
                                         disk_sz, names[0:2], "sda")
2770
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2771
                                         swap_sz, names[2:4], "sdb")
2772
    disks = [drbd_sda_dev, drbd_sdb_dev]
2773
  else:
2774
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2775
  return disks
2776

    
2777

    
2778
def _GetInstanceInfoText(instance):
2779
  """Compute that text that should be added to the disk's metadata.
2780

2781
  """
2782
  return "originstname+%s" % instance.name
2783

    
2784

    
2785
def _CreateDisks(cfg, instance):
2786
  """Create all disks for an instance.
2787

2788
  This abstracts away some work from AddInstance.
2789

2790
  Args:
2791
    instance: the instance object
2792

2793
  Returns:
2794
    True or False showing the success of the creation process
2795

2796
  """
2797
  info = _GetInstanceInfoText(instance)
2798

    
2799
  for device in instance.disks:
2800
    logger.Info("creating volume %s for instance %s" %
2801
              (device.iv_name, instance.name))
2802
    #HARDCODE
2803
    for secondary_node in instance.secondary_nodes:
2804
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2805
                                        device, False, info):
2806
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2807
                     (device.iv_name, device, secondary_node))
2808
        return False
2809
    #HARDCODE
2810
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2811
                                    instance, device, info):
2812
      logger.Error("failed to create volume %s on primary!" %
2813
                   device.iv_name)
2814
      return False
2815
  return True
2816

    
2817

    
2818
def _RemoveDisks(instance, cfg):
2819
  """Remove all disks for an instance.
2820

2821
  This abstracts away some work from `AddInstance()` and
2822
  `RemoveInstance()`. Note that in case some of the devices couldn't
2823
  be removed, the removal will continue with the other ones (compare
2824
  with `_CreateDisks()`).
2825

2826
  Args:
2827
    instance: the instance object
2828

2829
  Returns:
2830
    True or False showing the success of the removal process
2831

2832
  """
2833
  logger.Info("removing block devices for instance %s" % instance.name)
2834

    
2835
  result = True
2836
  for device in instance.disks:
2837
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2838
      cfg.SetDiskID(disk, node)
2839
      if not rpc.call_blockdev_remove(node, disk):
2840
        logger.Error("could not remove block device %s on node %s,"
2841
                     " continuing anyway" %
2842
                     (device.iv_name, node))
2843
        result = False
2844
  return result
2845

    
2846

    
2847
class LUCreateInstance(LogicalUnit):
2848
  """Create an instance.
2849

2850
  """
2851
  HPATH = "instance-add"
2852
  HTYPE = constants.HTYPE_INSTANCE
2853
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2854
              "disk_template", "swap_size", "mode", "start", "vcpus",
2855
              "wait_for_sync", "ip_check", "mac"]
2856

    
2857
  def BuildHooksEnv(self):
2858
    """Build hooks env.
2859

2860
    This runs on master, primary and secondary nodes of the instance.
2861

2862
    """
2863
    env = {
2864
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2865
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2866
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2867
      "INSTANCE_ADD_MODE": self.op.mode,
2868
      }
2869
    if self.op.mode == constants.INSTANCE_IMPORT:
2870
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2871
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2872
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2873

    
2874
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2875
      primary_node=self.op.pnode,
2876
      secondary_nodes=self.secondaries,
2877
      status=self.instance_status,
2878
      os_type=self.op.os_type,
2879
      memory=self.op.mem_size,
2880
      vcpus=self.op.vcpus,
2881
      nics=[(self.inst_ip, self.op.bridge)],
2882
    ))
2883

    
2884
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2885
          self.secondaries)
2886
    return env, nl, nl
2887

    
2888

    
2889
  def CheckPrereq(self):
2890
    """Check prerequisites.
2891

2892
    """
2893
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2894
      if not hasattr(self.op, attr):
2895
        setattr(self.op, attr, None)
2896

    
2897
    if self.op.mode not in (constants.INSTANCE_CREATE,
2898
                            constants.INSTANCE_IMPORT):
2899
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2900
                                 self.op.mode)
2901

    
2902
    if self.op.mode == constants.INSTANCE_IMPORT:
2903
      src_node = getattr(self.op, "src_node", None)
2904
      src_path = getattr(self.op, "src_path", None)
2905
      if src_node is None or src_path is None:
2906
        raise errors.OpPrereqError("Importing an instance requires source"
2907
                                   " node and path options")
2908
      src_node_full = self.cfg.ExpandNodeName(src_node)
2909
      if src_node_full is None:
2910
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2911
      self.op.src_node = src_node = src_node_full
2912

    
2913
      if not os.path.isabs(src_path):
2914
        raise errors.OpPrereqError("The source path must be absolute")
2915

    
2916
      export_info = rpc.call_export_info(src_node, src_path)
2917

    
2918
      if not export_info:
2919
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2920

    
2921
      if not export_info.has_section(constants.INISECT_EXP):
2922
        raise errors.ProgrammerError("Corrupted export config")
2923

    
2924
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2925
      if (int(ei_version) != constants.EXPORT_VERSION):
2926
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2927
                                   (ei_version, constants.EXPORT_VERSION))
2928

    
2929
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2930
        raise errors.OpPrereqError("Can't import instance with more than"
2931
                                   " one data disk")
2932

    
2933
      # FIXME: are the old os-es, disk sizes, etc. useful?
2934
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2935
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2936
                                                         'disk0_dump'))
2937
      self.src_image = diskimage
2938
    else: # INSTANCE_CREATE
2939
      if getattr(self.op, "os_type", None) is None:
2940
        raise errors.OpPrereqError("No guest OS specified")
2941

    
2942
    # check primary node
2943
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2944
    if pnode is None:
2945
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2946
                                 self.op.pnode)
2947
    self.op.pnode = pnode.name
2948
    self.pnode = pnode
2949
    self.secondaries = []
2950
    # disk template and mirror node verification
2951
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2952
      raise errors.OpPrereqError("Invalid disk template name")
2953

    
2954
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2955
      if getattr(self.op, "snode", None) is None:
2956
        raise errors.OpPrereqError("The networked disk templates need"
2957
                                   " a mirror node")
2958

    
2959
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2960
      if snode_name is None:
2961
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2962
                                   self.op.snode)
2963
      elif snode_name == pnode.name:
2964
        raise errors.OpPrereqError("The secondary node cannot be"
2965
                                   " the primary node.")
2966
      self.secondaries.append(snode_name)
2967

    
2968
    # Required free disk space as a function of disk and swap space
2969
    req_size_dict = {
2970
      constants.DT_DISKLESS: None,
2971
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2972
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2973
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2974
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2975
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2976
    }
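    # Worked example (illustrative): with disk_size=10240 and swap_size=4096,
    # a DT_DRBD8 instance needs 10240 + 4096 + 256 = 14592 MB of free space
    # in the volume group, on the primary as well as the secondary node.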
2977

    
2978
    if self.op.disk_template not in req_size_dict:
2979
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2980
                                   " is unknown" %  self.op.disk_template)
2981

    
2982
    req_size = req_size_dict[self.op.disk_template]
2983

    
2984
    # Check lv size requirements
2985
    if req_size is not None:
2986
      nodenames = [pnode.name] + self.secondaries
2987
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2988
      for node in nodenames:
2989
        info = nodeinfo.get(node, None)
2990
        if not info:
2991
          raise errors.OpPrereqError("Cannot get current information"
2992
                                     " from node '%s'" % nodeinfo)
2993
        vg_free = info.get('vg_free', None)
2994
        if not isinstance(vg_free, int):
2995
          raise errors.OpPrereqError("Can't compute free disk space on"
2996
                                     " node %s" % node)
2997
        if req_size > info['vg_free']:
2998
          raise errors.OpPrereqError("Not enough disk space on target node %s."
2999
                                     " %d MB available, %d MB required" %
3000
                                     (node, info['vg_free'], req_size))
3001

    
3002
    # os verification
3003
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3004
    if not os_obj:
3005
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3006
                                 " primary node"  % self.op.os_type)
3007

    
3008
    if self.op.kernel_path == constants.VALUE_NONE:
3009
      raise errors.OpPrereqError("Can't set instance kernel to none")
3010

    
3011
    # instance verification
3012
    hostname1 = utils.HostInfo(self.op.instance_name)
3013

    
3014
    self.op.instance_name = instance_name = hostname1.name
3015
    instance_list = self.cfg.GetInstanceList()
3016
    if instance_name in instance_list:
3017
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3018
                                 instance_name)
3019

    
3020
    ip = getattr(self.op, "ip", None)
3021
    if ip is None or ip.lower() == "none":
3022
      inst_ip = None
3023
    elif ip.lower() == "auto":
3024
      inst_ip = hostname1.ip
3025
    else:
3026
      if not utils.IsValidIP(ip):
3027
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3028
                                   " like a valid IP" % ip)
3029
      inst_ip = ip
3030
    self.inst_ip = inst_ip
3031

    
3032
    if self.op.start and not self.op.ip_check:
3033
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3034
                                 " adding an instance in start mode")
3035

    
3036
    if self.op.ip_check:
3037
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3038
                       constants.DEFAULT_NODED_PORT):
3039
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3040
                                   (hostname1.ip, instance_name))
3041

    
3042
    # MAC address verification
3043
    if self.op.mac != "auto":
3044
      if not utils.IsValidMac(self.op.mac.lower()):
3045
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3046
                                   self.op.mac)
3047

    
3048
    # bridge verification
3049
    bridge = getattr(self.op, "bridge", None)
3050
    if bridge is None:
3051
      self.op.bridge = self.cfg.GetDefBridge()
3052
    else:
3053
      self.op.bridge = bridge
3054

    
3055
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3056
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3057
                                 " destination node '%s'" %
3058
                                 (self.op.bridge, pnode.name))
3059

    
3060
    # boot order verification
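    # (e.g. "acd" or "n" pass the check below, while any string containing a
    # character outside [acdn] is rejected)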
3061
    if self.op.hvm_boot_order is not None:
3062
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3063
             raise errors.OpPrereqError("invalid boot order specified,"
3064
                                        " must be one or more of [acdn]")
3065

    
3066
    if self.op.start:
3067
      self.instance_status = 'up'
3068
    else:
3069
      self.instance_status = 'down'
3070

    
3071
  def Exec(self, feedback_fn):
3072
    """Create and add the instance to the cluster.
3073

3074
    """
3075
    instance = self.op.instance_name
3076
    pnode_name = self.pnode.name
3077

    
3078
    if self.op.mac == "auto":
3079
      mac_address = self.cfg.GenerateMAC()
3080
    else:
3081
      mac_address = self.op.mac
3082

    
3083
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3084
    if self.inst_ip is not None:
3085
      nic.ip = self.inst_ip
3086

    
3087
    ht_kind = self.sstore.GetHypervisorType()
3088
    if ht_kind in constants.HTS_REQ_PORT:
3089
      network_port = self.cfg.AllocatePort()
3090
    else:
3091
      network_port = None
3092

    
3093
    disks = _GenerateDiskTemplate(self.cfg,
3094
                                  self.op.disk_template,
3095
                                  instance, pnode_name,
3096
                                  self.secondaries, self.op.disk_size,
3097
                                  self.op.swap_size)
3098

    
3099
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3100
                            primary_node=pnode_name,
3101
                            memory=self.op.mem_size,
3102
                            vcpus=self.op.vcpus,
3103
                            nics=[nic], disks=disks,
3104
                            disk_template=self.op.disk_template,
3105
                            status=self.instance_status,
3106
                            network_port=network_port,
3107
                            kernel_path=self.op.kernel_path,
3108
                            initrd_path=self.op.initrd_path,
3109
                            hvm_boot_order=self.op.hvm_boot_order,
3110
                            )
3111

    
3112
    feedback_fn("* creating instance disks...")
3113
    if not _CreateDisks(self.cfg, iobj):
3114
      _RemoveDisks(iobj, self.cfg)
3115
      raise errors.OpExecError("Device creation failed, reverting...")
3116

    
3117
    feedback_fn("adding instance %s to cluster config" % instance)
3118

    
3119
    self.cfg.AddInstance(iobj)
3120

    
3121
    if self.op.wait_for_sync:
3122
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3123
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3124
      # make sure the disks are not degraded (still sync-ing is ok)
3125
      time.sleep(15)
3126
      feedback_fn("* checking mirrors status")
3127
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3128
    else:
3129
      disk_abort = False
3130

    
3131
    if disk_abort:
3132
      _RemoveDisks(iobj, self.cfg)
3133
      self.cfg.RemoveInstance(iobj.name)
3134
      raise errors.OpExecError("There are some degraded disks for"
3135
                               " this instance")
3136

    
3137
    feedback_fn("creating os for instance %s on node %s" %
3138
                (instance, pnode_name))
3139

    
3140
    if iobj.disk_template != constants.DT_DISKLESS:
3141
      if self.op.mode == constants.INSTANCE_CREATE:
3142
        feedback_fn("* running the instance OS create scripts...")
3143
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3144
          raise errors.OpExecError("could not add os for instance %s"
3145
                                   " on node %s" %
3146
                                   (instance, pnode_name))
3147

    
3148
      elif self.op.mode == constants.INSTANCE_IMPORT:
3149
        feedback_fn("* running the instance OS import scripts...")
3150
        src_node = self.op.src_node
3151
        src_image = self.src_image
3152
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3153
                                                src_node, src_image):
3154
          raise errors.OpExecError("Could not import os for instance"
3155
                                   " %s on node %s" %
3156
                                   (instance, pnode_name))
3157
      else:
3158
        # also checked in the prereq part
3159
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3160
                                     % self.op.mode)
3161

    
3162
    if self.op.start:
3163
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3164
      feedback_fn("* starting instance...")
3165
      if not rpc.call_instance_start(pnode_name, iobj, None):
3166
        raise errors.OpExecError("Could not start instance")
3167

    
3168

    
3169
class LUConnectConsole(NoHooksLU):
3170
  """Connect to an instance's console.
3171

3172
  This is somewhat special in that it returns the command line that
3173
  you need to run on the master node in order to connect to the
3174
  console.
3175

3176
  """
3177
  _OP_REQP = ["instance_name"]
3178

    
3179
  def CheckPrereq(self):
3180
    """Check prerequisites.
3181

3182
    This checks that the instance is in the cluster.
3183

3184
    """
3185
    instance = self.cfg.GetInstanceInfo(
3186
      self.cfg.ExpandInstanceName(self.op.instance_name))
3187
    if instance is None:
3188
      raise errors.OpPrereqError("Instance '%s' not known" %
3189
                                 self.op.instance_name)
3190
    self.instance = instance
3191

    
3192
  def Exec(self, feedback_fn):
3193
    """Connect to the console of an instance
3194

3195
    """
3196
    instance = self.instance
3197
    node = instance.primary_node
3198

    
3199
    node_insts = rpc.call_instance_list([node])[node]
3200
    if node_insts is False:
3201
      raise errors.OpExecError("Can't connect to node %s." % node)
3202

    
3203
    if instance.name not in node_insts:
3204
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3205

    
3206
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3207

    
3208
    hyper = hypervisor.GetHypervisor()
3209
    console_cmd = hyper.GetShellCommandForConsole(instance)
3210
    # build ssh cmdline
3211
    argv = ["ssh", "-q", "-t"]
3212
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
3213
    argv.extend(ssh.BATCH_MODE_OPTS)
3214
    argv.append(node)
3215
    argv.append(console_cmd)
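    # Sketch of the expected caller behaviour (hypothetical snippet, not part
    # of this LU): the client side is expected to exec the returned command:
    #   shell, argv = lu_output
    #   os.execvp(argv[0], argv)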
3216
    return "ssh", argv
3217

    
3218

    
3219
class LUAddMDDRBDComponent(LogicalUnit):
3220
  """Adda new mirror member to an instance's disk.
3221

3222
  """
3223
  HPATH = "mirror-add"
3224
  HTYPE = constants.HTYPE_INSTANCE
3225
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3226

    
3227
  def BuildHooksEnv(self):
3228
    """Build hooks env.
3229

3230
    This runs on the master, the primary and all the secondaries.
3231

3232
    """
3233
    env = {
3234
      "NEW_SECONDARY": self.op.remote_node,
3235
      "DISK_NAME": self.op.disk_name,
3236
      }
3237
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3238
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3239
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3240
    return env, nl, nl
3241

    
3242
  def CheckPrereq(self):
3243
    """Check prerequisites.
3244

3245
    This checks that the instance is in the cluster.
3246

3247
    """
3248
    instance = self.cfg.GetInstanceInfo(
3249
      self.cfg.ExpandInstanceName(self.op.instance_name))
3250
    if instance is None:
3251
      raise errors.OpPrereqError("Instance '%s' not known" %
3252
                                 self.op.instance_name)
3253
    self.instance = instance
3254

    
3255
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3256
    if remote_node is None:
3257
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3258
    self.remote_node = remote_node
3259

    
3260
    if remote_node == instance.primary_node:
3261
      raise errors.OpPrereqError("The specified node is the primary node of"
3262
                                 " the instance.")
3263

    
3264
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3265
      raise errors.OpPrereqError("Instance's disk layout is not"
3266
                                 " remote_raid1.")
3267
    for disk in instance.disks:
3268
      if disk.iv_name == self.op.disk_name:
3269
        break
3270
    else:
3271
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3272
                                 " instance." % self.op.disk_name)
3273
    if len(disk.children) > 1:
3274
      raise errors.OpPrereqError("The device already has two slave devices."
3275
                                 " This would create a 3-disk raid1 which we"
3276
                                 " don't allow.")
3277
    self.disk = disk
3278

    
3279
  def Exec(self, feedback_fn):
3280
    """Add the mirror component
3281

3282
    """
3283
    disk = self.disk
3284
    instance = self.instance
3285

    
3286
    remote_node = self.remote_node
3287
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3288
    names = _GenerateUniqueNames(self.cfg, lv_names)
3289
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3290
                                     remote_node, disk.size, names)
3291

    
3292
    logger.Info("adding new mirror component on secondary")
3293
    #HARDCODE
3294
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3295
                                      new_drbd, False,
3296
                                      _GetInstanceInfoText(instance)):
3297
      raise errors.OpExecError("Failed to create new component on secondary"
3298
                               " node %s" % remote_node)
3299

    
3300
    logger.Info("adding new mirror component on primary")
3301
    #HARDCODE
3302
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3303
                                    instance, new_drbd,
3304
                                    _GetInstanceInfoText(instance)):
3305
      # remove secondary dev
3306
      self.cfg.SetDiskID(new_drbd, remote_node)
3307
      rpc.call_blockdev_remove(remote_node, new_drbd)
3308
      raise errors.OpExecError("Failed to create volume on primary")
3309

    
3310
    # the device exists now
3311
    # call the primary node to add the mirror to md
3312
    logger.Info("adding new mirror component to md")
3313
    if not rpc.call_blockdev_addchildren(instance.primary_node,
3314
                                         disk, [new_drbd]):
3315
      logger.Error("Can't add mirror compoment to md!")
3316
      self.cfg.SetDiskID(new_drbd, remote_node)
3317
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
3318
        logger.Error("Can't rollback on secondary")
3319
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
3320
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3321
        logger.Error("Can't rollback on primary")
3322
      raise errors.OpExecError("Can't add mirror component to md array")
3323

    
3324
    disk.children.append(new_drbd)
3325

    
3326
    self.cfg.AddInstance(instance)
3327

    
3328
    _WaitForSync(self.cfg, instance, self.proc)
3329

    
3330
    return 0
3331

    
3332

    
3333
class LURemoveMDDRBDComponent(LogicalUnit):
3334
  """Remove a component from a remote_raid1 disk.
3335

3336
  """
3337
  HPATH = "mirror-remove"
3338
  HTYPE = constants.HTYPE_INSTANCE
3339
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3340

    
3341
  def BuildHooksEnv(self):
3342
    """Build hooks env.
3343

3344
    This runs on the master, the primary and all the secondaries.
3345

3346
    """
3347
    env = {
3348
      "DISK_NAME": self.op.disk_name,
3349
      "DISK_ID": self.op.disk_id,
3350
      "OLD_SECONDARY": self.old_secondary,
3351
      }
3352
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3353
    nl = [self.sstore.GetMasterNode(),
3354
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3355
    return env, nl, nl
3356

    
3357
  def CheckPrereq(self):
3358
    """Check prerequisites.
3359

3360
    This checks that the instance is in the cluster.
3361

3362
    """
3363
    instance = self.cfg.GetInstanceInfo(
3364
      self.cfg.ExpandInstanceName(self.op.instance_name))
3365
    if instance is None:
3366
      raise errors.OpPrereqError("Instance '%s' not known" %
3367
                                 self.op.instance_name)
3368
    self.instance = instance
3369

    
3370
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3371
      raise errors.OpPrereqError("Instance's disk layout is not"
3372
                                 " remote_raid1.")
3373
    for disk in instance.disks:
3374
      if disk.iv_name == self.op.disk_name:
3375
        break
3376
    else:
3377
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3378
                                 " instance." % self.op.disk_name)
3379
    for child in disk.children:
3380
      if (child.dev_type == constants.LD_DRBD7 and
3381
          child.logical_id[2] == self.op.disk_id):
3382
        break
3383
    else:
3384
      raise errors.OpPrereqError("Can't find the device with this port.")
3385

    
3386
    if len(disk.children) < 2:
3387
      raise errors.OpPrereqError("Cannot remove the last component from"
3388
                                 " a mirror.")
3389
    self.disk = disk
3390
    self.child = child
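    # the child's logical_id holds both endpoints (node_a, node_b, ...);
    # whichever end is not the primary node is recorded as the old secondary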
3391
    if self.child.logical_id[0] == instance.primary_node:
3392
      oid = 1
3393
    else:
3394
      oid = 0
3395
    self.old_secondary = self.child.logical_id[oid]
3396

    
3397
  def Exec(self, feedback_fn):
3398
    """Remove the mirror component
3399

3400
    """
3401
    instance = self.instance
3402
    disk = self.disk
3403
    child = self.child
3404
    logger.Info("remove mirror component")
3405
    self.cfg.SetDiskID(disk, instance.primary_node)
3406
    if not rpc.call_blockdev_removechildren(instance.primary_node,
3407
                                            disk, [child]):
3408
      raise errors.OpExecError("Can't remove child from mirror.")
3409

    
3410
    for node in child.logical_id[:2]:
3411
      self.cfg.SetDiskID(child, node)
3412
      if not rpc.call_blockdev_remove(node, child):
3413
        logger.Error("Warning: failed to remove device from node %s,"
3414
                     " continuing operation." % node)
3415

    
3416
    disk.children.remove(child)
3417
    self.cfg.AddInstance(instance)
3418

    
3419

    
3420
class LUReplaceDisks(LogicalUnit):
3421
  """Replace the disks of an instance.
3422

3423
  """
3424
  HPATH = "mirrors-replace"
3425
  HTYPE = constants.HTYPE_INSTANCE
3426
  _OP_REQP = ["instance_name", "mode", "disks"]
3427

    
3428
  def BuildHooksEnv(self):
3429
    """Build hooks env.
3430

3431
    This runs on the master, the primary and all the secondaries.
3432

3433
    """
3434
    env = {
3435
      "MODE": self.op.mode,
3436
      "NEW_SECONDARY": self.op.remote_node,
3437
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3438
      }
3439
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3440
    nl = [
3441
      self.sstore.GetMasterNode(),
3442
      self.instance.primary_node,
3443
      ]
3444
    if self.op.remote_node is not None:
3445
      nl.append(self.op.remote_node)
3446
    return env, nl, nl
3447

    
3448
  def CheckPrereq(self):
3449
    """Check prerequisites.
3450

3451
    This checks that the instance is in the cluster.
3452

3453
    """
3454
    instance = self.cfg.GetInstanceInfo(
3455
      self.cfg.ExpandInstanceName(self.op.instance_name))
3456
    if instance is None:
3457
      raise errors.OpPrereqError("Instance '%s' not known" %
3458
                                 self.op.instance_name)
3459
    self.instance = instance
3460
    self.op.instance_name = instance.name
3461

    
3462
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3463
      raise errors.OpPrereqError("Instance's disk layout is not"
3464
                                 " network mirrored.")
3465

    
3466
    if len(instance.secondary_nodes) != 1:
3467
      raise errors.OpPrereqError("The instance has a strange layout,"
3468
                                 " expected one secondary but found %d" %
3469
                                 len(instance.secondary_nodes))
3470

    
3471
    self.sec_node = instance.secondary_nodes[0]
3472

    
3473
    remote_node = getattr(self.op, "remote_node", None)
3474
    if remote_node is not None:
3475
      remote_node = self.cfg.ExpandNodeName(remote_node)
3476
      if remote_node is None:
3477
        raise errors.OpPrereqError("Node '%s' not known" %
3478
                                   self.op.remote_node)
3479
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3480
    else:
3481
      self.remote_node_info = None
3482
    if remote_node == instance.primary_node:
3483
      raise errors.OpPrereqError("The specified node is the primary node of"
3484
                                 " the instance.")
3485
    elif remote_node == self.sec_node:
3486
      if self.op.mode == constants.REPLACE_DISK_SEC:
3487
        # this is for DRBD8, where we can't execute the same mode of
3488
        # replacement as for drbd7 (no different port allocated)
3489
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3490
                                   " replacement")
3491
      # the user gave the current secondary, switch to
3492
      # 'no-replace-secondary' mode for drbd7
3493
      remote_node = None
3494
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3495
        self.op.mode != constants.REPLACE_DISK_ALL):
3496
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3497
                                 " disks replacement, not individual ones")
3498
    if instance.disk_template == constants.DT_DRBD8:
3499
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3500
          remote_node is not None):
3501
        # switch to replace secondary mode
3502
        self.op.mode = constants.REPLACE_DISK_SEC
3503

    
3504
      if self.op.mode == constants.REPLACE_DISK_ALL:
3505
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3506
                                   " secondary disk replacement, not"
3507
                                   " both at once")
3508
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3509
        if remote_node is not None:
3510
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3511
                                     " the secondary while doing a primary"
3512
                                     " node disk replacement")
3513
        self.tgt_node = instance.primary_node
3514
        self.oth_node = instance.secondary_nodes[0]
3515
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3516
        self.new_node = remote_node # this can be None, in which case
3517
                                    # we don't change the secondary
3518
        self.tgt_node = instance.secondary_nodes[0]
3519
        self.oth_node = instance.primary_node
3520
      else:
3521
        raise errors.ProgrammerError("Unhandled disk replace mode")
3522

    
3523
    for name in self.op.disks:
3524
      if instance.FindDisk(name) is None:
3525
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3526
                                   (name, instance.name))
3527
    self.op.remote_node = remote_node
3528

    
3529
  def _ExecRR1(self, feedback_fn):
3530
    """Replace the disks of an instance.
3531

3532
    """
3533
    instance = self.instance
3534
    iv_names = {}
3535
    # start of work
3536
    if self.op.remote_node is None:
3537
      remote_node = self.sec_node
3538
    else:
3539
      remote_node = self.op.remote_node
3540
    cfg = self.cfg
3541
    for dev in instance.disks:
3542
      size = dev.size
3543
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3544
      names = _GenerateUniqueNames(cfg, lv_names)
3545
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3546
                                       remote_node, size, names)
3547
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3548
      logger.Info("adding new mirror component on secondary for %s" %
3549
                  dev.iv_name)
3550
      #HARDCODE
3551
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3552
                                        new_drbd, False,
3553
                                        _GetInstanceInfoText(instance)):
3554
        raise errors.OpExecError("Failed to create new component on secondary"
3555
                                 " node %s. Full abort, cleanup manually!" %
3556
                                 remote_node)
3557

    
3558
      logger.Info("adding new mirror component on primary")
3559
      #HARDCODE
3560
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3561
                                      instance, new_drbd,
3562
                                      _GetInstanceInfoText(instance)):
3563
        # remove secondary dev
3564
        cfg.SetDiskID(new_drbd, remote_node)
3565
        rpc.call_blockdev_remove(remote_node, new_drbd)
3566
        raise errors.OpExecError("Failed to create volume on primary!"
3567
                                 " Full abort, cleanup manually!!")
3568

    
3569
      # the device exists now
3570
      # call the primary node to add the mirror to md
3571
      logger.Info("adding new mirror component to md")
3572
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3573
                                           [new_drbd]):
3574
        logger.Error("Can't add mirror compoment to md!")
3575
        cfg.SetDiskID(new_drbd, remote_node)
3576
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3577
          logger.Error("Can't rollback on secondary")
3578
        cfg.SetDiskID(new_drbd, instance.primary_node)
3579
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3580
          logger.Error("Can't rollback on primary")
3581
        raise errors.OpExecError("Full abort, cleanup manually!!")
3582

    
3583
      dev.children.append(new_drbd)
3584
      cfg.AddInstance(instance)
3585

    
3586
    # this can fail as the old devices are degraded and _WaitForSync
3587
    # does a combined result over all disks, so we don't check its
3588
    # return value
3589
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3590

    
3591
    # so check manually all the devices
3592
    for name in iv_names:
3593
      dev, child, new_drbd = iv_names[name]
3594
      cfg.SetDiskID(dev, instance.primary_node)
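      # element 5 of the blockdev_find result is used below as the device's
      # "degraded" flag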
3595
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3596
      if is_degr:
3597
        raise errors.OpExecError("MD device %s is degraded!" % name)
3598
      cfg.SetDiskID(new_drbd, instance.primary_node)
3599
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3600
      if is_degr:
3601
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3602

    
3603
    for name in iv_names:
3604
      dev, child, new_drbd = iv_names[name]
3605
      logger.Info("remove mirror %s component" % name)
3606
      cfg.SetDiskID(dev, instance.primary_node)
3607
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3608
                                              dev, [child]):
3609
        logger.Error("Can't remove child from mirror, aborting"
3610
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3611
        continue
3612

    
3613
      for node in child.logical_id[:2]:
3614
        logger.Info("remove child device on %s" % node)
3615
        cfg.SetDiskID(child, node)
3616
        if not rpc.call_blockdev_remove(node, child):
3617
          logger.Error("Warning: failed to remove device from node %s,"
3618
                       " continuing operation." % node)
3619

    
3620
      dev.children.remove(child)
3621

    
3622
      cfg.AddInstance(instance)
3623

    
3624
  def _ExecD8DiskOnly(self, feedback_fn):
3625
    """Replace a disk on the primary or secondary for dbrd8.
3626

3627
    The algorithm for replace is quite complicated:
3628
      - for each disk to be replaced:
3629
        - create new LVs on the target node with unique names
3630
        - detach old LVs from the drbd device
3631
        - rename old LVs to name_replaced.<time_t>
3632
        - rename new LVs to old LVs
3633
        - attach the new LVs (with the old names now) to the drbd device
3634
      - wait for sync across all devices
3635
      - for each modified disk:
3636
        - remove old LVs (which have the name name_replaced.<time_t>)
3637

3638
    Failures are not very well handled.
3639

3640
    """
3641
    steps_total = 6
3642
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3643
    instance = self.instance
3644
    iv_names = {}
3645
    vgname = self.cfg.GetVGName()
3646
    # start of work
3647
    cfg = self.cfg
3648
    tgt_node = self.tgt_node
3649
    oth_node = self.oth_node
3650

    
3651
    # Step: check device activation
3652
    self.proc.LogStep(1, steps_total, "check device existence")
3653
    info("checking volume groups")
3654
    my_vg = cfg.GetVGName()
3655
    results = rpc.call_vg_list([oth_node, tgt_node])
3656
    if not results:
3657
      raise errors.OpExecError("Can't list volume groups on the nodes")
3658
    for node in oth_node, tgt_node:
3659
      res = results.get(node, False)
3660
      if not res or my_vg not in res:
3661
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3662
                                 (my_vg, node))
3663
    for dev in instance.disks:
3664
      if not dev.iv_name in self.op.disks:
3665
        continue
3666
      for node in tgt_node, oth_node:
3667
        info("checking %s on %s" % (dev.iv_name, node))
3668
        cfg.SetDiskID(dev, node)
3669
        if not rpc.call_blockdev_find(node, dev):
3670
          raise errors.OpExecError("Can't find device %s on node %s" %
3671
                                   (dev.iv_name, node))
3672

    
3673
    # Step: check other node consistency
3674
    self.proc.LogStep(2, steps_total, "check peer consistency")
3675
    for dev in instance.disks:
3676
      if not dev.iv_name in self.op.disks:
3677
        continue
3678
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3679
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3680
                                   oth_node==instance.primary_node):
3681
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3682
                                 " to replace disks on this node (%s)" %
3683
                                 (oth_node, tgt_node))
3684

    
3685
    # Step: create new storage
3686
    self.proc.LogStep(3, steps_total, "allocate new storage")
3687
    for dev in instance.disks:
3688
      if not dev.iv_name in self.op.disks:
3689
        continue
3690
      size = dev.size
3691
      cfg.SetDiskID(dev, tgt_node)
3692
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3693
      names = _GenerateUniqueNames(cfg, lv_names)
3694
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3695
                             logical_id=(vgname, names[0]))
3696
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3697
                             logical_id=(vgname, names[1]))
3698
      new_lvs = [lv_data, lv_meta]
3699
      old_lvs = dev.children
3700
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3701
      info("creating new local storage on %s for %s" %
3702
           (tgt_node, dev.iv_name))
3703
      # since we *always* want to create this LV, we use the
3704
      # _Create...OnPrimary (which forces the creation), even if we
3705
      # are talking about the secondary node
3706
      for new_lv in new_lvs:
3707
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3708
                                        _GetInstanceInfoText(instance)):
3709
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3710
                                   " node '%s'" %
3711
                                   (new_lv.logical_id[1], tgt_node))
3712

    
3713
    # Step: for each lv, detach+rename*2+attach
3714
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3715
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3716
      info("detaching %s drbd from local storage" % dev.iv_name)
3717
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3718
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3719
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3720
      #dev.children = []
3721
      #cfg.Update(instance)
3722

    
3723
      # ok, we created the new LVs, so now we know we have the needed
3724
      # storage; as such, we proceed on the target node to rename
3725
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3726
      # using the assumption than logical_id == physical_id (which in
3727
      # turn is the unique_id on that node)
3728

    
3729
      # FIXME(iustin): use a better name for the replaced LVs
3730
      temp_suffix = int(time.time())
3731
      ren_fn = lambda d, suff: (d.physical_id[0],
3732
                                d.physical_id[1] + "_replaced-%s" % suff)
3733
      # build the rename list based on what LVs exist on the node
3734
      rlist = []
3735
      for to_ren in old_lvs:
3736
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3737
        if find_res is not None: # device exists
3738
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3739

    
3740
      info("renaming the old LVs on the target node")
3741
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3742
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3743
      # now we rename the new LVs to the old LVs
3744
      info("renaming the new LVs on the target node")
3745
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3746
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3747
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3748

    
3749
      for old, new in zip(old_lvs, new_lvs):
3750
        new.logical_id = old.logical_id
3751
        cfg.SetDiskID(new, tgt_node)
3752

    
3753
      for disk in old_lvs:
3754
        disk.logical_id = ren_fn(disk, temp_suffix)
3755
        cfg.SetDiskID(disk, tgt_node)
3756

    
3757
      # now that the new lvs have the old name, we can add them to the device
3758
      info("adding new mirror component on %s" % tgt_node)
3759
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3760
        for new_lv in new_lvs:
3761
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3762
            warning("Can't rollback device %s", hint="manually cleanup unused"
3763
                    " logical volumes")
3764
        raise errors.OpExecError("Can't add local storage to drbd")
3765

    
3766
      dev.children = new_lvs
3767
      cfg.Update(instance)
3768

    
3769
    # Step: wait for sync
3770

    
3771
    # this can fail as the old devices are degraded and _WaitForSync
3772
    # does a combined result over all disks, so we don't check its
3773
    # return value
3774
    self.proc.LogStep(5, steps_total, "sync devices")
3775
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3776

    
3777
    # so check manually all the devices
3778
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3779
      cfg.SetDiskID(dev, instance.primary_node)
3780
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3781
      if is_degr:
3782
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3783

    
3784
    # Step: remove old storage
3785
    self.proc.LogStep(6, steps_total, "removing old storage")
3786
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3787
      info("remove logical volumes for %s" % name)
3788
      for lv in old_lvs:
3789
        cfg.SetDiskID(lv, tgt_node)
3790
        if not rpc.call_blockdev_remove(tgt_node, lv):
3791
          warning("Can't remove old LV", hint="manually remove unused LVs")
3792
          continue
3793

    
3794
  def _ExecD8Secondary(self, feedback_fn):
3795
    """Replace the secondary node for drbd8.
3796

3797
    The algorithm for replace is quite complicated:
3798
      - for all disks of the instance:
3799
        - create new LVs on the new node with same names
3800
        - shutdown the drbd device on the old secondary
3801
        - disconnect the drbd network on the primary
3802
        - create the drbd device on the new secondary
3803
        - network attach the drbd on the primary, using an artifice:
3804
          the drbd code for Attach() will connect to the network if it
3805
          finds a device which is connected to the correct local disks but
3806
          not network enabled
3807
      - wait for sync across all devices
3808
      - remove all disks from the old secondary
3809

3810
    Failures are not very well handled.
3811

3812
    """
3813
    steps_total = 6
3814
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3815
    instance = self.instance
3816
    iv_names = {}
3817
    vgname = self.cfg.GetVGName()
3818
    # start of work
3819
    cfg = self.cfg
3820
    old_node = self.tgt_node
3821
    new_node = self.new_node
3822
    pri_node = instance.primary_node
3823

    
3824
    # Step: check device activation
3825
    self.proc.LogStep(1, steps_total, "check device existence")
3826
    info("checking volume groups")
3827
    my_vg = cfg.GetVGName()
3828
    results = rpc.call_vg_list([pri_node, new_node])
3829
    if not results:
3830
      raise errors.OpExecError("Can't list volume groups on the nodes")
3831
    for node in pri_node, new_node:
3832
      res = results.get(node, False)
3833
      if not res or my_vg not in res:
3834
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3835
                                 (my_vg, node))
3836
    for dev in instance.disks:
3837
      if not dev.iv_name in self.op.disks:
3838
        continue
3839
      info("checking %s on %s" % (dev.iv_name, pri_node))
3840
      cfg.SetDiskID(dev, pri_node)
3841
      if not rpc.call_blockdev_find(pri_node, dev):
3842
        raise errors.OpExecError("Can't find device %s on node %s" %
3843
                                 (dev.iv_name, pri_node))
3844

    
3845
    # Step: check other node consistency
3846
    self.proc.LogStep(2, steps_total, "check peer consistency")
3847
    for dev in instance.disks:
3848
      if not dev.iv_name in self.op.disks:
3849
        continue
3850
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3851
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3852
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3853
                                 " unsafe to replace the secondary" %
3854
                                 pri_node)
3855

    
3856
    # Step: create new storage
3857
    self.proc.LogStep(3, steps_total, "allocate new storage")
3858
    for dev in instance.disks:
3859
      size = dev.size
3860
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3861
      # since we *always* want to create this LV, we use the
3862
      # _Create...OnPrimary (which forces the creation), even if we
3863
      # are talking about the secondary node
3864
      for new_lv in dev.children:
3865
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3866
                                        _GetInstanceInfoText(instance)):
3867
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3868
                                   " node '%s'" %
3869
                                   (new_lv.logical_id[1], new_node))
3870

    
3871
      iv_names[dev.iv_name] = (dev, dev.children)
3872

    
3873
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3874
    for dev in instance.disks:
3875
      size = dev.size
3876
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3877
      # create new devices on new_node
3878
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3879
                              logical_id=(pri_node, new_node,
3880
                                          dev.logical_id[2]),
3881
                              children=dev.children)
3882
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3883
                                        new_drbd, False,
3884
                                      _GetInstanceInfoText(instance)):
3885
        raise errors.OpExecError("Failed to create new DRBD on"
3886
                                 " node '%s'" % new_node)
3887

    
3888
    for dev in instance.disks:
3889
      # we have new devices, shutdown the drbd on the old secondary
3890
      info("shutting down drbd for %s on old node" % dev.iv_name)
3891
      cfg.SetDiskID(dev, old_node)
3892
      if not rpc.call_blockdev_shutdown(old_node, dev):
3893
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3894
                hint="Please cleanup this device manually as soon as possible")
3895

    
3896
    info("detaching primary drbds from the network (=> standalone)")
3897
    done = 0
3898
    for dev in instance.disks:
3899
      cfg.SetDiskID(dev, pri_node)
3900
      # set the physical (unique in bdev terms) id to None, meaning
3901
      # detach from network
3902
      dev.physical_id = (None,) * len(dev.physical_id)
3903
      # and 'find' the device, which will 'fix' it to match the
3904
      # standalone state
3905
      if rpc.call_blockdev_find(pri_node, dev):
3906
        done += 1
3907
      else:
3908
        warning("Failed to detach drbd %s from network, unusual case" %
3909
                dev.iv_name)
3910

    
3911
    if not done:
3912
      # no detaches succeeded (very unlikely)
3913
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3914

    
3915
    # if we managed to detach at least one, we update all the disks of
3916
    # the instance to point to the new secondary
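    # (e.g. a logical_id of (pnode, old_snode, port) becomes
    # (pnode, new_snode, port); only the secondary element changes)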
3917
    info("updating instance configuration")
3918
    for dev in instance.disks:
3919
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3920
      cfg.SetDiskID(dev, pri_node)
3921
    cfg.Update(instance)
3922

    
3923
    # and now perform the drbd attach
3924
    info("attaching primary drbds to new secondary (standalone => connected)")
3925
    failures = []
3926
    for dev in instance.disks:
3927
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3928
      # since the attach is smart, it's enough to 'find' the device,
3929
      # it will automatically activate the network, if the physical_id
3930
      # is correct
3931
      cfg.SetDiskID(dev, pri_node)
3932
      if not rpc.call_blockdev_find(pri_node, dev):
3933
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3934
                "please do a gnt-instance info to see the status of disks")
3935

    
3936
    # this can fail as the old devices are degraded and _WaitForSync
3937
    # does a combined result over all disks, so we don't check its
3938
    # return value
3939
    self.proc.LogStep(5, steps_total, "sync devices")
3940
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3941

    
3942
    # so check manually all the devices
3943
    for name, (dev, old_lvs) in iv_names.iteritems():
3944
      cfg.SetDiskID(dev, pri_node)
3945
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3946
      if is_degr:
3947
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3948

    
3949
    self.proc.LogStep(6, steps_total, "removing old storage")
3950
    for name, (dev, old_lvs) in iv_names.iteritems():
3951
      info("remove logical volumes for %s" % name)
3952
      for lv in old_lvs:
3953
        cfg.SetDiskID(lv, old_node)
3954
        if not rpc.call_blockdev_remove(old_node, lv):
3955
          warning("Can't remove LV on old secondary",
3956
                  hint="Cleanup stale volumes by hand")
3957

    
3958
  def Exec(self, feedback_fn):
3959
    """Execute disk replacement.
3960

3961
    This dispatches the disk replacement to the appropriate handler.
3962

3963
    """
3964
    instance = self.instance
3965
    if instance.disk_template == constants.DT_REMOTE_RAID1:
3966
      fn = self._ExecRR1
3967
    elif instance.disk_template == constants.DT_DRBD8:
3968
      if self.op.remote_node is None:
3969
        fn = self._ExecD8DiskOnly
3970
      else:
3971
        fn = self._ExecD8Secondary
3972
    else:
3973
      raise errors.ProgrammerError("Unhandled disk replacement case")
3974
    return fn(feedback_fn)
3975

    
3976

    
3977
class LUQueryInstanceData(NoHooksLU):
3978
  """Query runtime instance data.
3979

3980
  """
3981
  _OP_REQP = ["instances"]
3982

    
3983
  def CheckPrereq(self):
3984
    """Check prerequisites.
3985

3986
    This only checks the optional instance list against the existing names.
3987

3988
    """
3989
    if not isinstance(self.op.instances, list):
3990
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3991
    if self.op.instances:
3992
      self.wanted_instances = []
3993
      names = self.op.instances
3994
      for name in names:
3995
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3996
        if instance is None:
3997
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3998
        self.wanted_instances.append(instance)
3999
    else:
4000
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4001
                               in self.cfg.GetInstanceList()]
4002
    return
4003

    
4004

    
4005
  def _ComputeDiskStatus(self, instance, snode, dev):
4006
    """Compute block device status.
4007

4008
    """
4009
    self.cfg.SetDiskID(dev, instance.primary_node)
4010
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4011
    if dev.dev_type in constants.LDS_DRBD:
4012
      # we change the snode then (otherwise we use the one passed in)
4013
      if dev.logical_id[0] == instance.primary_node:
4014
        snode = dev.logical_id[1]
4015
      else:
4016
        snode = dev.logical_id[0]
4017

    
4018
    if snode:
4019
      self.cfg.SetDiskID(dev, snode)
4020
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4021
    else:
4022
      dev_sstatus = None
4023

    
4024
    if dev.children:
4025
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4026
                      for child in dev.children]
4027
    else:
4028
      dev_children = []
4029

    
4030
    data = {
4031
      "iv_name": dev.iv_name,
4032
      "dev_type": dev.dev_type,
4033
      "logical_id": dev.logical_id,
4034
      "physical_id": dev.physical_id,
4035
      "pstatus": dev_pstatus,
4036
      "sstatus": dev_sstatus,
4037
      "children": dev_children,
4038
      }
4039

    
4040
    return data
4041

    
4042
  def Exec(self, feedback_fn):
4043
    """Gather and return data"""
4044
    result = {}
4045
    for instance in self.wanted_instances:
4046
      remote_info = rpc.call_instance_info(instance.primary_node,
4047
                                                instance.name)
4048
      if remote_info and "state" in remote_info:
4049
        remote_state = "up"
4050
      else:
4051
        remote_state = "down"
4052
      if instance.status == "down":
4053
        config_state = "down"
4054
      else:
4055
        config_state = "up"
4056

    
4057
      disks = [self._ComputeDiskStatus(instance, None, device)
4058
               for device in instance.disks]
4059

    
4060
      idict = {
4061
        "name": instance.name,
4062
        "config_state": config_state,
4063
        "run_state": remote_state,
4064
        "pnode": instance.primary_node,
4065
        "snodes": instance.secondary_nodes,
4066
        "os": instance.os,
4067
        "memory": instance.memory,
4068
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4069
        "disks": disks,
4070
        "network_port": instance.network_port,
4071
        "vcpus": instance.vcpus,
4072
        "kernel_path": instance.kernel_path,
4073
        "initrd_path": instance.initrd_path,
4074
        "hvm_boot_order": instance.hvm_boot_order,
4075
        }
4076

    
4077
      result[instance.name] = idict
4078

    
4079
    return result
4080

    
4081

    
4082
class LUSetInstanceParms(LogicalUnit):
4083
  """Modifies an instances's parameters.
4084

4085
  """
4086
  HPATH = "instance-modify"
4087
  HTYPE = constants.HTYPE_INSTANCE
4088
  _OP_REQP = ["instance_name"]
4089

    
4090
  def BuildHooksEnv(self):
4091
    """Build hooks env.
4092

4093
    This runs on the master, primary and secondaries.
4094

4095
    """
4096
    args = dict()
4097
    if self.mem:
4098
      args['memory'] = self.mem
4099
    if self.vcpus:
4100
      args['vcpus'] = self.vcpus
4101
    if self.do_ip or self.do_bridge:
4102
      if self.do_ip:
4103
        ip = self.ip
4104
      else:
4105
        ip = self.instance.nics[0].ip
4106
      if self.bridge:
4107
        bridge = self.bridge
4108
      else:
4109
        bridge = self.instance.nics[0].bridge
4110
      args['nics'] = [(ip, bridge)]
4111
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4112
    nl = [self.sstore.GetMasterNode(),
4113
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4114
    return env, nl, nl
4115

    
4116
  def CheckPrereq(self):
4117
    """Check prerequisites.
4118

4119
    This only checks the instance list against the existing names.
4120

4121
    """
4122
    self.mem = getattr(self.op, "mem", None)
4123
    self.vcpus = getattr(self.op, "vcpus", None)
4124
    self.ip = getattr(self.op, "ip", None)
4125
    self.mac = getattr(self.op, "mac", None)
4126
    self.bridge = getattr(self.op, "bridge", None)
4127
    self.kernel_path = getattr(self.op, "kernel_path", None)
4128
    self.initrd_path = getattr(self.op, "initrd_path", None)
4129
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4130
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4131
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
4132
    if all_parms.count(None) == len(all_parms):
4133
      raise errors.OpPrereqError("No changes submitted")
4134
    if self.mem is not None:
4135
      try:
4136
        self.mem = int(self.mem)
4137
      except ValueError, err:
4138
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4139
    if self.vcpus is not None:
4140
      try:
4141
        self.vcpus = int(self.vcpus)
4142
      except ValueError, err:
4143
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4144
    if self.ip is not None:
4145
      self.do_ip = True
4146
      if self.ip.lower() == "none":
4147
        self.ip = None
4148
      else:
4149
        if not utils.IsValidIP(self.ip):
4150
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4151
    else:
4152
      self.do_ip = False
4153
    self.do_bridge = (self.bridge is not None)
4154
    if self.mac is not None:
4155
      if self.cfg.IsMacInUse(self.mac):
4156
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4157
                                   self.mac)
4158
      if not utils.IsValidMac(self.mac):
4159
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4160

    
4161
    if self.kernel_path is not None:
4162
      self.do_kernel_path = True
4163
      if self.kernel_path == constants.VALUE_NONE:
4164
        raise errors.OpPrereqError("Can't set instance to no kernel")
4165

    
4166
      if self.kernel_path != constants.VALUE_DEFAULT:
4167
        if not os.path.isabs(self.kernel_path):
4168
          raise errors.OpPrereqError("The kernel path must be an absolute"
4169
                                    " filename")
4170
    else:
4171
      self.do_kernel_path = False
4172

    
4173
    if self.initrd_path is not None:
4174
      self.do_initrd_path = True
4175
      if self.initrd_path not in (constants.VALUE_NONE,
4176
                                  constants.VALUE_DEFAULT):
4177
        if not os.path.isabs(self.initrd_path):
4178
          raise errors.OpPrereqError("The initrd path must be an absolute"
4179
                                    " filename")
4180
    else:
4181
      self.do_initrd_path = False
4182

    
4183
    # boot order verification
4184
    if self.hvm_boot_order is not None:
4185
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4186
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4187
          raise errors.OpPrereqError("invalid boot order specified,"
4188
                                     " must be one or more of [acdn]"
4189
                                     " or 'default'")
4190

    
4191
    instance = self.cfg.GetInstanceInfo(
4192
      self.cfg.ExpandInstanceName(self.op.instance_name))
4193
    if instance is None:
4194
      raise errors.OpPrereqError("No such instance name '%s'" %
4195
                                 self.op.instance_name)
4196
    self.op.instance_name = instance.name
4197
    self.instance = instance
4198
    return
4199

    
4200
  def Exec(self, feedback_fn):
4201
    """Modifies an instance.
4202

4203
    All parameters take effect only at the next restart of the instance.
4204
    """
4205
    result = []
4206
    instance = self.instance
4207
    if self.mem:
4208
      instance.memory = self.mem
4209
      result.append(("mem", self.mem))
4210
    if self.vcpus:
4211
      instance.vcpus = self.vcpus
4212
      result.append(("vcpus",  self.vcpus))
4213
    if self.do_ip:
4214
      instance.nics[0].ip = self.ip
4215
      result.append(("ip", self.ip))
4216
    if self.bridge:
4217
      instance.nics[0].bridge = self.bridge
4218
      result.append(("bridge", self.bridge))
4219
    if self.mac:
4220
      instance.nics[0].mac = self.mac
4221
      result.append(("mac", self.mac))
4222
    if self.do_kernel_path:
4223
      instance.kernel_path = self.kernel_path
4224
      result.append(("kernel_path", self.kernel_path))
4225
    if self.do_initrd_path:
4226
      instance.initrd_path = self.initrd_path
4227
      result.append(("initrd_path", self.initrd_path))
4228
    if self.hvm_boot_order:
4229
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4230
        instance.hvm_boot_order = None
4231
      else:
4232
        instance.hvm_boot_order = self.hvm_boot_order
4233
      result.append(("hvm_boot_order", self.hvm_boot_order))
4234

    
4235
    self.cfg.AddInstance(instance)
4236

    
4237
    return result
4238

    
4239

    
4240
class LUQueryExports(NoHooksLU):
4241
  """Query the exports list
4242

4243
  """
4244
  _OP_REQP = []
4245

    
4246
  def CheckPrereq(self):
4247
    """Check that the nodelist contains only existing nodes.
4248

4249
    """
4250
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4251

    
4252
  def Exec(self, feedback_fn):
4253
    """Compute the list of all the exported system images.
4254

4255
    Returns:
4256
      a dictionary with the structure node->(export-list)
4257
      where export-list is a list of the instances exported on
4258
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.
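
  Subclasses are driven by their opcodes; for example (the names used here
  are illustrative and the opcode class is assumed to be the one defined in
  opcodes.py), a request such as
    OpGetTags(kind=constants.TAG_NODE, name="node1.example.com")
  is handled by LUGetTags, and the shared CheckPrereq below resolves
  self.target to the matching node object.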

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) pairs matching the pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
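    # results collects (path, tag) pairs, e.g.
    # [("/instances/instance1.example.com", "mytag")] (illustrative values)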
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets one or more tags on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
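    # the tags were only added to the in-memory object; write it back so the
    # change is saved in the cluster configuration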
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tags.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")